Ich erstelle eine multiprocessing.Queue
in Python und multiprocessing.Process
Instanzen zu diesem Queue
hinzufügen.Deadlock in Pythons Multiprocessing bei vorzeitiger Beendigung
Ich möchte einen Funktionsaufruf hinzufügen, der nach jeder job
ausgeführt wird, die überprüft, ob eine bestimmte Aufgabe erfolgreich war. Wenn ja, möchte ich die Queue
leeren und die Ausführung beenden.
Meine Process
Klasse ist:
class Worker(multiprocessing.Process):
def __init__(self, queue, check_success=None, directory=None, permit_nonzero=False):
super(Worker, self).__init__()
self.check_success = check_success
self.directory = directory
self.permit_nonzero = permit_nonzero
self.queue = queue
def run(self):
for job in iter(self.queue.get, None):
stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
f_out.write(stdout)
if callable(self.check_success) and self.check_success(job):
# Terminate all remaining jobs here
pass
Und mein Queue
ist Setup hier:
class LocalJobServer(object):
@staticmethod
def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False, time=None, *args, **kwargs):
if check_success and not callable(check_success):
msg = "check_success option requires a callable function/object: {0}".format(check_success)
raise ValueError(msg)
# Create a new queue
queue = multiprocessing.Queue()
# Create workers equivalent to the number of jobs
workers = []
for _ in range(nproc):
wp = Worker(queue, check_success=check_success, directory=directory, permit_nonzero=permit_nonzero)
wp.start()
workers.append(wp)
# Add each command to the queue
for cmd in command:
queue.put(cmd, timeout=time)
# Stop workers from exiting without completion
for _ in range(nproc):
queue.put(None)
for wp in workers:
wp.join()
Der Funktionsaufruf mbkit.dispatch.cexectools.cexec()
ist ein Wrapper um subprocess.Popen
und gibt p.stdout
.
In der Worker
Klasse, ich die bedingte wenn ein Job erfolgreich zu überprüfen geschrieben haben, und versucht, die verbleibenden Arbeitsplätze in der Queue
mit einer while
Schleife Entleerung, dh meine Worker.run()
Funktion sah wie folgt aus:
def run(self):
for job in iter(self.queue.get, None):
stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
f_out.write(stdout)
if callable(self.check_success) and self.check_success(job):
break
while not self.queue.empty():
self.queue.get()
Obwohl dies manchmal funktioniert, ist es in der Regel Deadlocks und meine einzige Option ist Ctrl-C
. Mir ist bewusst, dass .empty()
unzuverlässig ist, also meine Frage.
Irgendwelche Ratschläge, wie ich solch eine Frühbeendigungsfunktionalität implementieren kann?
Große Antwort, danke dafür. Eine eher grundlegende Frage, würdest du den 'multiprocessing.Pool' meinem derzeitigen Ansatz empfehlen? – fsimkovic
'multiprocessing.Pool' ist ein guter Ansatz, wenn Sie Ihren Code komplexieren, da er die meiste Kommunikation verwaltet und einige Probleme beim Entwurf Ihrer Anwendung vermeidet. Wenn Sie eine zweite Reihe von Jobs übergeben müssen, können Sie den gleichen 'Pool' verwenden. Dies vermeidet den Zeitpunkt des Startens eines neuen Satzes von "Prozess". Ich bin jedoch kein großer Fan des 'multiprocessing.Pool'-Designs und würde empfehlen, mehr über [' concurrent.futures.ProcessPoolExecutor'] (https://docs.python.org/3/library/concurrent.futures) zu erfahren .html) wenn Sie python3 verwenden, da es robuster ist und die API netter ist! –