2015-09-10 6 views
8

Ich implementiere ein Modell in Spark als Python-Klasse, und jedes Mal, wenn ich versuche, eine Klassenmethode zu einer RDD zuzuordnen, schlägt es fehl. Mein eigentlicher Code ist komplizierter, aber diese vereinfachte Version wird am Kern des Problems:Wie verarbeitet man RDDs mit einer Python-Klasse?

class model(object): 
    def __init__(self): 
     self.data = sc.textFile('path/to/data.csv') 
     # other misc setup 
    def run_model(self): 
     self.data = self.data.map(self.transformation_function) 
    def transformation_function(self,row): 
     row = row.split(',') 
     return row[0]+row[1] 

Nun, wenn ich das Modell wie so (zum Beispiel) laufen:

test = model() 
test.run_model() 
test.data.take(10) 

ich die following error:

Ausnahme: Scheint, dass Sie versuchen, SparkContext von einer Übertragungsvariable, einer Aktion oder einer Transforamtion zu verweisen. SparkContext kann nur für den Treiber verwendet werden, nicht für Code, der auf Workern ausgeführt wird. Weitere Informationen finden Sie unter SPARK-5063.

Ich habe mit diesem ein bisschen gespielt, und es scheint zuverlässig jedes Mal, wenn ich versuche, eine Klassenmethode zu einer RDD innerhalb der Klasse zuordnen. Ich habe bestätigt, dass die abgebildete Funktion gut funktioniert, wenn ich außerhalb einer Klassenstruktur implementiere, also hat das Problem definitiv mit der Klasse zu tun. Gibt es eine Möglichkeit, dies zu lösen?

Antwort

10

Problem ist hier ein wenig subtiler als mit geschachtelte RDDs oder Spark-Aktionen innerhalb von Transformationen. Spark erlaubt keinen Zugriff auf die interne Aktion oder Transformation SparkContext.

Auch wenn Sie nicht explizit auf es zugreifen, wird es innerhalb der Schließung referenziert und muss serialisiert und herumgetragen werden. Das bedeutet, dass Ihre transformation-Methode, die self referenziert, auch SparkContext behält, daher der Fehler.

Eine Möglichkeit, dies zu handhaben ist statische Methode zu verwenden:

class model(object): 
    @staticmethod 
    def transformation_function(row): 
     row = row.split(',') 
     return row[0]+row[1] 

    def __init__(self): 
     self.data = sc.textFile('some.csv') 

    def run_model(self): 
     self.data = self.data.map(model.transformation_function) 

bearbeiten:

Wenn Sie Instanzvariablen zugreifen können, wollen Sie so etwas wie dies versuchen:

class model(object): 
    @staticmethod 
    def transformation_function(a_model): 
     delim = a_model.delim 
     def _transformation_function(row): 
      return row.split(delim) 
     return _transformation_function 

    def __init__(self): 
     self.delim = ',' 
     self.data = sc.textFile('some.csv') 

    def run_model(self): 
     self.data = self.data.map(model.transformation_function(self)) 
+0

Perfekt - ich dachte nicht an eine statische Methode. Das einzige Problem ist im vollständigen Code, meine Transformationsfunktion muss auf andere Variablen in der Klasse 'model' zugreifen (nicht RDDs). Ich gehe davon aus, dass der einzige Weg, dies zu erreichen, darin besteht, sie als Argumente an die statische Methode zu übergeben. z.B. 'def transformations_function (row, somevar): return row + somevar' – moustachio

+0

Mit anderen Worten: Gibt es eine Möglichkeit, auf Klassenvariablen (' self.whatever') innerhalb einer statischen Methode zuzugreifen? – moustachio

+0

(Beachten Sie, dass dies keine statischen Variablen sein können - ich würde definitiv auf Instanzvariablen innerhalb der statischen Methode zugreifen wollen) – moustachio

Verwandte Themen