2016-11-25 1 views
0

Ich habe ein Pandas Dataframe, das Millionen von Zeilen hat und ich muss zeilenweise Operationen ausführen. Da ich eine Multicore-CPU habe, möchte ich diesen Prozess mit Multiprocessing beschleunigen. Ich würde das gerne tun, indem ich den Datenrahmen in gleich große Datenrahmen zerlege und jeden von ihnen in einem separaten Prozess verarbeite. So weit, so gut ... Das Problem ist, dass mein Code im OOP-Stil geschrieben ist und ich Pickle-Fehler mit einem Multiprocess-Pool bekomme. Was ich mache ist, dass ich einen Verweis auf eine Klassenfunktion self.X an den Pool weitergebe. Ich verwende weiterhin Klassenattribute innerhalb von X (nur Lesezugriff). Ich möchte wirklich nicht zum funktionalen Programmierstil zurückkehren ... Ist es also möglich Multiprocessing in einer OOP Umgebung durchzuführen?Multiprocessing mit Klassenfunktionen und Klassenattributen

Antwort

0

Es sollte möglich sein, solange alle Elemente in Ihrer Klasse (die Sie an die Unterprozesse übergeben) einfügbar ist. Das ist die einzige Sache, die Sie sicherstellen müssen. Wenn Elemente in Ihrer Klasse nicht vorhanden sind, können Sie sie nicht an einen Pool übergeben. Selbst wenn Sie nur self.x übergeben, muss alles andere wie self.y ersetzbar sein.

ich meine Pandas Dataframe-Verarbeitung wie folgt aus:

import pandas as pd 
import multiprocessing as mp 
import numpy as np 
import time 


def worker(in_queue, out_queue): 
    for row in iter(in_queue.get, 'STOP'): 
     value = (row[1] * row[2]/row[3]) + row[4] 
     time.sleep(0.1) 
     out_queue.put((row[0], value)) 

if __name__ == "__main__": 
    # fill a DataFrame 
    df = pd.DataFrame(np.random.randn(1e5, 4), columns=list('ABCD')) 

    in_queue = mp.Queue() 
    out_queue = mp.Queue() 

    # setup workers 
    numProc = 2 
    process = [mp.Process(target=worker, 
          args=(in_queue, out_queue)) for x in range(numProc)] 

    # run processes 
    for p in process: 
     p.start() 

    # iterator over rows 
    it = df.itertuples() 

    # fill queue and get data 
    # code fills the queue until a new element is available in the output 
    # fill blocks if no slot is available in the in_queue 
    for i in range(len(df)): 
     while out_queue.empty(): 
      # fill the queue 
      try: 
       row = next(it) 
       in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True) # row = (index, A, B, C, D) tuple 
      except StopIteration: 
       break 
     row_data = out_queue.get() 
     df.loc[row_data[0], "Result"] = row_data[1] 

    # signals for processes stop 
    for p in process: 
     in_queue.put('STOP') 

    # wait for processes to finish 
    for p in process: 
     p.join() 

Auf diese Weise habe ich nicht große Brocken von Datenrahmen passieren und ich habe nicht über picklable Elemente in meiner Klasse zu denken.