Ich habe die folgende Ausnahme in der letzten Zeile den Code unten mit Funken läuftFunken verweigern RDD zip
org.apache.spark.SparkException: nur mit der gleichen Anzahl von Elementen in jeder Partition RDDs zip Kann
val rdd1 = anRDD
val rdd2 = AnotherRDD
println(rdd1.count() == rdd2.count()) // Write true
val nparts = rdd1.getNumPartitions + rdd2.getNumPartitions
val rdd1Bis = rdd1.repartition(nparts) // Try to repartition (useless)
val rdd2Bis = rdd2.repartition(nparts)
val zipped = rdd1Bis.zip(rdd2Bis)
println(zipped.count())
Was ist los?
PS: es funktioniert, wenn ich RDD1 und RDD2 sammeln, bevor zippen, aber ich brauche sie als RDD zu halten
Wenn Sie die Neuaufteilung überspringen, macht die Zip-Arbeit? –
Nein. Es tut es nicht. – Benjamin
Ich denke, es gibt keine Garantie, dass eine Repartition mit der gleichen Anzahl von Elementen in jeder Partition enden wird, nur die gleiche Anzahl von ähnlich großen Partitionen. Kannst du 'zipPartitions' verwenden? "Zip diese RDD-Partitionen mit einem (oder mehreren) RDD (s) und geben Sie eine neue RDD durch Anwenden einer Funktion auf die gezippten Partitionen. Angenommen, dass alle RDDs * die gleiche Anzahl von Partitionen * haben, aber * nicht * erfordern um die gleiche Anzahl von Elementen in jeder Partition zu haben " –