2016-08-08 21 views
5

In den letzten Tagen habe ich versucht zu verstehen, wie Spark-Executoren wissen, wie ein Modul mit einem bestimmten Namen beim Import zu verwenden. Ich arbeite an AWS EMR. Situation: ich initialisieren pyspark auf EMR vonPySpark Distributing Modul Importe

pyspark --master Garn

Dann in pyspark eingeben,

import numpy as np ## notice the naming 

def myfun(x): 
    n = np.random.rand(1) 
    return x*n 

rdd = sc.parallelize([1,2,3,4], 2) 
rdd.map(lambda x: myfun(x)).collect() ## works! 

Mein Verständnis ist, dass, wenn ich numpy as np importieren, ist der Master-Knoten die einzige Knoten importieren und identifizieren numpy bis np. Bei einem EMR-Cluster (2 Arbeiterknoten) sendet das Treiberprogramm jedoch die Funktion an die Arbeiterknoten, um die Funktion für jedes Element in der Liste (für jede Partition) auszuführen, und wenn ich die Kartenfunktion auf der RDD ausführe Das erfolgreiche Ergebnis wird zurückgegeben.

Meine Frage ist: Wie wissen die Arbeiter, dass numpy als np importiert werden soll? Jeder Worker hat numpy bereits installiert, aber ich habe keinen explizit definierten Weg für jeden Knoten definiert, das Modul as np zu importieren.

Bitte beachten Sie die folgende Beitrag von Cloudera Weitere Einzelheiten zu den Abhängigkeiten: http://blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/

Unter Complex Abhängigkeit sie haben ein Beispiel (Code), wo die Pandas Modul explizit auf jedem Knoten importiert wird.

Eine Theorie, die ich gehört habe, ist, dass das Treiberprogramm den gesamten Code verteilt, der in der interaktiven pyspark-Shell übergeben wurde. Ich bin skeptisch. Das Beispiel, das ich diese Idee zu begegnen bringen ist, wenn auf dem Master-Knoten I-Typ:

print "hello" 

jeder Arbeiter Knoten ist auch Druck „Hallo“? Ich denke nicht. Aber vielleicht liege ich falsch darin.

Antwort

3

Wenn die Funktion serialisiert wird gibt es eine number of objects is being saved:

  • Code
  • Globals
  • defaults
  • closure
  • dict

, die später verwendet werden kann, um die für eine bestimmte Funktion erforderliche vollständige Umgebung wiederherzustellen.

Da np wird durch die Funktion verwiesen wird sie von ihrem Code extrahiert werden kann:

from pyspark.cloudpickle import CloudPickler 

CloudPickler.extract_code_globals(myfun.__code__) 
## {'np'} 

und Bindung kann aus seinem globals extrahiert werden:

myfun.__globals__['np'] 
## <module 'numpy' from ... 

So serialisierten Verschluss (im weitesten Sinne) erfasst alle Informationen, die zur Wiederherstellung der Umgebung erforderlich sind. Natürlich müssen alle Module, auf die der Verschluss zugreift, auf jedem Arbeitsgerät importiert werden können.

Alles andere ist nur Lesen und Schreiben von Maschinen.

Auf einer Randnotiz sollte der Master-Knoten keinen Python-Code ausführen. Es ist verantwortlich für die Ressourcenzuweisung, auf der kein Anwendungscode ausgeführt wird.

+0

Großartig, danke für die Eingabe. Bedeutet das, dass Code-Teile wie "print" hallo "' bei jedem Arbeiter ausgeführt werden? Oder wird es ignoriert und nur der für die Funktion notwendige Code ausgeführt? – Jon

+1

Nur Code, der von der Schließung erfasst wird, wird tatsächlich auf den Arbeitern ausgeführt. Alles andere wird ignoriert. – zero323