2014-02-08 8 views
17

Ich bin Python 2.7.3 ausgeführt und ich bemerkte das folgende seltsame Verhalten. Betrachten Sie diese minimal Beispiel:Python Multiprocessing - Prozess hängt bei Join für große Warteschlange

from multiprocessing import Process, Queue 

def foo(qin, qout): 
    while True: 
     bar = qin.get() 
     if bar is None: 
      break 
     qout.put({'bar': bar}) 

if __name__ == '__main__': 
    import sys 

    qin = Queue() 
    qout = Queue() 
    worker = Process(target=foo,args=(qin,qout)) 
    worker.start() 

    for i in range(100000): 
     print i 
     sys.stdout.flush() 
     qin.put(i**2) 

    qin.put(None) 
    worker.join() 

Wenn ich Schleife über 10.000 oder mehr, mein Skript auf worker.join() hängt. Es funktioniert gut, wenn die Schleife nur auf 1.000 geht.

Irgendwelche Ideen?

+0

http://stackoverflow.com/q bekommt/5900985/4013571 –

Antwort

25

Die Warteschlange qout im Subprozess wird voll. Die Daten, die Sie von foo() eingeben, passen nicht in den Puffer der intern verwendeten OS-Pipes, so dass der Subprozess blockiert, der versucht, mehr Daten anzupassen. Aber der Elternprozess liest diese Daten nicht: Es wird einfach blockiert und wartet darauf, dass der Unterprozess beendet wird. Dies ist ein typischer Deadlock.

+2

Es wäre großartig, wenn Sie auch eine Codelösung für das Problem anbieten würden. I.e. wie man den Puffer löscht, damit der Unterprozess nicht blockiert wird. – Matteo

3

Die Größe der Warteschlangen muss begrenzt sein. Ich kann das gleiche Problem reproduzieren (hängend an qin.join()) sogar, indem ich gänzlich entferne. Betrachten Sie die folgende Änderung:

from multiprocessing import Process, Queue 

def foo(qin,qout): 
    while True: 
     bar = qin.get() 
     if bar is None: 
      break 
     #qout.put({'bar':bar}) 

if __name__=='__main__': 
    import sys 

    qin=Queue() 
    qout=Queue() ## POSITION 1 
    for i in range(100): 
     #qout=Queue() ## POSITION 2 
     worker=Process(target=foo,args=(qin,)) 
     worker.start() 
     for j in range(1000): 
      x=i*100+j 
      print x 
      sys.stdout.flush() 
      qin.put(x**2) 

     qin.put(None) 
     worker.join() 

    print 'Done!' 

Dies funktioniert wie sie ist (mit qout.put Linie kommentiert out). Wenn Sie versuchen, alle 100000 Ergebnisse zu speichern, wird qout zu groß: Wenn ich die qout.put({'bar':bar}) in foo auskommentieren und die Definition von qout in POSITION 1 belassen, hängt der Code. Wenn ich jedoch die qout Definition in POSITION 2 verschiebe, wird das Skript beendet.

Kurz gesagt, Sie müssen vorsichtig sein, dass weder qin noch qout zu groß wird. (Siehe auch: Multiprocessing Queue maxsize limit is 32767)

3

Ich hatte das gleiche Problem auf python3, wenn versucht, Strings in eine Warteschlange der Gesamtgröße ca. 5000 Cahs setzen.

In meinem Projekt gab es einen Host-Prozess, der eine Warteschlange erstellt und Subprozess startet, dann verbindet. Nach join Host-Prozess liest aus der Warteschlange. Wenn der Unterprozess zu viele Daten erzeugt, Host-Host auf join. Ich reparierte diese die folgende Funktion für subprocess im Host-Prozess warten:

def yield_from_process(q, p): 
    while p.is_alive(): 
     p.join(timeout=1) 
     while True: 
      try: 
       yield q.get(block=False) 
      except Empty: 
       break 

ich aus der Warteschlange zu lesen, sobald sie es füllt so nie sehr große

Verwandte Themen