2016-02-24 8 views
7

Ich habe eine RDD 'inRDD' der Form RDD[(Vector[(Int, Byte)], Vector[(Int, Byte)])] die ein PairRDD(key,value) ist, wo Schlüssel Vector[(Int, Byte)] ist und Wert ist Vector[(Int, Byte)].zu viele Kartenschlüssel aus dem Speicher Ausnahme in Funken verursacht

Für jedes Element in dem Vektor (Int, Byte) des Schlüsselfeldes, und jedes Element in dem Vektor (Int, Byte) Wertfeld Ich mag würde als (Int, Int), (Byte, Byte) einen neuen (Schlüssel, Wert) -Paar in dem Ausgang RDD erhalten.

Das sollte mir eine RDD der Form geben RDD[((Int, Int), (Byte, Byte))].

Zum Beispiel könnte inRDD Inhalt sein wie,

(Vector((3,2)),Vector((4,2))), (Vector((2,3), (3,3)),Vector((3,1))), (Vector((1,3)),Vector((2,1))), (Vector((1,2)),Vector((2,2), (1,2))) 

die

((3,4),(2,2)), ((2,3),(3,1)), ((3,3),(3,1)), ((1,2),(3,1)), ((1,2),(2,2)), ((1,1),(2,2)) 

Ich habe den folgenden Code für das werden würde.

val outRDD = inRDD.flatMap {           
    case (left, right) => 
    for ((ll, li) <- left; (rl, ri) <- right) yield { 
     (ll,rl) -> (li,ri) 
    } 
} 

Es funktioniert, wenn die Vektoren sind klein im inRDD. Aber wenn viele Elemente in den Vektoren sind, bekomme ich out of memory exception. Eine Erhöhung des verfügbaren Speichers auf Funke konnte nur bei kleineren Eingängen auflösen und der Fehler tritt bei noch größeren Eingängen wieder auf. Sieht so aus, als ob ich versuche, eine riesige Struktur im Gedächtnis zu sammeln. Ich kann diesen Code auf keine andere Weise umschreiben.

Ich habe eine ähnliche Logik mit java in hadoop wie folgt implementiert.

Aber wenn ich etwas ähnliches in Funken versuche, bekomme ich verschachtelte RDD-Ausnahmen.

Wie mache ich das effizient mit spark using scala?

+0

Haben Sie bei der Spark-Ebene behandeln versuchen die Anzahl der Partitionen erhöhen? – BlackBear

+0

@BlackBear Ja. Aber das hat nicht geholfen. – CRM

Antwort

2

Nun, wenn cartesianischen Produkt ist die einzige Option, die Sie zumindest ist es ein bisschen mehr faul machen kann:

inRDD.flatMap { case (xs, ys) => 
    xs.toIterator.flatMap(x => ys.toIterator.map(y => (x, y))) 
} 

Sie können auch diese

import org.apache.spark.RangePartitioner 

val indexed = inRDD.zipWithUniqueId.map(_.swap) 
val partitioner = new RangePartitioner(indexed.partitions.size, indexed) 
val partitioned = indexed.partitionBy(partitioner) 

val lefts = partitioned.flatMapValues(_._1) 
val rights = partitioned.flatMapValues(_._2) 

lefts.join(rights).values 
Verwandte Themen