2017-05-10 2 views
1

Ich erstelle eine multiprocessing.Queue in Python und multiprocessing.Process Instanzen zu diesem Queue hinzufügen.Deadlock in Pythons Multiprocessing bei vorzeitiger Beendigung

Ich möchte einen Funktionsaufruf hinzufügen, der nach jeder job ausgeführt wird, die überprüft, ob eine bestimmte Aufgabe erfolgreich war. Wenn ja, möchte ich die Queue leeren und die Ausführung beenden.

Meine Process Klasse ist:

class Worker(multiprocessing.Process): 

    def __init__(self, queue, check_success=None, directory=None, permit_nonzero=False): 
     super(Worker, self).__init__() 
     self.check_success = check_success 
     self.directory = directory 
     self.permit_nonzero = permit_nonzero 
     self.queue = queue 

    def run(self): 
     for job in iter(self.queue.get, None): 
      stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero) 
      with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out: 
       f_out.write(stdout) 
      if callable(self.check_success) and self.check_success(job): 
       # Terminate all remaining jobs here 
       pass 

Und mein Queue ist Setup hier:

class LocalJobServer(object): 

    @staticmethod 
    def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False, time=None, *args, **kwargs): 
     if check_success and not callable(check_success): 
      msg = "check_success option requires a callable function/object: {0}".format(check_success) 
      raise ValueError(msg) 

     # Create a new queue 
     queue = multiprocessing.Queue() 
     # Create workers equivalent to the number of jobs 
     workers = [] 
     for _ in range(nproc): 
      wp = Worker(queue, check_success=check_success, directory=directory, permit_nonzero=permit_nonzero) 
      wp.start() 
      workers.append(wp) 
     # Add each command to the queue 
     for cmd in command: 
      queue.put(cmd, timeout=time) 
     # Stop workers from exiting without completion 
     for _ in range(nproc): 
      queue.put(None) 
     for wp in workers: 
      wp.join() 

Der Funktionsaufruf mbkit.dispatch.cexectools.cexec() ist ein Wrapper um subprocess.Popen und gibt p.stdout.

In der Worker Klasse, ich die bedingte wenn ein Job erfolgreich zu überprüfen geschrieben haben, und versucht, die verbleibenden Arbeitsplätze in der Queue mit einer while Schleife Entleerung, dh meine Worker.run() Funktion sah wie folgt aus:

def run(self): 
    for job in iter(self.queue.get, None): 
     stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero) 
     with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out: 
      f_out.write(stdout) 
     if callable(self.check_success) and self.check_success(job): 
      break 
    while not self.queue.empty(): 
     self.queue.get() 

Obwohl dies manchmal funktioniert, ist es in der Regel Deadlocks und meine einzige Option ist Ctrl-C. Mir ist bewusst, dass .empty() unzuverlässig ist, also meine Frage.

Irgendwelche Ratschläge, wie ich solch eine Frühbeendigungsfunktionalität implementieren kann?

Antwort

1

Sie haben hier keinen Deadlock. Es ist nur mit dem Verhalten von multiprocessing.Queue verknüpft, wie die get Methode standardmäßig blockiert. Wenn Sie also get in einer leeren Warteschlange anrufen, wartet der Anruf, bis das nächste Element bereit ist. Sie können sehen, dass einige Ihrer Mitarbeiter, da wird abgewürgt, wenn Sie Ihre Schleife verwenden while not self.queue.empty() es zu leeren, Sie alle None Sentinel entfernen und einige Ihrer Mitarbeiter auf dem leeren Queue blockiert, wie in diesem Code:

from multiprocessing import Queue 
q = Queue() 
for e in iter(q.get, None): 
    print(e) 

Um benachrichtigt zu werden, wenn die Warteschlange leer ist, müssen Sie einen nicht blockierenden Anruf verwenden. Sie können zum Beispiel q.get_nowait verwenden oder ein Timeout in q.get(timeout=1) verwenden. Beide werfen eine multiprocessing.queues.Empty Ausnahme, wenn die Warteschlange leer ist.So sollten Sie Ihre Workerfor job in iter(...): Schleife durch so etwas wie ersetzen:

while not queue.empty(): 
    try: 
     job = queue.get(timeout=.1) 
    except multiprocessing.queues.Empty: 
     continue 
    # Do stuff with your job 

Wenn Sie nicht an irgendeiner Stelle geklebt werden wollen.

Für den Synchronisierungsteil würde ich die Verwendung eines Synchronisationsprimitivs wie multiprocessing.Condition oder multiprocessing.Event empfehlen. Dies ist sauberer als der Wert sind sie Design für diesen Zweck. So etwas sollte helfen

def run(self): 
    while not queue.empty(): 
     try: 
      job = queue.get(timeout=.1) 
     except multiprocessing.queues.Empty: 
      continue 
     if self.event.is_set(): 
      continue 
     stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero) 
     with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out: 
      f_out.write(stdout) 
     if callable(self.check_success) and self.check_success(job): 
      self.event.set() 
    print("Worker {} terminated cleanly".format(self.name)) 

mit event = multiprocessing.Event().

Beachten Sie, dass es auch möglich ist, einen multiprocessing.Pool zu verwenden, um zu vermeiden, mit der Warteschlange und den Arbeitern umzugehen. Da Sie jedoch ein Synchronisationsprimitiv benötigen, könnte es etwas komplizierter sein, es einzurichten. So etwas sollte funktionieren:

