Wenn ich ein Queue-Objekt an eine Funktion weiterleite, die mit Pool.apply_async aufgerufen wird, schlägt die Funktion fehl, wie durch ApplyResult.successful()
und das Fehlen der Druckausgabe angezeigt wird. Der Rumpf der Funktion scheint in diesem Fall überhaupt nicht zu laufen.Warum führt das Übergeben einer Warteschlange dazu, dass diese Funktion fehlschlägt, wenn sie mit Pool.apply_async aufgerufen wird?
Ich hatte geplant, die Warteschlange zu verwenden, um die Sammlung von Ergebnissen von separaten Prozessen wie suggested by the multiprocessing documentation zu synchronisieren, aber die Warteschlange verursacht Fehler, auch wenn sie nicht tatsächlich in der Funktion verwendet wird.
from multiprocessing import Pool, Queue
import time
from random import randint
def sample_function(name, results):
delay_ms = randint(1, 10)
print ("{} starting with delay {:d}".format(name, int(delay_ms)))
time.sleep(delay_ms)
# results argument is unused!
#results.put("{} result".format(name))
print ("{} ending".format(name))
resultsQueue = Queue()
jobs = ['one','two','three','four', 'five', 'six']
pool = Pool(processes=4)
# fails
jobStatuses = [pool.apply_async(sample_function, args=(job, resultsQueue)) for job in jobs]
# succeeds
#jobStatuses = [pool.apply_async(sample_function, args=(job,'works with string argument')) for job in jobs]
pool.close()
print('closing: no more tasks')
pool.join()
for status in jobStatuses:
print (status.ready(), status.successful())
while not resultsQueue.empty():
print(resultsQueue.get())
print('All finished')
Ich kann die gleiche Funktion aufrufen, ohne Pool.apply_async
und es wird gelingen: sample_function('test without pool', resultsQueue)
. Ich kann auch die Pool.apply_async
Funktion mit einem String aufrufen, und es wird gelingen.