2017-08-09 2 views
0

dask.compute (...) wird voraussichtlich ein blockierender Anruf sein. Wenn ich dasc.compute verschachtelt habe und der innere I/O (wie dask.dataframe.read_parquet), blockiert der innere dask.compute nicht. Hier ist eine Pseudocodebeispiel:verschachtelt dask.compute nicht blockiert

import dask, distributed 

def outer_func(name): 
    files = find_files_for_name(name) 
    df = inner_func(files).compute() 
    # do work with df 
    return result 

def inner_func(files): 
    tasks = [ dask.dataframe.read_parquet(f) for f in files ] 
    tasks = dask.dataframe.concat(tasks) 
    return tasks 

client = distributed.Client(scheduler_file=...) 
results = dask.compute([ dask.delay(outer_func)(name) for name in names ]) 

Wenn ich zwei Arbeiter mit 8 Prozessen gestartet jeder, wie:

dask-worker --scheduler-file $sched_file --nprocs 8 --nthreads 1 

, dann würde ich erwarten, höchstens 2 x 8 gleichzeitigen inner_func ausgeführt, da inner_func (Dateien) .compute() sollte blockieren. Was ich jedoch beobachtete, war, dass innerhalb eines Worker-Prozesses, sobald es den Schritt read_parquet startet, eine weitere inner_func (Dateien) auftreten konnte. Am Ende könnte es mehrere interne_func (Dateien) .compute() geben, die ausgeführt werden, und manchmal könnte es zu einem Speichermangel-Fehler kommen.

Ist das erwartete Verhalten? Wenn ja, gibt es eine Möglichkeit, eine innere_func (Dateien) .compute() pro Worker-Prozess zu erzwingen?

+0

Es scheint hier ein wenig Verwirrung zu geben. dask.dataframe erzeugt träge Objekte, und es ist nicht normal, diese innerhalb einer Funktion zu erzeugen/berechnen, die auch verzögert/berechnet wird. Beachten Sie, dass diese Funktion an einen Worker gesendet wird: Wo erwartet Sie die Berechnung? – mdurant

+0

Die Verschachtelung in diesem Beispiel ist ziemlich typisch für einen realen Datenfluss IMHO. Es ist nicht immer machbar/wünschenswert, mit einer verteilten Datenstruktur wie demask DataFrame zu arbeiten, um diese Verschachtelung zu vermeiden. Weil die dask DataFrame API kleiner ist als Pandas und weil es wichtig ist, eine funktionierende Version des seriellen Codes zu behalten. Von dem, was ich sehe, scheint die innere_func in mehreren Threads innerhalb der dask-Worker-Prozess zu laufen, aber ich spezifiziere nur einen Thread pro Arbeiter mit zB: dask-worker --scheduler-datei sched.json --nprocs 3 - -nthreads 1 --local-Verzeichnis/tmp / – user1527390

Antwort

0

Dies scheint nicht der Fall mit Multi-Prozess-Scheduler.

Um den verteilten Scheduler zu verwenden, habe ich die Problemumgehung gefunden, indem Sie die Übermittlung über die Distributed.Client-API über den schrittgesteuerten Auftrag statt über dasask.compute verwenden. Dask.compute ist in einfachen Anwendungsfällen in Ordnung, hat aber offensichtlich keine gute Vorstellung davon, wie viele ausstehende Aufgaben eingeplant werden können, daher sollte das System in diesem Fall überlaufen.

Hier ist der Pseudocode für Lauf einer Sammlung von dask.Delayed Aufgaben mit Pacing:

import distributed as distr 

def paced_compute(tasks, batch_size, client): 
    """ 
    Run delayed tasks, maintaining at most batch_size running at any 
    time. After the first batch is submitted, 
    submit a new job only after an existing one is finished, 
    continue until all tasks are computed and finished. 

    tasks: collection of dask.Delayed 
    client: distributed.Client obj 
    """ 
    results, tasks = [], list(tasks) 
    working_futs = client.compute(tasks[:batch_size]) 
    tasks = tasks[batch_size:] 
    ac = distr.as_completed(working_futs) 
    for fut in ac: 
     res = fut.result() 
     results.append(res) 
     if tasks: 
      job = tasks.pop() 
      ac.add(client.compute(job)) 
    return results 
0

Wenn Sie den dask verteilten Scheduler fragen Arbeit, es liefert den Code der Funktionen auszuführen, und alle Daten, erforderlich für Worker-Funktionen, die sich in verschiedenen Prozessen befinden, möglicherweise auf verschiedenen Computern. Diese Worker-Prozesse führen die Funktionen zuverlässig aus und werden als normaler Python-Code ausgeführt. Der Punkt ist, dass die running-Funktion nicht weiß, dass es auf einem desk-Worker ist - es wird standardmäßig sehen, dass kein globaler dask-verteilter Client eingerichtet ist und was dasask normalerweise für diesen Fall tun würde: Führen Sie alle dask aus Workloads auf dem Standard-Scheduler (dem Threading).

Wenn Sie wirklich vollständige DASC-Rechenoperationen innerhalb von Tasks ausführen müssen und diese den verteilten Scheduler verwenden möchten, der diese Tasks ausführt, müssen Sie worker client verwenden. Ich denke jedoch, dass in Ihrem Fall die Umformulierung des Jobs zur Entfernung der Verschachtelung (etwas wie der Pseudo-Code oben, obwohl dies auch mit der Berechnung funktionieren könnte) wahrscheinlich der einfachere Ansatz ist.