2017-05-15 3 views
0

ich eine Lookup rdd von Größe haben 6000, lookup_rdd: RDD [Zeichenfolge]Funke: access innerhalb eines anderen rdd RDD

a1 a2 a3 a4 a5 .....

und andere rdd, data_rdd: RDD [(String, Iterable [(String, Int)])]: (id, (Artikel, count)), die eindeutigen IDs hat,

(id1,List((a1,2), (a3,4))) (id2,List((a2,1), (a4,2), (a1,1))) (id3,List((a5,1)))

FOREACH Element in lookup_rdd möchte ich überprüfen, ob jedes id dieses Element hat oder nicht, wenn es da ist Ich zähle die Zählung und wenn nicht, gebe ich 0 und in einer Datei speichern.

Was ist der effiziente Weg, dies zu erreichen. Ist Hashing möglich? z.B. Ausgabe ich will, ist:

id1,2,0,4,0,0 id2,1,1,0,2,0 id3,0,0,0,0,1

ich das versucht:

val headers = lookup_rdd.zipWithIndex().persist() 
val indexing = data_rdd.map{line => 
    val id = line._1 
    val item_cnt_list = line._2 
    val arr = Array.fill[Byte](6000)(0) 
    item_cnt_list.map(c=>(headers.lookup(c._1),c._2)) 
    } 
indexing.collect().foreach(println) 

Ich bekomme die Ausnahme:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations

+0

6000 Gesamt ist ein sehr kleiner Datensatz. Erwägen Sie, auf dem Treiber zu sammeln und senden Sie dann –

Antwort

1

Die schlechte Nachricht ist, dass Sie nicht eine RDD verwenden können in einem anderen.

Die gute Nachricht ist, dass für Ihren Anwendungsfall, vorausgesetzt die 6000 Einträge sind ziemlich klein, gibt es eine ideale Lösung: Sammeln Sie die RDD auf dem Treiber, Broadcast zurück zu jedem Knoten des Clusters und verwenden Sie es innerhalb der andere RDD wie zuvor.

val sc: SparkContext = ??? 
val headers = sc.broadcast(lookup_rdd.zipWithIndex.collect().toMap) 
val indexing = data_rdd.map { case (_, item_cnt_list) => 
    item_cnt_list.map { case (k, v) => (headers.value(k), v) } 
} 
indexing.collect().foreach(println) 
+0

Dank für die Antwort. Haben Sie eine ähnliche Art Situation, aber zusätzlich .. die Nachschlagetabelle innerhalb der Kartenfunktion zu aktualisieren. Und für das nächste Element muss ich nach der aktualisierten Nachschlagetabelle suchen. Ich verstehe, dass wir dies nicht mit broadcast.can machen können. Bitte schlagen Sie vor, wie Sie das angehen können. Selbst ein Link zu der Ressource würde helfen. Danke im Voraus. – Phoenix

+0

Ich glaube, Sie haben eine bessere Änderung beim Erstellen einer Frage für Ihren speziellen Fall, teilen Sie den entsprechenden Code. Schwer zu sagen ohne es. – stefanobaghino

+0

haben eine separate Frage hinzugefügt: können Sie bitte einen Blick darauf werfen. : https: //stackoverflow.com/questions/49125735/loop-through-dataframe-and-update-the-lookup-table-sim- simultan-spark-scala – Phoenix

Verwandte Themen