2016-10-25 20 views
1

Ich bin Verarbeitung einen Funken DataFrame (DF) und muß Spark eine Spalte, um es auf dem Fluss an, von innen ein Aufruf mapPartitions:Bedingtes Spalte und ein Mehrwert Zeilen

// Don't worry about what 'widget' is or represents 
val rdd = df.mapPartitions { rows => addColIfNecessary(rows, widget) } 

Dann:

Dies ist offensichtlich nur Pseudo-Code, sondern vermittelt, was ich versuche zu tun. Irgendwelche Ideen, wie ich das umsetzen kann?

+1

möglich Duplikat http://stackoverflow.com/questions/33876155/how-to-add-columns-into-org- apache-spark-sql-row-inside-of-mappartitions – Shankar

+0

Dank @Shankar aber in dieser Frage wird die Spalte zu einem neu erstellten DF hinzugefügt, nicht ein vorhandenes aus der Kartenpartitionen Funktion. Also würde ich argumentieren, dass dies entweder eine andere (eigenständige) Frage ist, oder dass ich zumindest verstehen muss, was ich bei meinem Ansatz falsch mache, dass dann diese Frage auch die Lösung für mein Problem sein könnte. Danke noch einmal! – smeeb

+1

Angesichts der Tatsache, dass DF ein Spaltenformat ist, wäre es ratsamer, einen Wert zu einer nillierbaren Spalte bedingt hinzuzufügen, als eine Spalte zu einigen Zeilen hinzuzufügen. Gibt es auch eine spezielle Notwendigkeit, dies innerhalb von 'mapPartitions' zu tun? – maasg

Antwort

1

DataFrames sind spaltenorientierte Strukturen, was bedeutet, dass das Hinzufügen einer Spalte zu einigen Zeilen keine gute Idee ist. Stattdessen können Sie die Unterstützung für Nullable-Werte in DataFrames nutzen und statt einer zusätzlichen Spalte einen optionalen Wert für eine Zeile basierend auf einigen Kriterien hinzufügen.

Ein Beispiel: Lasst uns einen DF von Benutzern und Seiten übernehmen:

val users = Seq("Alice" , "Bob", "Charly", "Dean", "Eve", "Flor", "Greta") 
val pages = (1 to 9).map(i => s"page_$i") 
val userPages = for {u <- users 
        p <- pages} yield (u,p) 

val userPagesDF = sparkContext.parallelize(userPages).toDF("user","page") 

// a user defined function that takes the last digit from the page and uses it to calculate a "rank". It only ranks pages with a number higher than 7 

val rankUDF = udf((p:String) => if (p.takeRight(1).toInt>7) "top" else null) 

// New DF with the extra column "rank", which contains values for only some rows 
val ranked = userPagesDF.withColumn("rank", topPage($"page")) 

ranked.show 

+-----+-------+----+ 
| user| page|rank| 
+-----+-------+----+ 
|Alice| page_1|null| 
|Alice| page_2|null| 
|Alice| page_3|null| 
|Alice| page_4|null| 
|Alice| page_5|null| 
|Alice| page_6|null| 
|Alice| page_7|null| 
|Alice| page_8| top| 
|Alice| page_9| top| 
| Bob| page_1|null| 
| Bob| page_2|null| 
| Bob| page_3|null| 
| Bob| page_4|null| 
| Bob| page_5|null| 
| Bob| page_6|null| 
| Bob| page_7|null| 
| Bob| page_8| top| 
| Bob| page_9| top| 
+-----+-------+----+ 

ranked.printSchema 

root 
|-- user: string (nullable = true) 
|-- page: string (nullable = true) 
|-- rank: string (nullable = true)