2017-07-26 5 views
1

Ich bin in einer Funken Job einen Fehler bekommen, die mich ist erstaunlich:Wie kann man ein großes Zwischenergebnis vor dem Reduzieren vermeiden?

Total size of serialized results of 102 tasks (1029.6 MB) is 
bigger than spark.driver.maxResultSize (1024.0 MB) 

Meine Aufgabe ist es wie folgt aus:

def add(a,b): return a+b 
sums = rdd.mapPartitions(func).reduce(add) 

rdd hat ~ 500 Partitionen und func nimmt die Zeilen in dieser Partition und gibt ein großes Array zurück (ein numpy Array von 1.3M doubles oder ~ 10Mb). Ich möchte alle diese Ergebnisse zusammenfassen und ihre Summe zurückgeben.

Spark scheint das Gesamtergebnis von mapPartitions (func) im Speicher (etwa 5 GB) zu halten, anstatt es inkrementell zu verarbeiten, was nur etwa 30 MB erfordern würde.

Anstatt spark.driver.maxResultSize zu erhöhen, gibt es eine Möglichkeit, die schrittweise zu reduzieren?


Update: Eigentlich bin ich irgendwie überrascht, dass mehr als zwei Ergebnisse jemals im Speicher gehalten werden.

Antwort

3

Hier ist nichts besonders überraschend. Bei Verwendung von reduce wendet Spark eine endgültige Reduzierung auf den Treiber an. Wenn func ein einzelnes Objekt zurückgibt, ist dies effektiv äquivalent zu:

reduce(add, rdd.collect()) 

Sie treeReduce verwenden:

import math 

# Keep maximum possible depth 
rdd.treeReduce(add, depth=math.log2(rdd.getNumPartitions())) 

oder toLocalIterator:

sum(rdd.toLocalIterator()) 

Der frühere rekursiv Zusammenführen von Partitionen auf der Arbeiter auf Kosten eines erhöhten Netzwerkaustausches. Sie können depth Parameter verwenden, um die Leistung abzustimmen.

Der letztere wird nur eine einzelne Partition zu der Zeit sammeln, aber es erfordert möglicherweise eine Neubewertung der rdd und wesentlicher Teil des Jobs wird vom Treiber durchgeführt.

auf die genaue Logik in func verwendet Je können Sie auch durch Aufteilung der Matrix in Blöcke Arbeitsverteilung verbessern, und eine Addition-Block, beispielsweise mit BlockMatrices

Verwandte Themen