2016-03-25 11 views
2

in einem funkenProgramm zuzuweisen I 2 RDDs zuerst initialisiert und verwenden dann die 2. RDD in einer while-Schleife wie folgt:Wie ein festen UUID Wert in RDD

var nodes = sc.parallelize(ArrayBuffer(1, 2, 3, 4, 5)) 
var node_GroupIDs = nodes.map(x=>(x, UUID.randomUUID())) 
var i = 0 

while (i < 10) { 
    node_GroupIDs.foreach(println) 
    i += 1 
} 

I in verschiedenen Iterationen, die für einen bestimmten Knoten gefunden ID Die entsprechende Gruppen-ID hat einen anderen Wert. Es scheint, dass in jeder Iteration die RDD erneut initialisiert worden war. Wie kann man in diesem Fall einem Knoten einen festen UUID-Wert zuweisen?

Antwort

2

Ich würde vorschlagen, dass Sie die RDD zwischenspeichern, um immer das gleiche Ergebnis zu haben. In der Tat wird die RDD jedesmal neu berechnet, wenn eine Aktion darauf steht: d.h. jedes Mal, wenn Sie das foreach-Verfahren aufrufen, wird jeder Schritt von Grund auf neu berechnet, sowohl der parallelize als auch der map.

Wenn Sie die RDD nach dem map statt cache, es berechnet wird, einmal, das erste Mal die foreach Aktion aufrufen. Dann wird das Ergebnis im Speicher gespeichert und die anderen foreach Aktionen werden das Ergebnis im Speicher zwischengespeichert und Sie erhalten immer die gleichen Ergebnisse.

Wie von @ David Griffin darauf hingewiesen, würde dies das Problem nicht lösen, wenn ein Executor abstürzt. Um diesen Fall zu decken, ist es notwendig, checkpoint es, nachdem Sie es zwischengespeichert haben (siehe https://issues.apache.org/jira/browse/SPARK-8582).

Somit ist der richtige Weg, um das Problem zu lösen, sollte sein:

node_GroupIDs.cache() 
node_GroupIDs.checkpoint() 
+2

Wenn Sie es zwischenspeichern, ist es nicht immer noch möglich, es neu berechnet wird, sagen, wenn Sie einen Testamentsvollstrecker zu verlieren? –

+0

Caching der RDD scheint nicht die beste Option. @DavidGriffin hat Recht. Wenn Sie einen Executor verlieren, wird die Map erneut ausgeführt und eine neue UUID generiert. Mir ist immer noch nicht klar, warum 10 RDDs mit derselben UUID gedruckt werden sollen. Ich nehme an, das ist Teil eines größeren Codes. Kannst du erklären, was deine Absicht ist? – PinoSan

+0

Was ist in Kombination mit 'RDD.checkpoint()'? –