2017-12-11 1 views
0

Wenn ich ein Queue-Objekt an eine Funktion weiterleite, die mit Pool.apply_async aufgerufen wird, schlägt die Funktion fehl, wie durch ApplyResult.successful() und das Fehlen der Druckausgabe angezeigt wird. Der Rumpf der Funktion scheint in diesem Fall überhaupt nicht zu laufen.Warum führt das Übergeben einer Warteschlange dazu, dass diese Funktion fehlschlägt, wenn sie mit Pool.apply_async aufgerufen wird?

Ich hatte geplant, die Warteschlange zu verwenden, um die Sammlung von Ergebnissen von separaten Prozessen wie suggested by the multiprocessing documentation zu synchronisieren, aber die Warteschlange verursacht Fehler, auch wenn sie nicht tatsächlich in der Funktion verwendet wird.

from multiprocessing import Pool, Queue 
import time 
from random import randint 

def sample_function(name, results): 
    delay_ms = randint(1, 10) 
    print ("{} starting with delay {:d}".format(name, int(delay_ms))) 
    time.sleep(delay_ms) 
    # results argument is unused! 
    #results.put("{} result".format(name)) 
    print ("{} ending".format(name)) 

resultsQueue = Queue() 
jobs = ['one','two','three','four', 'five', 'six'] 

pool = Pool(processes=4) 
# fails 
jobStatuses = [pool.apply_async(sample_function, args=(job, resultsQueue)) for job in jobs] 
# succeeds 
#jobStatuses = [pool.apply_async(sample_function, args=(job,'works with string argument')) for job in jobs] 

pool.close() 
print('closing: no more tasks') 
pool.join() 

for status in jobStatuses: 
    print (status.ready(), status.successful()) 

while not resultsQueue.empty(): 
    print(resultsQueue.get()) 
print('All finished') 

Ich kann die gleiche Funktion aufrufen, ohne Pool.apply_async und es wird gelingen: sample_function('test without pool', resultsQueue). Ich kann auch die Pool.apply_async Funktion mit einem String aufrufen, und es wird gelingen.

Antwort

3

Es gibt einen RuntimeError, der in jedem apply_async Anruf mit einem multiprocessing.Queue auftritt, das zum Schweigen gebracht wird.
Ändern Sie Ihren Code ein wenig konnte ich es verfolgen:

for status in job_statuses: 
    print(status.__dict__) 

Ausgänge:

{ '_value': Runtime ('Queue-Objekte nur zwischen Prozessen durch Vererbung geteilt werden soll ‘), '_success': false '_callback': Keine, '_cache': {}, '_job': 0, '_error_callback': Keine, '_event':}

x6 mal.

Mit einem , die unter den Prozessen geteilt werden kann, löst dies.

Verwandte Themen