2009-10-08 4 views
15

Ich möchte eine multiprocessing.Queue in eine Liste ausgeben. Für diese Aufgabe habe ich die folgende Funktion geschrieben:Dumping einer Multiprocessing.Queue in eine Liste

import Queue 

def dump_queue(queue): 
    """ 
    Empties all pending items in a queue and returns them in a list. 
    """ 
    result = [] 

    # START DEBUG CODE 
    initial_size = queue.qsize() 
    print("Queue has %s items initially." % initial_size) 
    # END DEBUG CODE 

    while True: 
     try: 
      thing = queue.get(block=False) 
      result.append(thing) 
     except Queue.Empty: 

      # START DEBUG CODE 
      current_size = queue.qsize() 
      total_size = current_size + len(result) 
      print("Dumping complete:") 
      if current_size == initial_size: 
       print("No items were added to the queue.") 
      else: 
       print("%s items were added to the queue." % \ 
         (total_size - initial_size)) 
      print("Extracted %s items from the queue, queue has %s items \ 
      left" % (len(result), current_size)) 
      # END DEBUG CODE 

      return result 

Aber aus irgendeinem Grund funktioniert es nicht.

Beachten Sie die folgende Shell-Sitzung:

>>> import multiprocessing 
>>> q = multiprocessing.Queue() 
>>> for i in range(100): 
...  q.put([range(200) for j in range(100)]) 
... 
>>> q.qsize() 
100 
>>> l=dump_queue(q) 
Queue has 100 items initially. 
Dumping complete: 
0 items were added to the queue. 
Extracted 1 items from the queue, queue has 99 items left 
>>> l=dump_queue(q) 
Queue has 99 items initially. 
Dumping complete: 
0 items were added to the queue. 
Extracted 3 items from the queue, queue has 96 items left 
>>> l=dump_queue(q) 
Queue has 96 items initially. 
Dumping complete: 
0 items were added to the queue. 
Extracted 1 items from the queue, queue has 95 items left 
>>> 

Was hier geschieht? Warum werden nicht alle Gegenstände abgeladen?

Antwort

20

Diese Versuchen:

import Queue 
import time 

def dump_queue(queue): 
    """ 
    Empties all pending items in a queue and returns them in a list. 
    """ 
    result = [] 

    for i in iter(queue.get, 'STOP'): 
     result.append(i) 
    time.sleep(.1) 
    return result 

import multiprocessing 
q = multiprocessing.Queue() 
for i in range(100): 
    q.put([range(200) for j in range(100)]) 
q.put('STOP') 
l=dump_queue(q) 
print len(l) 

Multi Warteschlangen einen internen Puffer aufweisen, die ein feeder Gewinde aufweist, die einen Puffer zieht abarbeiten und spült es mit dem Rohr. Wenn nicht alle Objekte geleert wurden, konnte ich einen Fall sehen, bei dem Empty vorzeitig ausgelöst wurde. Die Verwendung eines Sentinels, um das Ende der Warteschlange anzuzeigen, ist sicher (und zuverlässig). Außerdem ist es besser, iter (get, sentinel) zu verwenden, als auf Empty zu setzen.

Ich mag es nicht, dass es leer wegen der Spülung Timing (Ich habe die time.sleep (.1) hinzugefügt, um einen Kontextwechsel auf den Feeder-Thread, Sie brauchen es möglicherweise nicht, es funktioniert ohne es - es ist eine Angewohnheit, die GIL zu veröffentlichen).

+3

Gute Idee Jesse, aber noch sicherer und zuverlässiger wäre es, eine 'Uuid'-Zeichenkette (oder eher Threading als Multiprocessing, ein spezifisches 'sentinel = object()) als Sentinel anstelle einer generischen Zeichenkette zu verwenden. Selbst dann könnte es Probleme geben, wenn ein anderer Thread zur selben Zeit kommt; der einzige wirklich _safe_ Weg ist der, der auf Interna der Warteschlange angewiesen ist, leider! -) –

+0

Sie haben Recht. Ich ging für die 'schnelle' Lösung mit einem String Sentinel, aber das funktioniert nur in diesem speziellen Fall. Ich beginne mich zu fragen, ob mp.queue brauchen einige Sentinel-Unterstützung in die Queue – jnoller

+0

gebaut Dank für diese Antwort. Ich hatte heute ein ähnliches Problem, das mir diese Antwort geholfen hat. Vollständige Problembeschreibung: http://www.bryceboe.com/2011/01/28/the-python-multiprocessing-queue-and-large-objects/ – bboe

3

In einigen Situationen haben wir bereits alles berechnet und wollen nur die Queue umwandeln.

shared_queue = Queue() 
shared_queue_list = [] 
... 
join() #All process are joined 
while shared_queue.qsize() != 0: 
    shared_queue_list.append(shared_queue.get()) 

Jetzt shared_queue_list hat die Ergebnisse in eine Liste konvertiert.