Angenommen, ich habe eine Funktion, die einige Verarbeitung funktioniert und das Ergebnis zu einem redis Servervon dask Arbeitern Redis Schreiben
r = redis.StrictRedis()
def process(data):
(...do some work...)
r.put(...)
nun eine große Menge von Daten, die ich habe, und ich will dask verwenden parallelisieren der Prozess. Ähnliches wie
from dask.distributed imoprt Client
client = Client()
for x in data:
client.submit(process,x)
Aber ich bekomme KeyError(<function process>)
. Irgendeine Idee?
EDIT
Er arbeitet nach @mrocklin unten beantwortet die Verbindung Initialisierung innerhalb der Funktion zu setzen. Ich gehe davon aus, dass die Verbindung zerstört und neu erschaffen wird, wenn Arbeiter kommen und gehen. Wäre es effizienter, wenn ich meine Funktion neu schreibe, um einen Stapel Daten zu akzeptieren.
def process(batches_data):
r = redis.StrictRedis()
for batch in batches_data:
(...do some work...)
r.put(...)