2016-06-28 11 views
5

Ich versuche den folgenden Code, der eine Nummer zu jeder Zeile in einer RDD hinzufügt und eine Liste von RDDs mit PySpark zurückgibt.PySpark Auswertung

from pyspark.context import SparkContext 
file = "file:///home/sree/code/scrap/sample.txt" 
sc = SparkContext('local', 'TestApp') 
data = sc.textFile(file) 
splits = [data.map(lambda p : int(p) + i) for i in range(4)] 
print splits[0].collect() 
print splits[1].collect() 
print splits[2].collect() 

Der Inhalt der Eingabedatei (sample.txt) ist:

1 
2 
3 

I eine Ausgabe wie dies erwartet wurde (die Zahlen in der RDD mit 0 Hinzufügen, 1, 2 bzw.):

[1,2,3] 
[2,3,4] 
[3,4,5] 

während die tatsächliche Ausgabe war:

[4, 5, 6] 
[4, 5, 6] 
[4, 5, 6] 

bedeutet, dass das Verständnis nur den Wert 3 für die Variable i verwendet hat, unabhängig von (4).

Warum passiert dieses Verhalten?

Antwort

3

Es passiert wegen Python späte Bindung und ist nicht (Py) Spark spezifisch. i wird nachgeschlagen, wenn lambda p : int(p) + i verwendet wird, nicht wenn es definiert ist. Normalerweise bedeutet es, wenn es aufgerufen wird, aber in diesem speziellen Kontext ist es, wenn es serialisiert wird, um es an die Arbeiter zu senden.

Sie können wie folgt zum Beispiel etwas tun:

def f(i): 
    def _f(x): 
     try: 
      return int(x) + i 
     except: 
      pass 
    return _f 

data = sc.parallelize(["1", "2", "3"]) 
splits = [data.map(f(i)) for i in range(4)] 
[rdd.collect() for rdd in splits] 
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]] 
+0

ich versucht hatte, vorbei ‚p‘ auf eine einfache externe Funktion und zu einer inneren Funktion (wie die in der Antwort) genannt durch ein Lambda, für Versuch und Irrtum Zwecke. bemerkte das richtige Verhalten, wenn ich das tat: http://pastebin.com/z7E7wGKx Vielen Dank für die Antwort mit dem Grund, warum dies passiert. – srjit

+0

erwähnenswert, dass dies in fast jeder Sprache mit closures/lambdas passiert, auch C# –

2

Dies ist aufgrund der Tatsache, dass Lambda-Ausdrücke auf den i über Referenz beziehen! Es hat nichts mit Funken zu tun. See this

Sie können dies versuchen:

a =[(lambda y: (lambda x: y + int(x)))(i) for i in range(4)] 
splits = [data.map(a[x]) for x in range(4)] 

oder in einer Zeile

splits = [ 
    data.map([(lambda y: (lambda x: y + int(x)))(i) for i in range(4)][x]) 
    for x in range(4) 
] 
+1

Wenn Sie 'lambdas' verwenden wollen, gibt es einen einfachen Trick, der Verschachtelung zu vermeiden:' [Lambda x, i = i: i + int (x) für i in Bereich (4)] '. – zero323