2016-09-27 6 views
1

Ich verwende sparkSql 1.6.2 (Java API) und I zurückkehren haben folgende Datenrahmen zu verarbeiten, die eine Liste des Wertes in 2 Spalten:Funken - Java UDF mehrere Spalten

ID AttributeName AttributeValue 
0 [an1,an2,an3] [av1,av2,av3] 
1 [bn1,bn2]  [bv1,bv2] 

die gewünschte Tabelle ist:

ID AttributeName AttributeValue 
0 an1   av1 
0 an2   av2 
0 an3   av3 
1 bn1   bv1 
1 bn2   bv2 

Ich denke, ich muss eine Kombination der explodierenden Funktion und einer benutzerdefinierten UDF-Funktion verwenden.

fand ich die folgenden Ressourcen:

und ich kann ein Beispiel, das lesen Sie die zwei Spalten und gibt die Verkettung der ersten beiden Strings in eine erfolgreich ausgeführt Spalte

UDF2 combineUDF = new UDF2<Seq<String>, Seq<String>, String>() { 
     public String call(final Seq<String> col1, final Seq<String> col2) throws Exception { 
      return col1.apply(0) + col2.apply(0); 
     } 
    }; 

context.udf().register("combineUDF", combineUDF, DataTypes.StringType); 

t Das Problem besteht darin, die Signatur einer UDF zu schreiben, die zwei Spalten (in Java) zurückgibt. Soweit ich verstehe, muss ich einen neuen StructType wie die Definition weiter unten und festgelegt, dass als Rückgabetyp, aber bisher habe ich nicht geschafft den endgültigen Code

Arbeits
StructType retSchema = new StructType(new StructField[]{ 
      new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()), 
      new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()), 
     } 
    ); 

context.udf zu haben() .register ("combineUDF", combineUDF, retSchema);

Jede Hilfe wird wirklich geschätzt.

UPDATE: Ich versuche zunächst die zip zu implementieren (Attribut, Attribute) so ist, dann muss ich nur die Standard explodieren Funktion in sparkSql anwenden:

ID AttName_AttValue 
0 [[an1,av1],[an1,av2],[an3,av3]] 
1 [[bn1,bv1],[bn2,bv2]] 

Ich baute die folgende UDF:

UDF2 combineColumns = new UDF2<Seq<String>, Seq<String>, List<List<String>>>() { 
     public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception { 
      List<List<String>> zipped = new LinkedList<>(); 

      for (int i = 0, listSize = col1.size(); i < listSize; i++) { 
       List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i)); 
       zipped.add(subRow); 
      } 

      return zipped; 
     } 

    }; 

Aber wenn ich führen Sie den Code

myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10); 

Ich habe die folgende Fehlermeldung:

scala.MatchError: [[an1, AV1], [an1, AV2], [AN3, av3]] (der Klasse java.util.LinkedList)

und es sieht so aus, als ob das Kombinieren korrekt ausgeführt wurde, aber dann ist der Rückgabetyp nicht der erwartete in Scala.

Hilfe?

Antwort

0

Endlich gelang es mir, das Ergebnis zu bekommen, das ich suchte, aber wahrscheinlich nicht auf die effizienteste Art und Weise.Grundsätzlich

das sind 2 Schritt:

  • Zip der beiden Liste
  • in Zeilen der Liste Explodieren

Für den ersten Schritt I definiert die folgende UDF Funktion

UDF2 concatItems = new UDF2<Seq<String>, Seq<String>, Seq<String>>() { 
    public Seq<String> call(final Seq<String> col1, final Seq<String> col2) throws Exception { 
     ArrayList zipped = new ArrayList(); 

     for (int i = 0, listSize = col1.size(); i < listSize; i++) { 
      String subRow = col1.apply(i) + ";" + col2.apply(i); 
      zipped.add(subRow); 
     } 

     return scala.collection.JavaConversions.asScalaBuffer(zipped); 
    } 

}; 

und dann habe ich es mit dem folgenden Code aufgerufen:

DataFrame df2 = df.select(col("ID"), callUDF("concatItems", col("AttributeName"), col("AttributeValue")).alias("AttName_AttValue")); 

In diesem Stadium sieht die df2 wie folgt aus:

DataFrame df3 = df2.select(col("ID"),explode(col("AttName_AttValue")).alias("AttName_AttValue_row")); 

In diesem Stadium der df3 wie folgt aussieht:

ID AttName_AttValue 
0 [[an1,av1],[an1,av2],[an3,av3]] 
1 [[bn1,bv1],[bn2,bv2]] 

Dann habe ich die folgende Lambda-Funktion für explodier die Liste in Reihen genannt das:

ID AttName_AttValue 
0 [an1,av1] 
0 [an1,av2] 
0 [an3,av3] 
1 [bn1,bv1] 
1 [bn2,bv2] 

Endlich um die attrib zu teilen ute Namen und der Wert in zwei verschiedene Spalten, umgewandelt ich den Datenrahmen in eine JavaRDD, um die Kartenfunktion zu verwenden:

JavaRDD df3RDD = df3.toJavaRDD().map(
      (Function<Row, Row>) myRow -> { 
       String[] info = String.valueOf(myRow.get(1)).split(","); 
       return RowFactory.create(myRow.get(0), info[0], info[1]); 
     }).cache(); 

Wenn jemand eine bessere Lösung zu kommentieren fühlen sich frei. Ich hoffe es hilft.