2017-06-06 4 views
2

Ich konstruiere eine sehr große DAG in dask, um an den verteilten Scheduler zu senden, wo Knoten auf Datenrahmen arbeiten, die selbst ziemlich groß sein können. Ein Muster ist, dass ich ungefähr 50-60 Funktionen habe, die Daten laden und Pandas-Datenrahmen konstruieren, die jeweils mehrere hundert MB groß sind (und logisch Partitionen einer einzelnen Tabelle darstellen). Ich möchte diese in einem einzigen dask Datenrahmen für nachgelagerte Knoten in der Grafik verketten, während die Datenbewegung minimiert wird. Ich verbinde die Aufgaben wie folgt aus:Dask-Diagramm-Ausführung und Speicherauslastung

dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs] 
dfs = [dask.delayed(pandas_to_dask)(df) for df in dfs] 
return dask.delayed(concat_all)(dfs) 

wo

def pandas_to_dask(df): 
    return dask.dataframe.from_pandas(df).to_delayed() 

und ich habe verschiedene concat_all implentations versucht, aber dies scheint vernünftig:

def concat_all(dfs): 
    dfs = [dask.dataframe.from_delayed(df) for df in dfs] 
    return dask.dataframe.multi.concat(dfs, axis='index', join='inner') 

Alle Pandas Datenrahmen disjunkt sind auf ihre Index und sortiert/monoton.

Allerdings bin ich getötet Arbeiter immer auf dieser concat_all Funktion zu sterben (Cluster-Manager tötet sie ihre Speicherbudgets für die Überschreitung), obwohl der Speicher Budget auf jeder tatsächlich recht groß ist und ich kann es nicht erwarten, sich zu bewegen Daten herum. Ich bin ziemlich sicher, dass ich immer zu einer vernünftigen Teilmenge von Daten zerschneide, bevor ich compute() in Diagramm nodse nenne, die den dask Datenrahmen verwenden.

Ich spiele mit --memory-limit ohne Erfolg bisher. Komme ich das Problem zumindest richtig an? Gibt es Überlegungen, die ich vermisse?

Antwort

2

Angesichts der Liste der verzögerten Werte, die Pandas zu Datenrahmen

>>> dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs] 
>>> type(dfs[0].compute()) # just checking that this is true 
pandas.DataFrame 

Geben sie zum dask.dataframe.from_delayed Funktion

>>> ddf = dd.from_delayed(dfs) 

standardmäßig berechnen dies die erste Berechnung ausführen, um Metadaten zu bestimmen (siehe Spalte Namen, Dtypes usw., die für dasask.dataframe wichtig sind). Sie können dies vermeiden, indem Sie einen Beispieldatenrahmen erstellen und an das Schlüsselwort meta= übergeben.

>>> meta = pd.DataFrame({'value': [1.0], 'name': ['foo'], 'id': [0]}) 
>>> ddf = dd.from_delayed(dfs, meta=meta) 

Diese example notebook kann auch hilfreich sein.

Im Allgemeinen müssen Sie die DASK-Funktionen nicht innerhalb anderer DASK-Funktionen aufrufen (wie Sie es taten, indem Sie den Aufruf from_pandas verzögern). Dieks.dataframe-Funktionen sind selbst bereits faul und müssen nicht weiter verzögert werden.

+0

Vielen Dank für Ihre schnelle Antwort. Ich beobachte, dass dd.from_delayed (dfs) sofort 'dfs [0]' auswertet, um Metadaten zu extrahieren. Aus irgendeinem Grund verursacht dies Probleme für mich. Gibt es eine andere Möglichkeit, diese Auswertung zu verschieben, bis der Graph vollständig aufgebaut ist? Ich werde versuchen, eine Repro zusammenzustellen. –

+0

Sie können dem Schlüsselwort 'meta =' einen Beispieldatenrahmen bereitstellen. Ich füge ein Beispiel in die Antwort ein. – MRocklin