2017-01-25 12 views
7

ich versuche, einen Datenrahmen neu zu partitionieren nach einem columnm die der Datenrahmen hat N (lassen N=3 sagen) unterschiedliche Werte in der Partition-Spalte x, zB:Dropping leere Datenrahmen Partitionen in Apache Spark

val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data 

Was ich mag es zu erreichen ist, um myDF von x zu repartiton, ohne leere Partitionen zu produzieren. Gibt es einen besseren Weg als das?

val numParts = myDF.select($"x").distinct().count.toInt 
myDF.repartition(numParts,$"x") 

(Wenn ich numParts nicht angeben, in repartiton, sind die meisten meiner Partitionen leer (als repartition 200 Partitionen) schafft ...)

+1

Laut http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options werden die 200 Partitionen aufgrund des Standardwerts für die Konfigurationsoption 'spark 'erstellt .sql.shuffle.partitions' – AKSW

+1

Antwort könnte gefunden werden http://stackoverflow.com/questions/41854818/spark-dataframe-repartition-number-of-partition-not-préserved?noredirect=1#comment70893687_41854818 – FaigB

Antwort

2

I der Lösung über df mit Iterieren denken würde, Partition und holen Datensatzanzahl in, um nicht leere Partitionen zu finden.

val nonEmptyPart = sparkContext.longAccumulator("nonEmptyPart") 

df.foreachPartition(partition => 
    if (partition.length > 0) nonEmptyPart.add(1)) 

Da wir nicht leere Partitionen (nonEmptyPart) erhalten haben, können wir leere Partitionen reinigen, indem Sie coalesce() (check coalesce() vs reparation()).

val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type 

Es kann oder auch nicht die beste sein, aber diese Lösung wird schlurfenden vermeiden, da wir nicht reparation() verwenden


Beispiel Kommentar

val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x") 
val nonEmptyPart = sc.longAccumulator("nonEmptyPart") 

df1.foreachPartition(partition => 
    if (partition.length > 0) nonEmptyPart.add(1)) 

val finalDf = df1.coalesce(nonEmptyPart.value.toInt) 

println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}") 
println(s"df.rdd.partitions.length => ${df1.rdd.partitions.length}") 
println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}") 

Output zu adressieren

nonEmptyPart => 3 
df.rdd.partitions.length => 200 
finalDf.rdd.partitions.length => 3 
+0

'val df = sc.parallelize (Seq (1,1,2,2,3,3)). toDF ("x") .partition (10, $ "x") .coalesce (3) '. Nun wird die Anzahl der Partitionen von 10 auf 3 reduziert. – mrsrinivas

+0

und jetzt 'finalDf.foreachPartition (p => println (p.size))'. Ich bekomme '0 0 6', d. H. 2 Partitionen sind leer, 1 enthält alle Zeilen. Das ist nicht das, was ich wollte (Ich bin Spark 1.6.3) –

+0

Es könnte wegen der Shuffle mit 'coalesce' deaktiviert sein. Versuchen Sie 'repartition' zu verwenden, es wird alle Daten nach dem' HashPartitioner' mischen. Es besteht also die Möglichkeit, dass jede Partition mit Daten gefüllt wird. Wenn Sie wirklich strikt über das Entfernen von leeren Partitionen sind, müssen Sie es möglicherweise iterativ ausführen (** nicht leere Partitionen finden und coalesce/repartition anwenden). – mrsrinivas

Verwandte Themen