2016-03-22 14 views
7

Ich möchte ein JSON aus einem Spark v.1.6 (mit Scala) Dataframe erstellen. Ich weiß, dass es die einfache Lösung gibt, df.toJSON zu tun.Spark Row zu JSON

Allerdings sieht mein Problem ein bisschen anders aus. Betrachten wir zum Beispiel einen Datenrahmen mit den folgenden Spalten:

| A |  B  | C1 |  C2 | C3 | 
------------------------------------------- 
| 1 | test  | ab | 22 | TRUE | 
| 2 | mytest | gh | 17 | FALSE | 

ich am Ende mit einem Datenrahmen haben möchte

| A |  B  |      C     | 
---------------------------------------------------------------- 
| 1 | test  | { "c1" : "ab", "c2" : 22, "c3" : TRUE } | 
| 2 | mytest | { "c1" : "gh", "c2" : 17, "c3" : FALSE } | 

wobei C eine JSON enthält C1, C2, C3. Leider weiß ich zur Kompilierzeit nicht wie der Datenframe aussieht (außer die Spalten A und B die immer "fixed" sind).

Aus dem Grund, warum ich das brauche: Ich verwende Protobuf zum Senden der Ergebnisse. Leider hat mein Datenframe manchmal mehr Spalten als erwartet und ich würde diese immer noch über Protobuf senden, aber ich möchte nicht alle Spalten in der Definition angeben.

Wie kann ich das erreichen?

+0

noch ein Datenrahmen – navige

+0

Nein, sorry, ich meine eher, wie man hinzufügen 'C1, C2, C3' als JSON-String-Spalte an den vorhandenen Datenrahmen. Ich habe den Beitrag aktualisiert, um für die Version von Spark und Scala als Sprache zu klären. – navige

+0

Entschuldigung! Klar, ich habe gerade die Frage aktualisiert (zusammen mit einem Grund, warum ich das erreichen möchte) und ein Beispiel hinzugefügt. – navige

Antwort

7

Spark-2.1 ​​native Unterstützung für diesen Anwendungsfall (siehe #15354) haben sollte.

import org.apache.spark.sql.functions.to_json 
df.select(to_json(struct($"c1", $"c2", $"c3"))) 
4

Zuerst lässt C konvertieren ist ein struct:

val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C")) 

Dies kann Struktur JSONL umgewandelt werden unter Verwendung von toJSON wie zuvor:

dfStruct.toJSON.collect 
// Array[String] = Array(
// {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}}, 
// {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}}) 

Ich bin nicht bekannt, dass integrierte Methode, dass kann eine einzelne Spalte konvertieren, aber Sie können es entweder einzeln konvertieren und join oder verwenden Sie Ihre Lieblings-JSON-Parser in einer UDF.

case class C(C1: String, C2: Int, C3: Boolean) 

object CJsonizer { 
    import org.json4s._ 
    import org.json4s.JsonDSL._ 
    import org.json4s.jackson.Serialization 
    import org.json4s.jackson.Serialization.write 

    implicit val formats = Serialization.formats(org.json4s.NoTypeHints) 

    def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3)) 
} 


val cToJSON = udf((c1: String, c2: Int, c3: Boolean) => 
    CJsonizer.toJSON(c1, c2, c3)) 

df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3")) 
+0

Eigentlich geht es bei meiner Frage eigentlich um den zweiten Teil, wie man die einzelnen Spalten in JSON umwandelt. Sie erwähnen 'Join' die Spalten, aber das funktioniert nicht wirklich, da ich einerseits einen 'RDD [String]' und andererseits einen 'DataFrame' habe. – navige

+1

Wie er sagt, benutze einfach eine' UDF '. Sie müssen nicht einmal einen vollwertigen JSON-Parser in der 'UDF' verwenden - Sie können einfach eine JSON-Zeichenfolge mit' map' und 'mkString' spontan erstellen. Wahrscheinlich müssen Sie 'DataFrame.columns' oder möglicherweise' DataFrame.dtypes' verwenden, um sowohl die 'select'-Anweisung als auch die' map' in 'UDF' zu erzeugen. –

+0

Ich stimme @DavidGriffin - udf kann hier die einfachste Lösung sein. Und Jackson und Json4s sind bereits mit anderen Abhängigkeiten gezogen. – zero323

3

hier kein JSON-Parser, und sie paßt sich an Ihr Schema:

import org.apache.spark.sql.functions.{col, concat, concat_ws, lit} 

df.select(
    col(df.columns(0)), 
    col(df.columns(1)), 
    concat(
    lit("{"), 
    concat_ws(",",df.dtypes.slice(2, df.dtypes.length).map(dt => { 
     val c = dt._1; 
     val t = dt._2; 
     concat(
     lit("\"" + c + "\":" + (if (t == "StringType") "\""; else "") ), 
     col(c), 
     lit(if(t=="StringType") "\""; else "") 
    ) 
    }):_*), 
    lit("}") 
) as "C" 
).collect() 
+0

sieht ein bisschen hacky aus, aber es funktioniert :-) – navige

+1

Yup und yup. JSON ist im Allgemeinen aber hacky, wenn Sie mich fragen. –