Ich habe diese RDD, die ich ausgestrahlt habe.Update Broadcast-Variable in einem RDD
test = sc.parallelize([(1),(2),(3),(4)]).zipWithIndex().map(lambda x: (x[1],x[0]))
return :
{0: 1, 1: 2, 2: 3, 3: 4}
Ich habe eine andere RDD, die eine Liste von Tupel ist:
tuples=sc.parallelize([(0,1),(1,2),(3,2)])
Mein Ziel ist es, das Tupel als Schlüssel für meine Sendung Variable zu verwenden und ihre Werte aktualisieren, indem Sie eine
Also für das Tupel (0,1) wird meine neue Broadcast-Variable sein.
{0: 2, 1: 3, 2: 3, 3: 4}
für das Tupel (1,2)
{0: 2, 1: 4, 2: 4, 3: 4}
für das Tupel (3,2)
{0: 2, 1: 4, 2: 5, 3: 5}
und senden Sie das letzte Update Variable übertragen {0: 2, 1: 4, 2: 5, 3: 5}
I Ich habe versucht, es zu codieren, aber meine Ergebnisse sind nicht gut, für jedes Tupel ist es um eins erhöht, aber nicht berücksichtigt die letzten Ergebnisse.
def modify_broadcast(j,test):
main=j[0]
context=j[1]
test.value[main]=test.value[main]+1
test.value[context]=test.value[context]+1
return test.value
test = sc.parallelize([(1),(2),(3),(4)]).zipWithIndex().map(lambda x: (x[1],x[0]))
test = sc.broadcast(test.collectAsMap())
print(test.value[0])
coocurence = sc.parallelize([(0,1),(1,2),(3,2)]).map(lambda x: modify_broadcast(x,test))
Thx suresh, wissen Sie, eine andere Lösung, es zu tun? –
Sie können Akkumulatoren (AccumulatorParam-Klasse) versuchen. Sie können ein bisschen zwicken, um es zu erledigen. Sie möchten etwas wie In-Place-Ergänzung. Überprüfen Sie dies, http://www.opensyssoft.com/2015/07/custom-accumulators-in-spark-using.html – Suresh
Ja, es hat funktioniert –