2016-11-29 2 views
2

Warum liefert der Zähler, den ich mit pyspark unten geschrieben habe, nicht immer das richtige Ergebnis, hängt es mit dem globalen Zähler zusammen?Globaler Zähler in pyspark

def increment_counter(): 
    global counter 
    counter += 1 

def get_number_of_element(rdd): 
    global counter 
    counter = 0 
    rdd.foreach(lambda x:increment_counter()) 
    return counter 

Antwort

4

Ihre globale Variable ist nur auf dem Treiberknoten definiert, was bedeutet, dass sie funktioniert, bis Sie auf localhost ausgeführt werden. Sobald Sie Ihren Job an mehrere Prozesse verteilen, haben sie keinen Zugriff auf die Variable counter und erstellen nur eine neue in ihrem eigenen Prozess. Das Endergebnis enthält also nur die im Treiberprozess vorgenommenen Schritte.

Was Sie suchen, ist eine ziemlich häufige Verwendung obwohl, und wird von der Akku-Funktion von Spark abgedeckt. Akkumulatoren werden am Ende des Prozesses verteilt und gesammelt, so dass die Summen die Inkremente aller Knoten statt nur des Treiberknotens enthalten.

Accumulators - Spark Programming Guide

+0

Großartig! Ich danke dir sehr! – xxx222