sah ich irgendwo einen Hinweis, wie Sie eine große Datenmenge (etwa Textzeilen) schneller mit dem Multiprocessing-Modul zu bearbeiten, so etwas wie:python3 multiprocessing.Process Ansatz versagt
... (form batch_set = nump batches [= lists of lines to process], batch_set
is a list of lists of strings (batches))
nump = len(batch_set)
output = mp.Queue()
processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) for i in range(nump)]
for p in processes:
p.start()
for p in processes:
p.join()
results = sorted([output.get() for p in processes])
... (do something with the processed outputs, ex print them in order,
given that each proc_lines function returns a couple (i, out_batch))
Allerdings, wenn ich den Code ausführen mit einer kleinen Anzahl von Zeilen/Batch funktioniert es gut [ex: './code.py -x 4:10' für nump = 4 und tumb = 10 (Zeilen/Batch)] während nach einer bestimmte Anzahl von Zeilen/batch hängt [ex: './code.py -x 4: 4000'] und wenn ich interrupt es sehe ich einen Traceback-Hinweis über ein _wait_for_tstate_lock und das System Threading-Bibliothek. Es scheint, dass der Code nicht den zuletzt angezeigten Code Zeile über ...
Ich biete den Code unten, falls jemand es benötigt, um zu beantworten, warum dies geschieht und wie es zu beheben ist.
#!/usr/bin/env python3
import sys
import multiprocessing as mp
def fabl(numb, nump):
'''
Form And Batch Lines: form nump[roc] groups of numb[atch] indexed lines
'<idx> my line here' with indexes from 1 to (nump x numb).
'''
ret = []
idx = 1
for _ in range(nump):
cb = []
for _ in range(numb):
cb.append('%07d my line here' % idx)
idx += 1
ret.append(cb)
return ret
def proc_lines(i, output, rows_in):
ret = []
for row in rows_in:
row = row[0:8] + 'some other stuff\n' # replacement for the post-idx part
ret.append(row)
output.put((i,ret))
return
def mp_proc(batch_set):
'given the batch, disperse it to the number of processes and ret the results'
nump = len(batch_set)
output = mp.Queue()
processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) for i in range(nump)]
for p in processes:
p.start()
for p in processes:
p.join()
print('waiting for procs to complete...')
results = sorted([output.get() for p in processes])
return results
def write_set(proc_batch_set, fout):
'write p[rocessed]batch_set'
for _, out_batch in proc_batch_set:
for row in out_batch:
fout.write(row)
return
def main():
args = sys.argv
if len(args) < 2:
print('''
run with args: -x [ NumProc:BatchSize ]
(ex: '-x' | '-x 4:10' (default values) | '-x 4:4000' (hangs...))
''')
sys.exit(0)
numb = 10 # suppose we need this number of lines/batch : BatchSize
nump = 4 # number of processes to use. : NumProcs
if len(args) > 2 and ':' in args[2]: # use another np:bs
nump, numb = map(int, args[2].split(':'))
batch_set = fabl(numb, nump) # proc-batch made in here: nump (groups) x numb (lines)
proc_batch_set = mp_proc(batch_set)
with open('out-min', 'wt') as fout:
write_set(proc_batch_set, fout)
return
if __name__ == '__main__':
main()
Also, im Grunde würde es bedeuten, dass ich die Warteschlange am missbrauchen, indem Sie versuchen zu viel, um es hinzuzufügen, um das System der Blöcke. Ich fand diesen Beitrag, der das gleiche sagt: http://StackOverflow.com/questions/31665328/python-3-multiprocessing-queue-deadlock-when-calling-join-before-the-queue-is-em so könnte das sein das Problem. Vielen Dank! – vuvu
Ich ersetzte die p.join() - Anweisungen durch einen etwas größeren Block, der nach den p.start() - Befehlen fortwährend aus der Warteschlange "kommt", sobald dort genug Elemente gespeichert sind und auch am Ende, wann Alle Teilprozesse sind beendet, und dies gelingt jetzt auch für sehr große Chargen. – vuvu