2017-03-09 4 views
0

Ich habe diese RDD, die ich ausgestrahlt habe.Update Broadcast-Variable in einem RDD

test = sc.parallelize([(1),(2),(3),(4)]).zipWithIndex().map(lambda x: (x[1],x[0])) 
return : 
{0: 1, 1: 2, 2: 3, 3: 4} 

Ich habe eine andere RDD, die eine Liste von Tupel ist:

tuples=sc.parallelize([(0,1),(1,2),(3,2)]) 

Mein Ziel ist es, das Tupel als Schlüssel für meine Sendung Variable zu verwenden und ihre Werte aktualisieren, indem Sie eine

Also für das Tupel (0,1) wird meine neue Broadcast-Variable sein.

{0: 2, 1: 3, 2: 3, 3: 4} 

für das Tupel (1,2)

{0: 2, 1: 4, 2: 4, 3: 4} 

für das Tupel (3,2)

{0: 2, 1: 4, 2: 5, 3: 5} 

und senden Sie das letzte Update Variable übertragen {0: 2, 1: 4, 2: 5, 3: 5}

I Ich habe versucht, es zu codieren, aber meine Ergebnisse sind nicht gut, für jedes Tupel ist es um eins erhöht, aber nicht berücksichtigt die letzten Ergebnisse.

def modify_broadcast(j,test): 
    main=j[0] 
    context=j[1] 
    test.value[main]=test.value[main]+1 
    test.value[context]=test.value[context]+1 
    return test.value 

test = sc.parallelize([(1),(2),(3),(4)]).zipWithIndex().map(lambda x: (x[1],x[0])) 
test = sc.broadcast(test.collectAsMap()) 


print(test.value[0]) 
coocurence = sc.parallelize([(0,1),(1,2),(3,2)]).map(lambda x: modify_broadcast(x,test)) 

Antwort

0

Bei der Übertragung handelt es sich um eine gemeinsam genutzte Variable. Sie können es wie einen Nachschlagewert verwenden und als schreibgeschützt behandeln. Von meinem Lernen wird jeder Arbeiter Knoten die lokale Kopie dieser Variablen und wird seine eigene copy.That aktualisiert nicht auf andere Arbeiter Knoten reflektiert werden, da sie nur einmal an jeden Knoten weitergeleitet werden.

Vom Lernen Spark-Buch:

Ein Broadcast-Variable ist einfach ein Objekt vom Typ spark.broadcast.Broadcast [T], die einen Wert vom Typ T. Wraps Wir diesen Wert Wert durch den Aufruf zugreifen auf dem Broadcast-Objekt in unseren Aufgaben. Der Wert wird nur einmal an jeden Knoten gesendet, wobei ein effizienter BitTorrent-ähnlicher Kommunikationsmechanismus verwendet wird.

Die Verwendung von Broadcast-Variablen ist einfach: 1. Erstellen Sie einen Broadcast [T], indem Sie SparkContext.broadcast für ein Objekt vom Typ T aufrufen. Jeder Typ funktioniert, solange er auch serialisierbar ist. 2. Greifen Sie auf seinen Wert mit der value-Eigenschaft (oder value() -Methode in Java zu). 3. Die Variable wird nur einmal an jeden Knoten gesendet und sollte als schreibgeschützt behandelt werden (Aktualisierungen werden nicht an andere Knoten weitergegeben).

+0

Thx suresh, wissen Sie, eine andere Lösung, es zu tun? –

+0

Sie können Akkumulatoren (AccumulatorParam-Klasse) versuchen. Sie können ein bisschen zwicken, um es zu erledigen. Sie möchten etwas wie In-Place-Ergänzung. Überprüfen Sie dies, http://www.opensyssoft.com/2015/07/custom-accumulators-in-spark-using.html – Suresh

+0

Ja, es hat funktioniert –

Verwandte Themen