2016-04-11 18 views
1

Ich versuche, einige Felder in meinem Datenrahmen in JSON zu schreiben. Meine Datenstruktur im Datenrahmen istWie schreibe ich eine Reihe von Feldern in JSON?

Key|col1|col2|col3|col4 
key|a |b |c |d 
Key|a1 |b1 |c1 |d1 

Jetzt ich nur die col1 zu col4 Felder zu JSON konvertieren Ich versuche und einen Namen in das Feld Json geben

erwartete Ausgabe

[Key,{cols:[{col1:a,col2:b,col3:c,col4:d},{col1:a1,col2:b1,col3:c1,col4:d1}] 

Ich habe dafür ein udf geschrieben.

val summary = udf( 
(col1:String, col2:String, col3:String, col4:String) => "{\"cols\":[" + " {\"col1\":" + col1 + ",\"col2\":" + col2 + ",\"col3\":" + col3 + ",\"col4\":" + col4 + "}]}" 
) 

val result = input.withColumn("Summary",summary('col1,'col2,'col3,'col4)) 
val result1 = result.select('Key,'Summary) 
result1.show(10) 

Das ist mein Ergebnis

[Key,{cols:[{col1:a,col2:b,col3:c,col4:d}]}] 
[Key,{cols:[{col1:a1,col2:b1,col3:c1,col4:d1}]}] 

Wie Sie sehen können, sind sie nicht gruppiert. Gibt es eine Möglichkeit, diese Zeilen mit der UDF selbst zu gruppieren? Ich bin neu bei scala/Spark und nicht in der Lage, das richtige udf herauszufinden.

+0

Ich glaube nicht, dass Sie Ihre ‚erwartete Ausgabe‘ korrekt beendet haben; Ich würde erwarten, dass es am Ende ein anderes "}]" geben wird, das der Eröffnung "[{" entspricht. –

Antwort

1

UDFs ordnen eine Zeile einer Zeile zu. Wenn Sie in Ihrem DataFrame mehrere Zeilen haben, die Sie zu einem Element kombinieren möchten, müssen Sie eine Funktion wie reduceByKey verwenden, die mehrere Zeilen aggregiert.

Es kann eine DataFrame spezifische Funktion, dies zu tun, aber ich würde mit der RDD Funktionalität dieser Verarbeitung tun, etwa so:

val colSummary = udf( 
(col1:String, col2:String, col3:String, col4:String) => "{\"col1\":" + col1 + ",\"col2\":" + col2 + ",\"col3\":" + col3 + ",\"col4\":" + col4 + "}" 
) 
val colRDD = input.withColumn("Summary",summary('col1,'col2,'col3,'col4)).rdd.map(x => (x.getString(0),x.getString(5))) 

Dies gibt uns eine RDD[(String,String)], die uns die PairRDDFunctions verwenden können wie reduceByKey (siehe docs). Der Schlüssel des Tupels ist der ursprüngliche Schlüssel, und der Wert ist die JSON-Kodierung für ein einzelnes Element, das wir aggregieren müssen, um die cols Liste zu bilden. Wir kleben sie alle zusammen in eine Komma-getrennte Liste, und dann fügen wir den Anfang und das Ende hinzu, und dann sind wir fertig.

val result = colRDD.reduceByKey((x,y) => (x+","+y)).map(x => "["+x._1+",{\"cols\":["+x._2+"]}]") 
result.take(10) 
+1

Danke. Es hat funktioniert. Nur wenige Änderungen (in diesem Kommentar erklärt) in der Antwort. val colRDD fehlt am Ende ein ')'. und auch das Ergebnis fehlt am Ende ein ')'. In colRDD ist es x.getString (5) anstelle von x.getString (1), da Summary das fünfte Feld ist, nachdem wir dem Eingabedatenrahmen ein Feld hinzugefügt haben. Und schließlich result.take (10) als 'result' ist eine rdd. – dheee

2
// Create your dataset 
scala> val ds = Seq((1, "hello", 1L), (2, "world", 2L)).toDF("id", "token", "long") 
ds: org.apache.spark.sql.DataFrame = [id: int, token: string ... 1 more field] 

// select the fields you want to map to json 
scala> ds.select('token, 'long).write.json("your-json") 

// check the result 
➜ spark git:(master) ✗ ls -ltr your-json/ 
total 16 
-rw-r--r-- 1 jacek staff 27 11 kwi 17:18 part-r-00007-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00006-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00005-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00004-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 27 11 kwi 17:18 part-r-00003-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00002-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00001-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00000-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 _SUCCESS 
➜ spark git:(master) ✗ cat your-json/part-r-00003-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
{"token":"hello","long":1} 
➜ spark git:(master) ✗ cat your-json/part-r-00007-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
{"token":"world","long":2} 
Verwandte Themen