2014-11-16 12 views
6

Ich versuche, Broadcast-Variablen innerhalb von Python-Methoden zu erstellen (versuchen, einige Hilfsprogramme zu abstrahieren, die ich verteilte Operationen erstellen). Ich kann jedoch nicht auf die Broadcast-Variablen innerhalb der Spark-Mitarbeiter zugreifen.PySpark Broadcast-Variablen von lokalen Funktionen

Lassen Sie uns sagen, ich habe dieses Setup:

def main(): 
    sc = SparkContext() 
    SomeMethod(sc) 

def SomeMethod(sc): 
    someValue = rand() 
    V = sc.broadcast(someValue) 
    A = sc.parallelize().map(worker) 

def worker(element): 
    element *= V.value ### NameError: global name 'V' is not defined ### 

Allerdings, wenn ich die SomeMethod() Mittelsmann statt beseitigen, es funktioniert gut.

def main(): 
    sc = SparkContext() 
    someValue = rand() 
    V = sc.broadcast(someValue) 
    A = sc.parallelize().map(worker) 

def worker(element): 
    element *= V.value # works just fine 

Ich würde lieber nicht alle meine Spark-Logik in die Hauptmethode setzen, wenn ich kann. Gibt es eine Möglichkeit, Variablen aus lokalen Funktionen zu senden und sie für die Mitarbeiter von Spark global sichtbar zu machen?

Alternativ, was wäre ein gutes Entwurfsmuster für diese Art von Situation - z. B. möchte ich eine Methode speziell für Spark schreiben, die in sich abgeschlossen ist und eine bestimmte Funktion ausführt, die ich gerne wiederverwenden möchte?

Antwort

7

Ich bin nicht sicher, ob ich völlig die Frage verstanden, aber wenn Sie das V Objekt innerhalb des Arbeiter-Funktion müssen Sie dann sollten Sie es auf jeden Fall als Parameter übergeben, sonst ist das Verfahren nicht wirklich in sich geschlossene:

def worker(V, element): 
    element *= V.value 

Jetzt, um sie in den Kartenfunktionen verwenden Sie einen Teil verwenden müssen, so dass nur die Karte eine Funktion 1 Parameter sieht:

from functools import partial 

def SomeMethod(sc): 
    someValue = rand() 
    V = sc.broadcast(someValue) 
    A = sc.parallelize().map(partial(worker, V=V)) 
+0

gibt es wie dies irgendwelche Auswirkungen auf der Leistung Broadcast Variablen um nebenbei? Nehmen wir zum Beispiel an, dass ich auf eine Broadcast-Variable in einer map() -Funktion über Zehntausende (oder mehr) von Zeilen angewiesen bin. so etwas wie 'def-Transformation (row): return broadcast_variable.value [row [0]]' , die dann in einer 'Karte()' Funktion wie 'rdd.map (Transformation) verwendet wird, ' – iralls

+0

Dank dieser Lösung hat mir geholfen, die Verwendung von Global für die Broadcast-Variable zu vermeiden. Bitte beachten Sie, dass Sie die Reihenfolge der Parameter der worker-Methode so ändern sollten, dass der Parameter 'element' (der vom Spark-Framework aufgefüllt wird) der erste ist. Sonst wird es nicht funktionieren. –

Verwandte Themen