2017-02-04 7 views
0

Was ist der beste Weg, um einem DataFrame eine neue Spalte und neue Zeilen hinzuzufügen?
Ist es möglich, dies gleichzeitig zu tun?Spark DataFrame Spalte mit Zeilen hinzufügen

Zum Beispiel, ich habe eine Tabelle AB wie:

+------+-------+ 
|  a|  b| 
+------+-------+ 
| true| true|  
| true| false| 
+---+---+------+ 

Nun würde Ich mag eine neue Spalte „c“ zu AB und neue Zeilen hinzufügen, aber nur dann, wenn eine Bedingung erfüllt ist. Diese Bedingung sollte für jede Zeile in AB gelten, einschließlich c = false und c = true.

Lassen foo(row): Boolean sein den Zustand und:

foo(Row(true, true, false)) = true 
foo(Row(true, true, true)) = true 
foo(Row(true, false, false)) = true 
foo(Row(true, false, false)) = false 

So ist die neue Tabelle ABC sollte wie folgt aussieht:

+------+-------+-------+ 
    |  a|  b|  c| 
    +------+-------+-------+ 
    | true| true| true|  
    | true| true| false|  
    | true| false| false| 
    +------+-------+-------+ 

Ich versuchte Crossjoin und Filter:

val rows = List(Row(true), Row(false)) 

val C = spark.createDataFrame(
    spark.sparkContext.parallelize(rows), 
    StructType(List(StructField("c", BooleanType))) 
) 

val ABC = AB.join(C).filter(r => foo(row)) 

Die Leistung ist sehr schlecht (können Sie mir sagen, warum?). Ich habe auch versucht mit flatMap:

 val encoder = RowEncoder(AB.schema.add(StructField("c", BooleanType))) 

     val ABC = AB.flatMap { row => 
     Seq(Row.fromSeq(row.toSeq :+ true), Row.fromSeq(row.toSeq :+ false)).filter(r => foo(r)) 
     }(encoder) 

Die Leistung ist auch schlecht. Das Gießen für große Tische dauert zu lange. Wie ich bemerkt habe, wird das Casting an der Masternode angewendet. Bei großen Tabellen (Millionen von Zeilen) wird schlecht ausgeführt.

Haben Sie andere und bessere Lösungen für dieses Problem?

Btw, ich benutze Apache Spark 2.0.1 mit Scala.

+0

Ich verstehe ehrlich nicht, wonach Sie fragen. Und für die Aufzeichnung haben Cross-Joins immer schlechte Performance, es sei denn, Sie verwenden Hashing-Techniken wie LSH. – eliasah

+0

Ich möchte eine boolesche Tabelle mit einer neuen Spalte und neuen Zeilen erweitern. Meine alte Tabelle kann 2^n Zeilen haben und die neue Tabelle 2^(n + 1) Zeilen (n = | Spalten |). Für große n gibt es zu viele Zeilen. Daher möchte ich einige Zeilen mit der Funktion "foo" herausfiltern. –

Antwort

1

Ich denke, die Sie gemacht haben es komplizierter, als es sein muss, von dem, was ich verstehe, ist folgendes das gewünschte Ergebnis nach

dann
val stuff = List[Row](Row(true, true),Row(true, false),Row(false, true), Row(false, false)) 
val rows = sc.parallelize(stuff) 
val schema = StructType(StructField("a", BooleanType, true) :: StructField("b", BooleanType, true) :: Nil) 
val frame = spark.createDataFrame(rows, schema).withColumn("c", col("a")&&(col("b"))) 

sind ergeben, wenn Sie einen frame.show tun sollte es zeigen

+-----+-----+-----+ 
| a| b| c| 
+-----+-----+-----+ 
| true| true| true| 
| true|false|false| 
|false| true|false| 
|false|false|false| 
+-----+-----+-----+ 
Verwandte Themen