2017-08-17 4 views
1

Wenn Sie Futures auf dask.distributed verwenden, gibt es eine Möglichkeit, zwischen pending Futures zu unterscheiden, die derzeit evaluiert werden immer noch in der Warteschlange?So unterscheiden Sie zwischen wartenden und laufenden Futures (und killen Futures, die zu lange ausgeführt wurden)

Der Grund ist, dass ich eine große Anzahl von Aufgaben (~ 8000) an eine kleinere Gruppe von Arbeitern (100) übergebe, so dass nicht alle Aufgaben sofort bearbeitet werden können. Die Aufgaben umfassen das Aufrufen einer ausführbaren Datei eines Drittanbieters (übersubprocess.check_output), die in seltenen Fällen in eine Endlosschleife übergeht.

Daher möchte ich Futures, die zu lange ausgeführt wurden (mit einem willkürlichen Timeout) abbrechen. Es scheint jedoch keine Möglichkeit zu geben, festzustellen, ob sich eine Zukunft für lange Zeit in einem Zustand befindet, in dem Berechnungen länger dauern als gewöhnlich oder einfach deshalb, weil sie auf die Verfügbarkeit eines Arbeiters warten mussten.

Mein Setup umfasst einen SGE-Cluster, der ein dask-scheduler bzw. dask-worker Job/Job-Array ausführt. Ich habe versucht, einen Timeout direkt in der eingereichten Python-Funktion einstellen, @timeout_decorator.timeout(60, use_signals=False) vom timeout_decorator package mit, bekam aber den folgenden Fehler:

"daemonic processes are not allowed to have children" 

Jede Hilfe sehr geschätzt werden würde.

Antwort

0

Nein, Sie können nicht feststellen, ob eine Aufgabe gestartet wurde oder nicht. Im Allgemeinen empfehlen wir, diese Logik in die Aufgabe selbst einzufügen, wie Sie es mit Ihrem Zeitüberschreitungsdekorator versucht haben.

Ich empfehle stattdessen das timeout= Schlüsselwort zu selbst zu versuchen. Ich vermute, dass dies einfacher ist und eine höhere Wahrscheinlichkeit hat, reibungslos zu funktionieren.

+0

Danke für die schnelle Antwort. Ich führe Python 2 aus, wo "subprocess" das Schlüsselwort "timeout =" fehlt, aber ich konnte den gleichen Effekt mit dem Code in meiner Antwort unten erzielen. Das ist jedoch ein umsetzungsspezifisches Detail, deshalb markiere ich diese Antwort als die richtige. – user3098840

0

Für Benutzer, die Python 2 ausführen, ist das Schlüsselwort timeout= in subprocess.check_output nicht verfügbar.

konnte ich den gewünschten Effekt erhalten, indem subprocess.Popen stattdessen verwenden, die sofort zurückgibt:

import subprocess 
import shlex # useful to split up arguments for subprocess 
import time 

p = subprocess.Popen(shlex.split('/path/to/binary arg1 arg2'), 
        stderr=subprocess.STDOUT) 
for _ in range(60): # wait for up to 60 seconds 
    if p.poll() is not None: 
     break # process completed 
    else: 
     time.sleep(1.0) # give it more time 
if p.poll() is None: # time is up, are we done? 
    try: 
     p.kill() 
    except: 
     raise 
    raise RuntimeError('Binary failed to complete in time.')