2016-10-03 3 views
1

Ich habe komplizierte Funktion, die ich über einen Datensatz in Spark mit der Kartenfunktion ausführen. Es ist in einem anderen Python-Modul. Wenn die Map aufgerufen wird, haben die Executor-Knoten diesen Code nicht und die Map-Funktion schlägt fehl.Wie kann ich Spark dazu bringen, Code in einem anderen Modul zu sehen?

s_cobDates = getCobDates() #returns a list of dates 
sb_dataset = sc.broadcast(dataset) #fyi - it is not trivial to slice this into chunks per date 

def sparkInnerLoop(n_cobDate): 
    n_dataset = sb_dataset.value 
    import someOtherModule 
    return someOtherModule.myComplicatedCalc(n_dataset) 

results = s_cobDates.map(sparkInnerLoop).collect() 

Spark schlägt fehl, da myOtherModule nicht importiert werden kann.

Bisher habe ich es geschafft, indem ich ein Python-Paket erstellt habe, das someOtherModule enthält und das im Vorfeld meiner Spark-Jobs an den Cluster weiterleitet, aber das macht kein Rapid-Prototyping.

Wie bekomme ich Funken, um den vollständigen Code an die Executor-Knoten zu senden, ohne den gesamten Code in "sparkInnerLoop" zu inlinern? Dieser Code wird an anderer Stelle in meiner Lösung verwendet, und ich möchte keine Code-Duplizierung durchführen.

Ich verwende einen Cluster mit acht Knoten im Standalone-Modus, v 1.6.2, und der Treiber läuft auf meiner Workstation in pycharm.

Antwort

0

Nun funktioniert die obige Antwort, es fällt, wenn Ihre Module Teil eines Pakets sind. Stattdessen ist es möglich, die Module zu komprimieren und dann die Zip-Datei zu Ihrem Spark-Kontext hinzuzufügen, und dann haben sie den richtigen Paketnamen.

def ziplib(): 
    libpath = os.path.dirname(__file__) # this should point to your packages directory 
    zippath = r'c:\Temp\mylib-' + randstr.randstr(6) + '.zip' 
    zippath = os.path.abspath(zippath) 
    zf = zipfile.PyZipFile(zippath, mode='w') 
    try: 
     zf.debug = 3 # making it verbose, good for debugging 
     zf.writepy(libpath) 
     return zippath # return path to generated zip archive 
    finally: 
     zf.close() 

sc = SparkContext(conf=conf) 

zip_path = ziplib() # generate zip archive containing your lib 
zip_path = pathlib.Path(zip_path).as_uri() 
sc.addPyFile(zip_path) # add the entire archive to SparkContext 
+0

Das funktionierte sehr gut danke – ThatDataGuy

Verwandte Themen