2015-05-21 10 views
6

Ich benutze die Spark Scala API. Ich habe einen Spark-SQL Datenrahmen (lesen aus einer Avro-Datei) mit folgendem Schema:Wie benutze ich Spark SQL DataFrame mit flatMap?

root 
|-- ids: array (nullable = true) 
| |-- element: map (containsNull = true) 
| | |-- key: integer 
| | |-- value: string (valueContainsNull = true) 
|-- match: array (nullable = true) 
| |-- element: integer (containsNull = true) 

Wesentlichen zwei Säulen [ids: Liste [Map [Int, String]], match: Liste [Int]]. Beispieldaten, die wie folgt aussieht: flatMap() ist

[List(Map(1 -> a), Map(2 -> b), Map(3 -> c), Map(4 -> d)),List(0, 0, 1, 0)] 
[List(Map(5 -> c), Map(6 -> a), Map(7 -> e), Map(8 -> d)),List(1, 0, 1, 0)] 
... 

Was ich tun möchte jede Zeile 3 Spalten erzeugen [id, Eigenschaft, Spiel]. Unter Verwendung der obigen zwei Zeilen wie die Eingabedaten würden wir bekommen:

[1,a,0] 
[2,b,0] 
[3,c,1] 
[4,d,0] 
[5,c,1] 
[6,a,0] 
[7,e,1] 
[8,d,0] 
... 

und dann groupBy die StringEigenschaft (ex: a, b, ...) erzeugen count("property") und sum("match"):

a 2 0 
b 1 0 
c 2 2 
d 2 0 
e 1 1 

ich würde so etwas tun wollen:

val result = myDataFrame.select("ids","match").flatMap( 
    (row: Row) => row.getList[Map[Int,String]](1).toArray()) 
result.groupBy("property").agg(Map(
    "property" -> "count", 
    "match" -> "sum")) 

das Problem ist, dass die flatMap konvertiert DataFrame in RDD. Gibt es eine gute Möglichkeit, eine Operation vom Typ flatMap gefolgt von groupBy mithilfe von DataFrames auszuführen?

Antwort

8

Was macht flatMap das, was Sie wollen? Es konvertiert jede Eingabezeile in 0 oder mehr Zeilen. Es kann sie herausfiltern oder neue hinzufügen. In SQL, um dieselbe Funktionalität zu erhalten, verwenden Sie join. Können Sie tun, was Sie mit einer join machen wollen?

Alternativ können Sie auch auf Dataframe.explode aussehen, die nur eine bestimmte Art von join ist (man kann leicht Handwerk Ihr eigenes explode durch einen Datenrahmen zu einem UDF Beitritt). explode nimmt eine einzelne Spalte als Eingabe und lässt Sie es teilen oder in mehrere Werte und dann join die ursprüngliche Zeile wieder in die neuen Zeilen konvertieren. Also:

user  groups 
griffin mkt,it,admin 

Könnte werden:

user  group 
griffin mkt 
griffin it 
griffin admin 

So würde ich sagen, einen Blick auf DataFrame.explode nehmen und wenn das nicht Sie bekommt es leicht, versuchen, mit UDF verbindet.

+0

Vielen Dank für Ihre Antwort! Die DataFrame.explode-Methode ist genau das, wonach ich gesucht habe. –

0

Mein SQL ist ein bisschen rostig, aber eine Option ist in Ihrer FlatMap, um eine Liste von Row-Objekten zu erstellen und dann können Sie die resultierende RDD zurück in einen DataFrame konvertieren.

Verwandte Themen