1

Ich wollte df.corr() mit multiprocessing Modul in Python parallelisieren. Ich nehme eine Spalte und berechne Korrelationswerte mit Ruhe für alle Spalten in einem Prozess und zweite Spalte mit Rest anderer Spalten in einem anderen Prozess. Ich fahre auf diese Weise fort, um die obere Grenze der Korrelationsmatrix zu füllen, indem ich die Ergebniszeilen aller Prozesse stapelt.Python parallelisierte Korrelation langsamer als Einzelprozess Korrelation

Ich nahm Beispieldaten der Form (678461, 210) und versuchte meine parallelisierte Methode und df.corr() und bekam Laufzeit von 214.40s bzw. 42.64s. Also, meine parallelisierte Methode braucht mehr Zeit.

Gibt es eine Möglichkeit, dies zu verbessern?

import multiprocessing as mp 
import pandas as pd 
import numpy as np 
from time import * 

def _correlation(args): 

    i, mat, mask = args 
    ac = mat[i] 

    arr = [] 

    for j in range(len(mat)): 
     if i > j: 
      continue 

     bc = mat[j] 
     valid = mask[i] & mask[j] 
     if valid.sum() < 1: 
      c = NA  
     elif i == j: 
      c = 1. 
     elif not valid.all(): 
      c = np.corrcoef(ac[valid], bc[valid])[0, 1] 
     else: 
      c = np.corrcoef(ac, bc)[0, 1] 

     arr.append((j, c)) 

    return arr 

def correlation_multi(df): 

    numeric_df = df._get_numeric_data() 
    cols = numeric_df.columns 
    mat = numeric_df.values 

    mat = pd.core.common._ensure_float64(mat).T 
    K = len(cols) 
    correl = np.empty((K, K), dtype=float) 
    mask = np.isfinite(mat) 

    pool = mp.Pool(processes=4) 

    ret_list = pool.map(_correlation, [(i, mat, mask) for i in range(len(mat))]) 

    for i, arr in enumerate(ret_list): 
     for l in arr: 
      j = l[0] 
      c = l[1] 

      correl[i, j] = c 
      correl[j, i] = c 

    return pd.DataFrame(correl, index = cols, columns = cols) 

if __name__ == '__main__': 
    noise = pd.DataFrame(np.random.randint(0,100,size=(100000, 50))) 
    noise2 = pd.DataFrame(np.random.randint(100,200,size=(100000, 50))) 
    df = pd.concat([noise, noise2], axis=1) 

    #Single process correlation  
    start = time() 
    s = df.corr() 
    print('Time taken: ',time()-start) 

    #Multi process correlation 
    start = time() 
    s1 = correlation_multi(df) 
    print('Time taken: ',time()-start) 

Antwort

0

Die Ergebnisse aus _correlation haben von den Arbeitsprozessen an den Prozess bewegt werden, um die Kommunikation über Pool Interprozess ausgeführt wird.

Dies bedeutet, dass die Rückgabedaten gebeizt, an den anderen Prozess gesendet, unpickled und zur Ergebnisliste hinzugefügt werden. Dies braucht Zeit und ist von Natur aus ein sequentieller Prozess.

Und map verarbeitet die Rückgaben in der Reihenfolge, in der sie gesendet wurden, IIRC. Wenn eine Iteration relativ lange dauert, bleiben andere Ergebnisse möglicherweise hängen. Sie könnten versuchen, imap_unordered verwenden, die Ergebnisse liefert, sobald sie ankommen.