2

Wir haben zwei Datensatz SetDataFrame, sentenceDataFrame2 erstellt, wo Suche ersetzt werden soll.Suchen und Ersetzen in Apache Spark

sampleDataFrame2 speichert die Begriffe zum Suchen und Ersetzen.

Wir führten auch alle 11 Arten von Join "inner", "außen", "voll", "fouter", "links", "links", "rechts", "rechts", "links", "links" ',' cross 'keiner von ihnen gab uns das Ergebnis.

Können Sie uns bitte wissen Wohin wir gehen Falsch und Bitte zeigen Sie uns in die richtige Richtung.

 List<Row> data = Arrays.asList(
      RowFactory.create(0, "Allen jeevi pramod Allen"), 
      RowFactory.create(1,"sandesh Armstrong jeevi"), 
      RowFactory.create(2,"harsha Nischay DeWALT")); 

     StructType schema = new StructType(new StructField[] { 
     new StructField("label", DataTypes.IntegerType, false, 
      Metadata.empty()), 
     new StructField("sentence", DataTypes.StringType, false, 
      Metadata.empty()) }); 
     Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema); 


     List<Row> data2 = Arrays.asList(
      RowFactory.create("Allen", "Apex Tool Group"), 
      RowFactory.create("Armstrong","Apex Tool Group"), 
      RowFactory.create("DeWALT","StanleyBlack")); 

     StructType schema2 = new StructType(new StructField[] { 
     new StructField("label2", DataTypes.StringType, false, 
      Metadata.empty()), 
     new StructField("sentence2", DataTypes.StringType, false, 
      Metadata.empty()) }); 
     Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2); 

     Dataset<Row> remainingElements=sentenceDataFrame.join(sentenceDataFrame2,sentenceDataFrame.col("label").equalTo(sentenceDataFrame2.col("label2")),"cross"); 
     System.out.println("Left anti join count :"+remainingElements.count()); 

Eingang

Alle jeevi pramod All
sandesh Armstrong jeevi
harsha Nischay DeWALT

Erwartete Ausgabe

Apex Tool Group jeevi pramod Apex Tool Group
sandesh Apex Tool Group jeevi
harsha Nischay StanleyBlack

Antwort

3

Für Join-Bedingungen, die einfach Gleichheiten wie dies nicht bedeuten, Sie gehen zu müssen Spark-benutzerdefinierte Funktionen (UDF) verwenden.

Hier ist ein JUnit Code-Snippet, das nicht direkt kompiliert wird, aber die relevanten Importe und Logik zeigt. Die Java-API ist jedoch ziemlich ausführlich. Ich überlasse das Problem, dies in Scala als Übung für den Leser zu tun. Es wird viel prägnanter sein.

Der statische Import wird für die Methoden callUDF() und col() benötigt.

import static org.apache.spark.sql.functions.*; 

import org.apache.spark.sql.*; 
import org.apache.spark.sql.api.java.UDF2; 
import org.apache.spark.sql.api.java.UDF3; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.Metadata; 
import org.apache.spark.sql.types.StructField; 
import org.apache.spark.sql.types.StructType; 

