2016-12-12 2 views
2

Angenommen, ich habe eine Funktion, die einige Verarbeitung funktioniert und das Ergebnis zu einem redis Servervon dask Arbeitern Redis Schreiben

r = redis.StrictRedis() 

def process(data): 
    (...do some work...) 
    r.put(...) 

nun eine große Menge von Daten, die ich habe, und ich will dask verwenden parallelisieren der Prozess. Ähnliches wie

from dask.distributed imoprt Client 
client = Client() 
for x in data: 
    client.submit(process,x) 

Aber ich bekomme KeyError(<function process>). Irgendeine Idee?

EDIT

Er arbeitet nach @mrocklin unten beantwortet die Verbindung Initialisierung innerhalb der Funktion zu setzen. Ich gehe davon aus, dass die Verbindung zerstört und neu erschaffen wird, wenn Arbeiter kommen und gehen. Wäre es effizienter, wenn ich meine Funktion neu schreibe, um einen Stapel Daten zu akzeptieren.

def process(batches_data): 
    r = redis.StrictRedis() 
    for batch in batches_data: 
     (...do some work...) 
     r.put(...) 

Antwort

2

Meine erste Vermutung ist, dass Ihr Objekt r nicht gut serialisiert. Dies ist ziemlich typisch, da Objekte mit Live-Verbindungen sich oft weigern, serialisiert zu werden (aus gutem Grund).

Stattdessen könnten Sie versuchen, die Verbindung innerhalb der Funktion zur Gründung

def process(data): 
    r = redis.StrictRedis() 
    ... do some work 
    r.put(...) 

Zusätzlich empfehle ich Ihnen, die Futures von submit hergestellt festhalten. Sonst wird Dask davon ausgehen, dass Sie nicht mehr über diese Aufgaben kümmern und entscheiden, dass es ihnen

futures = [client.submit(process, x) for x in L] 
wait(futures) 

ignorieren Wenn dieses Problem nicht lösen dann Ihre ursprüngliche Frage mit einer vollständigeren Ausnahme und Rückverfolgungs empfehle ich bearbeite.