2016-01-20 16 views
13

Ich versuche, meine Eingabedaten zu übernehmen:Funke: Spalte Datenrahmen hinzufügen bedingt

A B  C 
-------------- 
4 blah 2 
2   3 
56 foo  3 

Und eine Spalte am Ende hinzufügen, basierend darauf, ob B leer ist oder nicht:

A B  C  D 
-------------------- 
4 blah 2  1 
2   3  0 
56 foo  3  1 

I kann dies leicht tun, indem Sie den Eingabedatenrahmen als temporäre Tabelle registrieren und dann eine SQL-Abfrage eingeben.

Aber ich würde gerne wissen, wie man das nur mit Scala-Methoden macht und keine SQL-Abfrage in Scala eingeben muss.

Ich habe versucht .withColumn, aber ich kann das nicht tun, was ich will.

Antwort

47

withColumn Versuchen mit der Funktion when wie folgt:

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._ // for `toDF` and $"" 
import org.apache.spark.sql.functions._ // for `when` 

val df = sc.parallelize(Seq((4, "blah", 2), (2, "", 3), (56, "foo", 3), (100, null, 5))) 
    .toDF("A", "B", "C") 

val newDf = df.withColumn("D", when($"B".isNull or $"B" === "", 0).otherwise(1)) 

newDf.show() zeigt

+---+----+---+---+ 
| A| B| C| D| 
+---+----+---+---+ 
| 4|blah| 2| 1| 
| 2| | 3| 0| 
| 56| foo| 3| 1| 
|100|null| 5| 0| 
+---+----+---+---+ 

I die (100, null, 5) Reihe zum Testen der isNull Fall hinzugefügt.

habe ich versucht, diesen Code mit Spark 1.6.0 aber wie im Code von when kommentiert, es funktioniert auf die Versionen nach 1.4.0.

+0

Dies ist genau das, was ich gesucht habe. Ich habe ein paar verschiedene Dinge mit "wann" und "anders" ausprobiert, aber ich denke, ich habe das genaue Format falsch verstanden. Etwas abseits von Thema, aber wissen Sie, wie Spark mit Column behandelt? Wie wäre es, wenn ich ~ 20 Spalten hinzufüge, wäre es schneller, 20 .withColumn zu machen und einen Datenrahmen zu behalten oder ihn einer RDD zuzuordnen und sie einfach alle in die Map einzufügen, dann zurück in einen Datenrahmen zu konvertieren, um auf Parkett zu speichern ? – mcmcmc

+1

Nur gefunden [http://stackoverflow.com/questions/33826495/spark-scala-2-10-tuple-limit]. Ich denke, UDFs sind, was ich suche. – mcmcmc

+0

UDFs ist, worüber ich unten auch gesprochen habe ... –

0

Wie wäre es mit so etwas?

val newDF = df.filter($"B" === "").take(1) match { 
    case Array() => df 
    case _ => df.withColumn("D", $"B" === "") 
} 

take(1) mit einem minimalen Hit

2

Meine schlechten haben sollte, hatte ich einen Teil der Frage verpaßt.

Der beste, sauberste Weg ist die Verwendung eines . Erklärung innerhalb des Codes.

// create some example data...BY DataFrame 
// note, third record has an empty string 
case class Stuff(a:String,b:Int) 
val d= sc.parallelize(Seq(("a",1),("b",2), 
    ("",3) ,("d",4)).map { x => Stuff(x._1,x._2) }).toDF 

// now the good stuff. 
import org.apache.spark.sql.functions.udf 
// function that returns 0 is string empty 
val func = udf((s:String) => if(s.isEmpty) 0 else 1) 
// create new dataframe with added column named "notempty" 
val r = d.select($"a", $"b", func($"a").as("notempty")) 

    scala> r.show 
+---+---+--------+ 
| a| b|notempty| 
+---+---+--------+ 
| a| 1| 1111| 
| b| 2| 1111| 
| | 3|  0| 
| d| 4| 1111| 
+---+---+--------+ 
+0

Hier ist nur ein Datenfeld im Spiel. Vielleicht möchten Sie die Frage erneut lesen –

Verwandte Themen