@Test 
public void testSomething() { 
    List<Row> data = Arrays.asList(
     RowFactory.create(0, "Allen jeevi pramod Allen"), 
     RowFactory.create(1, "sandesh Armstrong jeevi"), 
     RowFactory.create(2, "harsha Nischay DeWALT") 
    ); 

    StructType schema = new StructType(new StructField[] { 
     new StructField("label", DataTypes.IntegerType, false, Metadata.empty()), 
     new StructField("sentence", DataTypes.StringType, false, Metadata.empty()) 
    }); 
    Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema); 

    List<Row> data2 = Arrays.asList(
     RowFactory.create("Allen", "Apex Tool Group"), 
     RowFactory.create("Armstrong","Apex Tool Group"), 
     RowFactory.create("DeWALT","StanleyBlack") 
    ); 

    StructType schema2 = new StructType(new StructField[] { 
     new StructField("label2", DataTypes.StringType, false, Metadata.empty()), 
     new StructField("sentence2", DataTypes.StringType, false, Metadata.empty()) 
    }); 
    Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2); 

    UDF2<String, String, Boolean> contains = new UDF2<String, String, Boolean>() { 
     private static final long serialVersionUID = -5239951370238629896L; 

     @Override 
     public Boolean call(String t1, String t2) throws Exception { 
      return t1.contains(t2); 
     } 
    }; 
    spark.udf().register("contains", contains, DataTypes.BooleanType); 

    UDF3<String, String, String, String> replaceWithTerm = new UDF3<String, String, String, String>() { 
     private static final long serialVersionUID = -2882956931420910207L; 

     @Override 
     public String call(String t1, String t2, String t3) throws Exception { 
      return t1.replaceAll(t2, t3); 
     } 
    }; 
    spark.udf().register("replaceWithTerm", replaceWithTerm, DataTypes.StringType); 

    Dataset<Row> joined = sentenceDataFrame.join(sentenceDataFrame2, callUDF("contains", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2"))) 
              .withColumn("sentence_replaced", callUDF("replaceWithTerm", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2"), sentenceDataFrame2.col("sentence2"))) 
              .select(col("sentence_replaced")); 

    joined.show(false); 
} 

Ausgang:

+--------------------------------------------+ 
|sentence_replaced       | 
+--------------------------------------------+ 
|Apex Tool Group jeevi pramod Apex Tool Group| 
|sandesh Apex Tool Group jeevi    | 
|harsha Nischay StanleyBlack     | 
+--------------------------------------------+ 
+0

Danke @Ivan Gozali es funktionierte perfekt. – Nischay

2

noch mit einem ähnlichen Problem konfrontiert

Eingang

All Armstrong jeevi pramod All
sandesh Armstrong jeevi
harsha nischay DeWALT

Ausgabe

Apex Tool Group Armstrong jeevi pramod Apex Tool Group
Alle Apex Tool Group jeevi pramod All
sandesh Apex Tool Group jeevi
harsha nischay StanleyBlack

Erwartete Ausgabe

Apex Tool Group Apex Tool Group jeevi pramod Apex Tool Group
sandesh Apex Tool Group jeevi
harsha nischay StanleyBlack

dieser Ausgang Erhielt, wenn es mehrere Ersetzungen in einer Reihe zu tun.

Gibt es eine andere Methode, die befolgt werden muss, um die richtige Ausgabe zu erhalten? Oder ist das eine Einschränkung von UDF?

5

Wir können replaceAll und UDF Funktionen verwenden, um die erwartete Ausgabe zu erreichen.

public class Test { 

    public static void main(String[] args) { 
     JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]")); 
     SQLContext sqlContext = new SQLContext(sc); 
     SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate(); 

     List<Row> data = Arrays.asList(
     RowFactory.create(0, "Allen jeevi pramod Allen"), 
     RowFactory.create(1, "sandesh Armstrong jeevi"), 
     RowFactory.create(2, "harsha Nischay DeWALT") 
    ); 

     StructType schema = new StructType(new StructField[] { 
     new StructField("label", DataTypes.IntegerType, false, 
       Metadata.empty()), 
     new StructField("sentence", DataTypes.StringType, false, 
       Metadata.empty()) }); 
     Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema); 
     UDF1 mode = new UDF1<String, String>() { 
      public String call(final String types) throws Exception { 
       return types.replaceAll("Allen", "Apex Tool Group") 
       .replaceAll("Armstrong","Apex Tool Group") 
       .replaceAll(""DeWALT","StanleyBlack"") 
      } 
     }; 

     sqlContext.udf().register("mode", mode, DataTypes.StringType); 

     sentenceDataFrame.createOrReplaceTempView("people"); 
     Dataset<Row> newDF = sqlContext.sql("SELECT mode(sentence), label FROM people").withColumnRenamed("UDF(sentence)", "sentence"); 
     newDF.show(false); 
} 
} 

Ausgang

+--------------------------------------------+------+ 
    |sentence         |label | 
    +--------------------------------------------+------+ 
    |Apex Tool Group jeevi pramod Apex Tool Group| 0 | 
    |sandesh Apex Tool Group jeevi    | 1 | 
    |harsha Nischay StanleyBlack     | 2 | 
    +--------------------------------------------+------+