2016-09-15 5 views
0

in funken Scala Rahmen habe ich eine RDD, rdd1, in dem jedes Element ein einzelnes Element einer Matrix darstellt A:Funken: Erhalten Elemente eines RDD auf der Basis der Elemente eines Arrays in einem anderen RDD

val rdd1 = dist.map{case (((x,y),z,v)) => ((x,y),v)}

x repräsentiert die Zeile, y repräsentiert die Säule und v stellt den Wert in der Matrix A.

I haben auch eine andere RDD, rdd2, in Form von RDD[index, Array[(x, y)]] wo das Array in jedem Element des Satzes von Elementen der Matrix A, die gespeichert sind in rdd1, erforderlich für die spezifische index repräsentiert in diesem Element darstellt.

Nun, was ich tun muss, ist, die Werte der Matrix A Elemente für jede index erhalten, die Erhaltung aller Daten einschließlich index, (x,y) und v. Was wäre ein guter Ansatz dabei?

Antwort

1

Wenn ich richtig verstehe, kocht Ihre Frage nach unten zu:

val valuesRdd = sc.parallelize(Seq(
//((x, y), v) 
    ((0, 0), 5.5),    
    ((1, 0), 7.7) 
)) 

val indicesRdd = sc.parallelize(Seq(
//(index, Array[(x, y)]) 
    (123, Array((0, 0), (1, 0))) 
)) 

Und Sie wollen diese RDDs verschmelzen alle Werte (index, (x, y), v), in diesem Fall (123, (0,0), 5.5) und (123, (1,0), 7.7) zu bekommen?

Sie können dies auf jeden Fall tun join verwenden, da beide RDDs haben eine gemeinsame Spalte (x, y), aber da einer von ihnen tatsächlich ein Array[(x, y)] hat würden Sie zuerst, dass in einer Reihe von Zeilen explodieren müssen:

val explodedIndices = indicesRdd.flatMap{case (index, coords: Array[(Int, Int)]) => coords.map{case (x, y) => (index, (x, y))}} 
// Each row exploded into multiple rows (index, (x, y)) 

val keyedIndices = explodedIndices.keyBy{case (index, (x, y)) => (x, y)} 
// Each row keyed by the coordinates (x, y) 

val keyedValues = valuesRdd.keyBy{case ((x, y), v) => (x, y)} 
// Each row keyed by the coordinates (x, y) 

// Because we have common keys, we can join! 
val joined = keyedIndices.join(keyedValues) 
+0

Vielen Dank. Es gibt einen Fehler bezüglich des in der flatMap-Anweisung verwendeten '_':' fehlender Parameter des Typs für erweiterte Funktion ... ' – EdgeRover

+0

Ok. Es funktionierte mit ein wenig Modifikation: 'val explodedIndices = qual.flatMap {case (index, coords: Array [(Lang, Lang)]) => coords.map {case (x, y) => (index, (x, y))}} '. Vielen Dank. – EdgeRover

+0

Großartig! Die Antwort wurde korrigiert, ich hatte es nicht versucht. – spiffman

Verwandte Themen