Ich verwende sparkSql 1.6.2 (Java API) und I zurückkehren haben folgende Datenrahmen zu verarbeiten, die eine Liste des Wertes in 2 Spalten:Funken - Java UDF mehrere Spalten
ID AttributeName AttributeValue
0 [an1,an2,an3] [av1,av2,av3]
1 [bn1,bn2] [bv1,bv2]
die gewünschte Tabelle ist:
ID AttributeName AttributeValue
0 an1 av1
0 an2 av2
0 an3 av3
1 bn1 bv1
1 bn2 bv2
Ich denke, ich muss eine Kombination der explodierenden Funktion und einer benutzerdefinierten UDF-Funktion verwenden.
fand ich die folgenden Ressourcen:
- Explode (transpose?) multiple columns in Spark SQL table
- How do I call a UDF on a Spark DataFrame using JAVA?
und ich kann ein Beispiel, das lesen Sie die zwei Spalten und gibt die Verkettung der ersten beiden Strings in eine erfolgreich ausgeführt Spalte
UDF2 combineUDF = new UDF2<Seq<String>, Seq<String>, String>() {
public String call(final Seq<String> col1, final Seq<String> col2) throws Exception {
return col1.apply(0) + col2.apply(0);
}
};
context.udf().register("combineUDF", combineUDF, DataTypes.StringType);
t Das Problem besteht darin, die Signatur einer UDF zu schreiben, die zwei Spalten (in Java) zurückgibt. Soweit ich verstehe, muss ich einen neuen StructType wie die Definition weiter unten und festgelegt, dass als Rückgabetyp, aber bisher habe ich nicht geschafft den endgültigen Code
ArbeitsStructType retSchema = new StructType(new StructField[]{
new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()),
new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()),
}
);
context.udf zu haben() .register ("combineUDF", combineUDF, retSchema);
Jede Hilfe wird wirklich geschätzt.
UPDATE: Ich versuche zunächst die zip zu implementieren (Attribut, Attribute) so ist, dann muss ich nur die Standard explodieren Funktion in sparkSql anwenden:
ID AttName_AttValue
0 [[an1,av1],[an1,av2],[an3,av3]]
1 [[bn1,bv1],[bn2,bv2]]
Ich baute die folgende UDF:
UDF2 combineColumns = new UDF2<Seq<String>, Seq<String>, List<List<String>>>() {
public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
List<List<String>> zipped = new LinkedList<>();
for (int i = 0, listSize = col1.size(); i < listSize; i++) {
List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i));
zipped.add(subRow);
}
return zipped;
}
};
Aber wenn ich führen Sie den Code
myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10);
Ich habe die folgende Fehlermeldung:
scala.MatchError: [[an1, AV1], [an1, AV2], [AN3, av3]] (der Klasse java.util.LinkedList)
und es sieht so aus, als ob das Kombinieren korrekt ausgeführt wurde, aber dann ist der Rückgabetyp nicht der erwartete in Scala.
Hilfe?