2016-07-19 7 views
0

Ich habe eine Liste dataframe_chunk, die Stücke von einem sehr großen Pandas Dataframe enthält.Ich möchte jeden einzelnen Brocken in einen anderen csv schreiben, und dies parallel zu tun. Ich sehe jedoch, dass die Dateien sequenziell geschrieben werden, und ich bin nicht sicher, warum dies der Fall ist. Hier ist der Code:concurrent.futures nicht parallelisieren schreiben

import concurrent.futures as cfu 

def write_chunk_to_file(chunk, fpath): 
    chunk.to_csv(fpath, sep=',', header=False, index=False) 

pool = cfu.ThreadPoolExecutor(N_CORES) 

futures = [] 
for i in range(N_CORES): 
    fpath = '/path_to_files_'+str(i)+'.csv' 
    futures.append(pool.submit(write_chunk_to_file(dataframe_chunk[i], fpath))) 

for f in cfu.as_completed(futures): 
    print("finished at ",time.time()) 

Irgendwelche Hinweise?

Antwort

0

Eine Sache, die in den Python 2.7.x threading docs aber nicht in der 3.x docs angegeben ist, dass Python nicht wahr Parallelität erreichen kann mit der threading Bibliothek - nur ein Thread zu einem Zeitpunkt ausgeführt werden.

Sie sollten versuchen, concurrent.futures mit der ProcessPoolExecutor verwenden, die separate Prozesse für jeden Auftrag verwendet und daher wahre Parallelität auf einer Multi-Core-CPU erreichen können.

aktualisiert

Hier ist das Programm geeignet ist, die multiprocessing Bibliothek zu verwenden, anstatt:

#!/usr/bin/env python3 

from multiprocessing import Process 

import os 
import time 

N_CORES = 8 

def write_chunk_to_file(chunk, fpath): 
    with open(fpath, "w") as f: 
     for x in range(10000000): 
     f.write(str(x)) 

futures = [] 

print("my pid:", os.getpid()) 
input("Hit return to start:") 

start = time.time() 
print("Started at:", start) 

for i in range(N_CORES): 
    fpath = './tmp/file-'+str(i)+'.csv' 
    p = Process(target=write_chunk_to_file, args=(i,fpath)) 
    futures.append(p) 

for p in futures: 
    p.start() 

print("All jobs started.") 

for p in futures: 
    p.join() 

print("All jobs finished at ",time.time()) 

Sie die Jobs mit diesem Shell-Befehl in einem anderen Fenster überwachen:

while true; do clear; pstree 12345; ls -l tmp; sleep 1; done 

(Ersetzen Sie 12345 durch die vom Skript ausgegebene PID.)

+0

yeah probierte das auch, dasselbe :( – elelias

+0

Antwort aktualisiert - Ich hatte mehr Glück beim 'multiprocessing' direkt. – ErikR

Verwandte Themen