def worker(job, success, check_success=None, directory=None, permit_nonzero=False): 
     if sucess.is_set(): 
      return False 
     stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero) 
     with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out: 
      f_out.write(stdout) 
     if callable(self.check_success) and self.check_success(job): 
      success.set() 
     return True 

# ...... 
# In the class LocalJobServer 
# ..... 

def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False): 

    mgr = multiprocessing.Manager() 
    success = mgr.Event() 

    pool = multiprocessing.Pool(nproc) 
    run_args = [(cmd, success, check_success, directory, permit_nonzero)] 
    result = pool.starmap(worker, run_args) 

    pool.close() 
    pool.join() 

Hinweis hier, dass ich einen Manager verwenden, da Sie nicht multiprocessing.Event direkt als Argumente übergeben können. Sie könnten auch die Argumente initializer und initargs von Pool verwenden, um das globale Ereignis success in jedem Arbeiter zu initiieren und vermeiden Sie es, sich auf die Manager zu verlassen, aber es ist etwas komplizierter.

+0

Große Antwort, danke dafür. Eine eher grundlegende Frage, würdest du den 'multiprocessing.Pool' meinem derzeitigen Ansatz empfehlen? – fsimkovic

+1

'multiprocessing.Pool' ist ein guter Ansatz, wenn Sie Ihren Code komplexieren, da er die meiste Kommunikation verwaltet und einige Probleme beim Entwurf Ihrer Anwendung vermeidet. Wenn Sie eine zweite Reihe von Jobs übergeben müssen, können Sie den gleichen 'Pool' verwenden. Dies vermeidet den Zeitpunkt des Startens eines neuen Satzes von "Prozess". Ich bin jedoch kein großer Fan des 'multiprocessing.Pool'-Designs und würde empfehlen, mehr über [' concurrent.futures.ProcessPoolExecutor'] (https://docs.python.org/3/library/concurrent.futures) zu erfahren .html) wenn Sie python3 verwenden, da es robuster ist und die API netter ist! –

0

Dies ist möglicherweise nicht die optimale Lösung, und anderer Vorschlag ist sehr geschätzt, aber ich schaffte es, das Problem als solches zu lösen:

class Worker(multiprocessing.Process): 
    """Simple manual worker class to execute jobs in the queue""" 

    def __init__(self, queue, success, check_success=None, directory=None, permit_nonzero=False): 
     super(Worker, self).__init__() 
     self.check_success = check_success 
     self.directory = directory 
     self.permit_nonzero = permit_nonzero 
     self.success = success 
     self.queue = queue 

    def run(self): 
     """Method representing the process's activity""" 
     for job in iter(self.queue.get, None): 
      if self.success.value: 
       continue 
      stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero) 
      with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out: 
       f_out.write(stdout) 
      if callable(self.check_success) and self.check_success(job): 
       self.success.value = int(True) 
      time.sleep(1) 


class LocalJobServer(object): 
    """A local server to execute jobs via the multiprocessing module""" 

    @staticmethod 
    def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False, time=None, *args, **kwargs): 
     if check_success and not callable(check_success): 
      msg = "check_success option requires a callable function/object: {0}".format(check_success) 
      raise ValueError(msg) 

     # Create a new queue 
     queue = multiprocessing.Queue() 
     success = multiprocessing.Value('i', int(False)) 
     # Create workers equivalent to the number of jobs 
     workers = [] 
     for _ in range(nproc): 
      wp = Worker(queue, success, check_success=check_success, directory=directory, permit_nonzero=permit_nonzero) 
      wp.start() 
      workers.append(wp) 
     # Add each command to the queue 
     for cmd in command: 
      queue.put(cmd) 
     # Stop workers from exiting without completion 
     for _ in range(nproc): 
      queue.put(None) 
     # Start the workers 
     for wp in workers: 
      wp.join(time) 

Im Grunde bin ich ein Value Erstellen und Bereitstellen, dass zu jedem Process . Sobald ein Job als erfolgreich markiert wurde, wird diese Variable aktualisiert. Jede Process prüft in if self.success.value: continue, ob wir einen Erfolg haben und wenn ja, iteriert nur über die verbleibenden Aufträge in Queue bis leer.

Der Aufruf time.sleep(1) ist erforderlich, um mögliche Synchronisationsverzögerungen zwischen den Prozessen zu berücksichtigen. Dies ist sicherlich nicht der effizienteste Ansatz, aber es funktioniert.

+0

Dies ist der mögliche Schuldige Ihrer 'Deadlock':' für einen Job in iter (self.queue.get, keine): ' – stovfl

+0

@stovfl Vielen Dank für Ihre Antwort, aber die hier gepostete Antwort funktioniert gut. Ich glaube, das Problem mit 'while not self.queue.empty()' und 'self.queue.get()', dh die 'queue' war nicht leer, ging in die Schleife, sondern auf' get() ' es war leer und festgefahren. – fsimkovic

+0

Habe ich 'while ...' nicht gesehen? BTW: Ist Ihnen "Modul Multiprocessing.Pool" bekannt? Sieht so aus, als hättest du 'Pool' neu implementiert. – stovfl

Verwandte Themen