dask.compute (...) wird voraussichtlich ein blockierender Anruf sein. Wenn ich dasc.compute verschachtelt habe und der innere I/O (wie dask.dataframe.read_parquet), blockiert der innere dask.compute nicht. Hier ist eine Pseudocodebeispiel:verschachtelt dask.compute nicht blockiert
import dask, distributed
def outer_func(name):
files = find_files_for_name(name)
df = inner_func(files).compute()
# do work with df
return result
def inner_func(files):
tasks = [ dask.dataframe.read_parquet(f) for f in files ]
tasks = dask.dataframe.concat(tasks)
return tasks
client = distributed.Client(scheduler_file=...)
results = dask.compute([ dask.delay(outer_func)(name) for name in names ])
Wenn ich zwei Arbeiter mit 8 Prozessen gestartet jeder, wie:
dask-worker --scheduler-file $sched_file --nprocs 8 --nthreads 1
, dann würde ich erwarten, höchstens 2 x 8 gleichzeitigen inner_func ausgeführt, da inner_func (Dateien) .compute() sollte blockieren. Was ich jedoch beobachtete, war, dass innerhalb eines Worker-Prozesses, sobald es den Schritt read_parquet startet, eine weitere inner_func (Dateien) auftreten konnte. Am Ende könnte es mehrere interne_func (Dateien) .compute() geben, die ausgeführt werden, und manchmal könnte es zu einem Speichermangel-Fehler kommen.
Ist das erwartete Verhalten? Wenn ja, gibt es eine Möglichkeit, eine innere_func (Dateien) .compute() pro Worker-Prozess zu erzwingen?
Es scheint hier ein wenig Verwirrung zu geben. dask.dataframe erzeugt träge Objekte, und es ist nicht normal, diese innerhalb einer Funktion zu erzeugen/berechnen, die auch verzögert/berechnet wird. Beachten Sie, dass diese Funktion an einen Worker gesendet wird: Wo erwartet Sie die Berechnung? – mdurant
Die Verschachtelung in diesem Beispiel ist ziemlich typisch für einen realen Datenfluss IMHO. Es ist nicht immer machbar/wünschenswert, mit einer verteilten Datenstruktur wie demask DataFrame zu arbeiten, um diese Verschachtelung zu vermeiden. Weil die dask DataFrame API kleiner ist als Pandas und weil es wichtig ist, eine funktionierende Version des seriellen Codes zu behalten. Von dem, was ich sehe, scheint die innere_func in mehreren Threads innerhalb der dask-Worker-Prozess zu laufen, aber ich spezifiziere nur einen Thread pro Arbeiter mit zB: dask-worker --scheduler-datei sched.json --nprocs 3 - -nthreads 1 --local-Verzeichnis/tmp / – user1527390