Ich habe eine RDD 'inRDD'
der Form RDD[(Vector[(Int, Byte)], Vector[(Int, Byte)])]
die ein PairRDD(key,value)
ist, wo Schlüssel Vector[(Int, Byte)]
ist und Wert ist Vector[(Int, Byte)]
.zu viele Kartenschlüssel aus dem Speicher Ausnahme in Funken verursacht
Für jedes Element in dem Vektor (Int, Byte)
des Schlüsselfeldes, und jedes Element in dem Vektor (Int, Byte)
Wertfeld Ich mag würde als (Int, Int), (Byte, Byte)
einen neuen (Schlüssel, Wert) -Paar in dem Ausgang RDD erhalten.
Das sollte mir eine RDD der Form geben RDD[((Int, Int), (Byte, Byte))]
.
Zum Beispiel könnte inRDD
Inhalt sein wie,
(Vector((3,2)),Vector((4,2))), (Vector((2,3), (3,3)),Vector((3,1))), (Vector((1,3)),Vector((2,1))), (Vector((1,2)),Vector((2,2), (1,2)))
die
((3,4),(2,2)), ((2,3),(3,1)), ((3,3),(3,1)), ((1,2),(3,1)), ((1,2),(2,2)), ((1,1),(2,2))
Ich habe den folgenden Code für das werden würde.
val outRDD = inRDD.flatMap {
case (left, right) =>
for ((ll, li) <- left; (rl, ri) <- right) yield {
(ll,rl) -> (li,ri)
}
}
Es funktioniert, wenn die Vektoren sind klein im inRDD
. Aber wenn viele Elemente in den Vektoren sind, bekomme ich out of memory exception
. Eine Erhöhung des verfügbaren Speichers auf Funke konnte nur bei kleineren Eingängen auflösen und der Fehler tritt bei noch größeren Eingängen wieder auf. Sieht so aus, als ob ich versuche, eine riesige Struktur im Gedächtnis zu sammeln. Ich kann diesen Code auf keine andere Weise umschreiben.
Ich habe eine ähnliche Logik mit java in hadoop
wie folgt implementiert.
Aber wenn ich etwas ähnliches in Funken versuche, bekomme ich verschachtelte RDD-Ausnahmen.
Wie mache ich das effizient mit spark using scala
?
Haben Sie bei der Spark-Ebene behandeln versuchen die Anzahl der Partitionen erhöhen? – BlackBear
@BlackBear Ja. Aber das hat nicht geholfen. – CRM