2016-11-23 2 views
1

sah ich irgendwo einen Hinweis, wie Sie eine große Datenmenge (etwa Textzeilen) schneller mit dem Multiprocessing-Modul zu bearbeiten, so etwas wie:python3 multiprocessing.Process Ansatz versagt

... (form batch_set = nump batches [= lists of lines to process], batch_set 
    is a list of lists of strings (batches)) 
nump = len(batch_set) 
output = mp.Queue() 
processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) for i in range(nump)] 

for p in processes: 
    p.start() 
for p in processes: 
    p.join() 

results = sorted([output.get() for p in processes]) 
... (do something with the processed outputs, ex print them in order, 
    given that each proc_lines function returns a couple (i, out_batch)) 

Allerdings, wenn ich den Code ausführen mit einer kleinen Anzahl von Zeilen/Batch funktioniert es gut [ex: './code.py -x 4:10' für nump = 4 und tumb = 10 (Zeilen/Batch)] während nach einer bestimmte Anzahl von Zeilen/batch hängt [ex: './code.py -x 4: 4000'] und wenn ich interrupt es sehe ich einen Traceback-Hinweis über ein _wait_for_tstate_lock und das System Threading-Bibliothek. Es scheint, dass der Code nicht den zuletzt angezeigten Code Zeile über ...

Ich biete den Code unten, falls jemand es benötigt, um zu beantworten, warum dies geschieht und wie es zu beheben ist.

#!/usr/bin/env python3 

import sys 
import multiprocessing as mp 


def fabl(numb, nump): 
    ''' 
    Form And Batch Lines: form nump[roc] groups of numb[atch] indexed lines 
    '<idx> my line here' with indexes from 1 to (nump x numb). 
    ''' 
    ret = [] 
    idx = 1 
    for _ in range(nump): 
     cb = [] 
     for _ in range(numb): 
      cb.append('%07d my line here' % idx) 
      idx += 1 
     ret.append(cb) 
    return ret 


def proc_lines(i, output, rows_in): 
    ret = [] 
    for row in rows_in: 
     row = row[0:8] + 'some other stuff\n' # replacement for the post-idx part 
     ret.append(row) 

    output.put((i,ret)) 
    return 


def mp_proc(batch_set): 
    'given the batch, disperse it to the number of processes and ret the results' 
    nump = len(batch_set) 
    output = mp.Queue() 
    processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) for i in range(nump)] 

    for p in processes: 
     p.start() 
    for p in processes: 
     p.join() 

    print('waiting for procs to complete...') 
    results = sorted([output.get() for p in processes]) 
    return results 


def write_set(proc_batch_set, fout): 
    'write p[rocessed]batch_set' 
    for _, out_batch in proc_batch_set: 
     for row in out_batch: 
      fout.write(row) 
    return 


def main(): 
    args = sys.argv 
    if len(args) < 2: 
     print(''' 
    run with args: -x [ NumProc:BatchSize ] 
     (ex: '-x' | '-x 4:10' (default values) | '-x 4:4000' (hangs...)) 
     ''') 
     sys.exit(0) 

    numb = 10 # suppose we need this number of lines/batch : BatchSize 
    nump = 4 # number of processes to use.    : NumProcs 
    if len(args) > 2 and ':' in args[2]: # use another np:bs 
     nump, numb = map(int, args[2].split(':')) 

    batch_set = fabl(numb, nump) # proc-batch made in here: nump (groups) x numb (lines) 
    proc_batch_set = mp_proc(batch_set) 

    with open('out-min', 'wt') as fout: 
     write_set(proc_batch_set, fout) 

    return 

if __name__ == '__main__': 
    main() 

Antwort

1

Die Queue haben eine bestimmte Kapazität und kann voll erhalten, wenn Sie es nicht leer, während die Process ausgeführt werden. Dies blockiert nicht die Ausführung Ihrer Prozesse, aber Sie können nicht an der Process teilnehmen, wenn die put nicht abgeschlossen wurde.

So würde ich ändern nur die mp_proc Funktion, so dass:

def mp_proc(batch_set): 
    'given the batch, disperse it to the number of processes and ret the results' 
    n_process = len(batch_set) 
    output = mp.Queue() 
    processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) 
       for i in range(process)] 

    for p in processes: 
     p.start() 

    # Empty the queue while the processes are running so there is no 
    # issue with uncomplete `put` operations. 
    results = sorted([output.get() for p in processes]) 

    # Join the process to make sure everything finished correctly 
    for p in processes: 
     p.join() 

    return results 
+0

Also, im Grunde würde es bedeuten, dass ich die Warteschlange am missbrauchen, indem Sie versuchen zu viel, um es hinzuzufügen, um das System der Blöcke. Ich fand diesen Beitrag, der das gleiche sagt: http://StackOverflow.com/questions/31665328/python-3-multiprocessing-queue-deadlock-when-calling-join-before-the-queue-is-em so könnte das sein das Problem. Vielen Dank! – vuvu

+0

Ich ersetzte die p.join() - Anweisungen durch einen etwas größeren Block, der nach den p.start() - Befehlen fortwährend aus der Warteschlange "kommt", sobald dort genug Elemente gespeichert sind und auch am Ende, wann Alle Teilprozesse sind beendet, und dies gelingt jetzt auch für sehr große Chargen. – vuvu