2017-07-27 2 views
0

Spark 2.1.x hier. Ich habe eine Reihe von JSON-Dateien (mit identischem Schema), die ich Dataset wie so in einen einzigen Funken lesen:Hinzufügen mehrerer Spalten zu Spark Dataset beim Iterieren seiner Datensätze

val ds = spark.read.json("some/path/to/lots/of/json/*.json") 

ich dann das ds Schema drucken kann, und sehe, dass alles korrekt gelesen wurde:

ds.printSchema() 

// Outputs: 
root 
|-- fizz: boolean (nullable = true) 
|-- moniker: string (nullable = true) 
|-- buzz: string (nullable = true) 
|-- foo: string (nullable = true) 
|-- bar: string (nullable = true) 

Bitte beachten Sie die moniker String-Spalte. Ich möchte jetzt:

  1. Fügen Sie diesem Dataset und/oder seinem Schema drei neue Spalten hinzu; (a) eine Datums-/Zeitspalte mit der Bezeichnung special_date, (b) eine UUID-Spalte mit der Bezeichnung special_uuid und (c) eine Zeichenfolgenspalte mit der Bezeichnung special_phrase; dann muß
  2. I über alle Datensätze in ds, und für jeden Datensatz iterieren, dessen moniker Wert in drei Folgefunktionen übergeben: (a) deriveSpecialDate(val moniker : String) : Date, (b) deriveSpecialUuid(val moniker : String) : UUID und (c) deriveSpecialPhrase(val moniker : String) : String. Die Ausgabe jeder dieser Funktionen muss dann der Wert dieses Datensatzes für die jeweilige Spalte werden.

Mein bester Versuch:

val ds = spark.read.json("some/path/to/lots/of/json/*.json") 

ds.foreach(record => { 
    val moniker : String = record.select("moniker") 
    val specialDate : Date = deriveSpecialDate(moniker) 
    val specialUuid : UUID = deriveSpecialUuid(moniker) 
    val specialPhrase : String = deriveSpecialPhrase(moniker) 

    // This doesn't work because special_* fields don't exist in the original 
    // schema dervied from the JSON files. We're ADDING these columns after the 
    // JSON read and then populating their values dynamically. 
    record.special_date = specialDate 
    record.special_uuid = specialUuid 
    record.special_phrase = specialPhrase 
}) 

Jede Idee, wie dies erreicht werden kann?

+1

Also im Grunde möchten Sie drei Spalte hinzufügen, indem Sie jede Funktion aufrufen? \ –

+0

Hallo @ShankarKoirala (+1) - ja genau! – smeeb

Antwort

1

würde ich die ursprünglichen Daten-Set mit 3 Spalten mit UDF (User Defined Functions) von Funken

val deriveSpecialDate = udf((moniker: String) => // implement here) 
val deriveSpecialUuid= udf((moniker: String) => // implement here) 
val deriveSpecialPhrase = udf((moniker: String) => // implement here) 

Danach erhöhen Sie so etwas tun kann:

ds.withColumn("special_date", deriveSpecialDate(col("moniker))) 
.withColumn("special_uuid", deriveSpecialUuid(col("moniker))) 
.withColumn("special_phrase", deriveSpecialPhrase (col("moniker))) 

Es wird Sie bringen ein neuer Datenrahmen mit den drei Spalten. Wenn Sie möchten, können Sie auch mithilfe der Kartenfunktion

+0

Dank @dumitru (+1) - behält diese Lösung ('ds.withColumn ...') auch die ursprünglichen Spalten des Datasets? Es gibt 5 Spalten im ursprünglichen Datensatz, und ich möchte 3 weitere hinzufügen (für insgesamt 8). Oder ändern Sie * das Schema, um nur drei Spalten zu haben (die ursprünglichen 5 fallen lassen)? – smeeb

+0

Es behält auch die Spalten, Sie müssen explizit Drop-to-Drop spezifische Spalten aufrufen – dumitru

0

in ein Dataset konvertieren. Um eine neue Spalte zu erstellen, können Sie mitColumn verwenden. Und wenn Sie bereits eine Funktion haben, müssen Sie diese Funktion als UDF (User Defined Function) registrieren

val sd = sqlContext.udf.register("deriveSpecialDate",deriveSpecialDate _) 
val su = sqlContext.udf.register("deriveSpecialUuid",deriveSpecialUuid _) 
val sp = sqlContext.udf.register("deriveSpecialPhrase", deriveSpecialPhrase _) 

dieses UDF Sie withcolumn denen verwenden müssen erstellt eine neue Spalte als

ds.withColumn("special_date", sd($"moniker)) 
.withColumn("special_uuid", su($"moniker)) 
.withColumn("special_phrase", sp($"moniker)) 

Mit Dadurch erhalten Sie Ihren ursprünglichen Datensatz mit drei neuen Spalten.

Verwandte Themen