2017-03-12 3 views
1

Ich muss die Ergebnisse vieler Bäume für einige Abfragen ansammeln, die ein großes Ergebnis ausgibt. Da alle Bäume unabhängig voneinander behandelt werden können, ist es peinlich parallel, abgesehen von der Tatsache, dass die Ergebnisse summiert werden müssen und ich die Zwischenergebnisse für alle Bäume im Speicher nicht speichern kann. Unten ist ein einfaches Beispiel für einen Code für das Problem, der alle Zwischenergebnisse im Speicher speichert (natürlich sind die Funktionen neuere die gleichen im realen Problem, da dies doppelte Arbeit wäre).Python-Funktionen parallelisieren, ohne Zwischenergebnisse zu speichern

import numpy as np 
from joblib import Parallel, delayed 


functions=[[abs,np.round] for i in range(500)] # Dummy functions 
functions=[function for sublist in functions for function in sublist] 
X=np.random.normal(size=(5,5)) # Dummy data 


def helper_function(function,X=X): 
    return function(X) 
results = Parallel(n_jobs=-1,)(
     map(delayed(helper_function), [functions[i] for i in range(1000)])) 
results_out = np.zeros(results[0].shape) 
for result in results: 
    results_out+=result 

Eine Lösung könnte die folgende Modifikation sein:

import numpy as np 
from joblib import Parallel, delayed 

functions=[[abs,np.round] for i in range(500)] # Dummy functions 
functions=[function for sublist in functions for function in sublist] 
X=np.random.normal(size=(5,5)) # Dummy data 
results_out = np.zeros(results[0].shape) 

def helper_function(function,X=X,results=results_out): 
    result = function(X) 
    results += result 
Parallel(n_jobs=-1,)(
     map(delayed(helper_function), [functions[i] for i in range(1000)])) 

Aber dies könnte Rennen führen. Also ist es nicht optimal.

Haben Sie irgendwelche Vorschläge für das Vorformen, ohne die Zwischenergebnisse zu speichern und trotzdem parallel zu machen?

Antwort

0

Die Antwort wird in der documentation of joblib gegeben.

with Parallel(n_jobs=2) as parallel: 
    accumulator = 0. 
    n_iter = 0 
    while accumulator < 1000: 
     results = parallel(delayed(sqrt)(accumulator + i ** 2) 
          for i in range(5)) 
     accumulator += sum(results) # synchronization barrier 
     n_iter += 1 

Sie können die Berechnung in Blöcken tun und die Chunk reduzieren, wie Sie sind aus dem Speicher laufen.

+0

Dank gute Antwort. Ich habe deine Antworten erst gesehen, nachdem ich die Lösung mit potenziellen Rassen hinzugefügt habe. –

+0

Hat es dein Problem gelöst? Wenn ja, bitte überlege, die Antwort zu akzeptieren, wenn ich nicht neugierig wäre, warum :-) – Ohjeah

+0

Ich denke, es hat das Problem zu 99% gelöst, da es in der Praxis funktioniert. Aber es lässt immer noch einen Hyperparameter für die Anwendung (die Korbgröße für den Akkumulator) und gibt eine Synchronisationsbarriere. Nur aus Neugier Ich frage mich, ob es möglich ist, sichere Inplace-Operationen zu tun, so dass alle Threads die Ergebnisse aktualisieren können. Ich denke, dass in anderen parallelen Programmiersystemen wie Tensorflow so etwas enthalten ist. –

Verwandte Themen