2016-05-05 11 views
1

I 2 gepaart RDDs haben wie unterRDD-Suche innerhalb einer Transformation

RDD1 Namen als Schlüssel und zipcode als Wert enthält:

RDD1 -> RDD ((ashley, 20171), (yash, 33613), (evan, 40217))

RDD2 enthält Postleitzahl als Schlüssel und einige Zufallszahl als Wert:

RDD2 -> RDD ((20171 , 235523), (33613, 345.345.345), (40189, 44.355.217), (40122, 2.345.235), (40127, 232323424))

Ich brauche die Zipcodes in RDD1 mit den entsprechenden Werten von RDD2 zu ersetzen. So würde der Ausgang sein

RDD3 -> RDD ((ashley, 235523), (yash, 345.345.345), (evan, 232323424))

ich es versuchte dabei die RDD-Lookup-Methode wie unten, aber ich habe Ausnahme sagen, dass RDD Transformationen können nicht innerhalb eines anderen RDD Transformation

val rdd3 = rdd1.map(x => (x._1, rdd2.lookup(x._2)(0))) 
+2

Sie nicht das tun kann. Wenn 'rdd2' klein ist, könnten Sie es auf dem Treiber sammeln und ausstrahlen, dann wäre das, was Sie versuchen, möglich. Sonst müssen Sie wahrscheinlich mit Joins spielen, um zu erreichen, was Sie wollen. – vanza

+0

Meinst du sowas? val zipmap = Karte ("40217" -> "Alabama", "40222" -> "Alaska", "20127" -> "miami", "33613" -> "herndon", "40111" -> "tampa") val broadcastVar = sc.broadcast (zipmap) val benutzerzip_lookup = user_zip_pair.map (x => (x._1, broadcastVar.value (x._2))) – yAsH

+0

Ja, das ist der Kern davon, wenn Sie die Daten senden können . – vanza

Antwort

6
perfomed werden beitreten

Yon einfach 2 RDDs nach Postleitzahl:

rdd1.map({case (name, zipcode) => (zipcode, name)}) 
    .join(rdd2) 
    .map({case (zipcode, (name, number)) => (name, number)}) 
    .collect() 

Beachten Sie, dass nur Datensätze zurückgegeben werden, die übereinstimmende Postleitzahlen in rdd1 und rdd2 enthalten. Wenn Sie einige Standardanzahl, um Datensätze in RDD1 festlegen möchten, die nicht entsprechende zipcode in RDD2 hat, verwenden leftOuterJoin von insted beitreten:

rdd1.map({case (name, zipcode) => (zipcode, name)}) 
    .leftOuterJoin(rdd2) 
    .map({case (zipcode, (name, number)) => (name, number.getOrElse(0))}) 
    .collect() 
+0

@Vvitaliy Danke für die Antwort. Lassen Sie sagen, dass es in rdd2 einen anderen Schlüssel gibt. Wie setzen wir den Wert von other auf die Datensätze in rdd1, die keine Übereinstimmung haben? – yAsH

+0

Hier nehmen Sie nur an, dass der Wert von anderen 0 ist. Was ist, wenn wir den Wert von anderen nicht kennen und es von der RDD2 auswählen? – yAsH

+0

mit 'leftOuterJoin' wählt es den Wert' number' aus rdd2 aus, wenn es in record2 existiert, was die Bedingung 'rdd1.zipcode== rdd2.zipcode' erfüllt, wenn nicht, setzt es den Wert auf 0. wenn es auf irgendeinen Wert gesetzt werden kann Sie wollen, aber wenn Sie es durch eine Logik aus rdd2 auswählen möchten, müssen Sie anspruchsvollere Join durchführen, abhängig von dieser Logik. –