2016-07-03 13 views
0

verwenden Wenn Array (Daten) von mehr als 10.000, nicht alle Prozesse sind fertig (siehe letzte Zeile drucken ('konkurrieren')). Bei Array bis zu 2000 Elementen funktioniert dieser Code gut. Ich denke, Problem mit Warteschlange, ohne result_queue.put ([i, j]) alle Prozesse ordnungsgemäß abgeschlossen. Kann mir jemand mit diesem Teil des Codes helfen?Prozess wird nicht beendet, wenn ich Queue

def finder(start,end,proc,result_queue,lock): 
    global data 
    i=start 
    while i<=end: 
     el=data[i] 
     j=-1 
     for el1 in data: 
      j=j+1 
      s1 = SequenceMatcher(None, el, el1) 
      s1_val=s1.ratio() 
      if s1_val>0.9: result_queue.put([i,j]) 
     i=i+1 
    print('end') 


if __name__ == '__main__': 
    multiprocessing.freeze_support() 
    result_queue = multiprocessing.Queue() 
    allProcesses = [] 
    data=r.keys() 
    print(len(data)) 
    parts=8 
    part=int(len(data)/parts) 
    i=0 
    lock = multiprocessing.Lock() 

    while i<parts: 
      p = multiprocessing.Process(target=finder, args=(part*i, part*i+part,i,result_queue,lock)) 
      print('init',part*i, part*i+part,i) 
      allProcesses.append(p) 
      p.daemon = True 
      p.start() 
      i=i+1 
      print('started process',i) 
    i=0 

    for p in allProcesses: 
     p.join() 
     print('complete') 
+0

Gibt es einen Unterschied, wenn laufen sehen, ohne 'p.daemon = Wahr '? –

+0

kein Unterschied (((nur 3 von 8 abgeschlossen –

+0

Es kann eine Größenbeschränkung für die Warteschlange geben. Wenn es keinen Verbraucher für die Warteschlange gibt, wird sie voll und 'queue.put()' wird blockiert ] (http://stackoverflow.com/questions/5900985/multiprocessing-queue-maxsize-limit-is-32767) schlägt vor, dass es eine Grenze gibt. –

Antwort

0

Kurze Antwort: die multiprocessing.Manager wird wieder eine

<class 'multiprocessing.managers.AutoProxy[Queue]'> 
: Verwenden multiprocessing.Manager die Queue

m = multiprocessing.Manager() 
    result_queue = m.Queue() 

Etwas ausführlichere Antwort zu erstellen

Instanz, die sicher amo geteilt werden kann ng Arbeiter.

Dies ist die komplette fahrbare Beispiel

import time 
import multiprocessing 


def finder(start,end,proc,result_queue,lock): 
    #global data 


    for i in range(start, end+1): 
     #print (type(result_queue)) 

     result_queue.put((i,)) 




    print('end %s'%proc) 

r = {i:i for i in range(100000)} 


def main(): 

    multiprocessing.freeze_support() 
    allProcesses = [] 
    data=r.keys() 
    print(len(data)) 
    parts=8 
    part=int(len(data)/parts) 
    i=0 
    lock = multiprocessing.Lock() 
    m = multiprocessing.Manager() 
    result_queue = m.Queue() 
    while i<parts: 

      p = multiprocessing.Process(target=finder, args=(part*i, part*i+part,i,result_queue,lock)) 
      print('init',part*i, part*i+part,i) 

      p.daemon = False 
      p.start() 
      i=i+1 
      print('started process',i) 
      allProcesses.append(p) 


    for p in allProcesses: 
     print("join", p) 

     print(p.join()) 
     print('complete') 


if __name__ == '__main__': 
    main() 

Wenn Sie die m.Queue zu multiprocessing.Queue ändern Sie Ihre alte Verhalten

+0

Vielen Dank! Es ist Arbeit –

Verwandte Themen