2016-03-21 17 views
1

Ich habe die folgende Ausnahme in der letzten Zeile den Code unten mit Funken läuftFunken verweigern RDD zip

org.apache.spark.SparkException: nur mit der gleichen Anzahl von Elementen in jeder Partition RDDs zip Kann

val rdd1 = anRDD 
val rdd2 = AnotherRDD 

println(rdd1.count() == rdd2.count()) // Write true 
val nparts = rdd1.getNumPartitions + rdd2.getNumPartitions 

val rdd1Bis = rdd1.repartition(nparts) // Try to repartition (useless) 
val rdd2Bis = rdd2.repartition(nparts) 

val zipped = rdd1Bis.zip(rdd2Bis) 
println(zipped.count()) 

Was ist los?

PS: es funktioniert, wenn ich RDD1 und RDD2 sammeln, bevor zippen, aber ich brauche sie als RDD zu halten

+1

Wenn Sie die Neuaufteilung überspringen, macht die Zip-Arbeit? –

+1

Nein. Es tut es nicht. – Benjamin

+1

Ich denke, es gibt keine Garantie, dass eine Repartition mit der gleichen Anzahl von Elementen in jeder Partition enden wird, nur die gleiche Anzahl von ähnlich großen Partitionen. Kannst du 'zipPartitions' verwenden? "Zip diese RDD-Partitionen mit einem (oder mehreren) RDD (s) und geben Sie eine neue RDD durch Anwenden einer Funktion auf die gezippten Partitionen. Angenommen, dass alle RDDs * die gleiche Anzahl von Partitionen * haben, aber * nicht * erfordern um die gleiche Anzahl von Elementen in jeder Partition zu haben " –

Antwort

0

es funktioniert dies überprüfen: Bitte antworten Sie mit dem, was Teil versagt es für Sie

val list1 = List("a","b","c","d") 
val list1 = List("a","b","c","d") 
val rdd1 = sc.parallelize(list1) 
val rdd1 = sc.parallelize(list2) 

Executing ur-Code:

val nparts = rdd1.getNumPartitions + rdd2.getNumPartitions 
val rdd1Bis = rdd1.repartition(nparts) // Try to repartition (useless) 
val rdd2Bis = rdd2.repartition(nparts) 
val zipped = rdd1Bis.zip(rdd2Bis) 

Ergebnis:

println(zipped.count()) 
4 
zipped.foreach(println) 
(a,a) 
(b,b) 
(c,c) 
(d,d) 
+0

Mit diesem kleinen Datensatz ich vermute, dass Probleme mit der Neupartitionierung nicht angezeigt werden. Kannst du ihm eine (viel) größere Liste geben? –

+0

Dies funktioniert, weil Ihre Listen identisch sind. In meinem Beispiel basieren rdd1 und rdd2 ursprünglich auf derselben RDD, aber eine von ihnen (sagen wir rdd1) wurde mit einer externen Bibliothek transformiert. Am Ende der Transformation hat es immer noch die gleiche Anzahl von Elementen. – Benjamin

+0

Ich würde Ihnen ein Beispiel geben, wo dies nicht funktioniert, wenn Sie wollen, versuchen Sie, die Nummern der Partitionen für rdd1 zu 3 und rdd2 zu 4. – eliasah

2

könnte eine Lösung zip sein mit einem Join:

val rdd1Bis = rdd1.zipWithIndex.map((x) =>(x._2, x._1)) 
val rdd2Bis = rdd2.zipWithIndex.map((x) =>(x._2, x._1)) 
val zipped = rdd1Bis.join(rdd2Bis).map(x => x._2) 
Verwandte Themen