2017-10-13 2 views
1

Ich habe eine Parkett-Datei im Hive-Format und bissige Kompression. Es passt in den Speicher und ein pandas.info stellt die folgenden Daten zur Verfügung.inkonsistente Verarbeitungszeit in DASK verteilt fastparquet

Anzahl der Zeilen pro Gruppe in Parkett-Datei ist nur 100K

>>> df.info() 
<class 'pandas.core.frame.DataFrame'> 
Index: 21547746 entries, YyO+tlZtAXYXoZhNr3Vg3+dfVQvrBVGO8j1mfqe4ZHc= to oE4y2wK5E7OR8zyrCHeW02uTeI6wTwT4QTApEVBNEdM= 
Data columns (total 8 columns): 
payment_method_id   int16 
payment_plan_days   int16 
plan_list_price   int16 
actual_amount_paid  int16 
is_auto_renew    bool 
transaction_date   datetime64[ns] 
membership_expire_date datetime64[ns] 
is_cancel     bool 
dtypes: bool(2), datetime64[ns](2), int16(4) 
memory usage: 698.7+ MB 

Jetzt, mit dask einige einfache Rechnung machen erhalte ich die folgenden Timings

>>>time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime() 
'Fri Oct 13 23:44:50 2017' 
141.98732048354384 
'Fri Oct 13 23:44:59 2017' 

unter Verwendung verteilter Einfädeln mit (lokaler Cluster)

>>> c=Client() 
>>> time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime() 
'Fri Oct 13 23:47:04 2017' 
141.98732048354384 
'Fri Oct 13 23:47:15 2017' 
>>> 

Das war in Ordnung, etwa 9 Sekunden.

Jetzt Multiprozessing verwenden, hier kommt die Überraschung ...

>>> time.asctime();ddf.actual_amount_paid.mean().compute(get=dask.multiprocessing.get);time.asctime() 
'Fri Oct 13 23:50:43 2017' 
141.98732048354384 
'Fri Oct 13 23:57:49 2017' 
>>> 

ich Multiprozessing erwarten würde und verteilt/lokale Cluster mit einem Gewinde mit möglicherweise einige Unterschiede auf der gleichen Größenordnung sein (für gut oder schlecht)

Multiprozessing benötigt jedoch 47-mal mehr Zeit, um einen einfachen Mittelwert über eine In16-Spalte zu erstellen.

Mein env ist nur eine frische conda Installation mit erforderlichen Modulen. Keine Handlese von irgendetwas.

Warum gibt es diese Unterschiede? Ich kann dasc/distributed nicht so handhaben, dass ich ein vorhersehbares Verhalten habe, um in der Lage zu sein, abhängig von der Art meines Problems weise zwischen den verschiedenen Schedulern zu wählen.

Dies ist nur ein Spielzeugbeispiel, aber ich konnte kein Beispiel finden, das meinen Erwartungen entsprach (so wie ich zumindest die Dokumente gelesen habe).

Gibt es irgendetwas, das ich in meinem Hinterkopf behalten sollte? oder verpasse ich gerade den Punkt völlig?

Dank

JC

Antwort

1

Mit dem Gewinde Scheduler, jede Aufgabe Zugriff auf alle Speicher des Prozesses hat - alle Daten in diesem Fall - und ohne Speicherkopieren seine Berechnungen daher tun können.

Mit dem verteilten Scheduler weiß der Scheduler, welcher Thread und welcher Worker die Daten erzeugt, die für eine nachfolgende Task benötigt werden, oder hat diese Daten bereits im Speicher. Die Schlauheit des Schedulers ist speziell darauf ausgerichtet, die Berechnung an den richtigen Arbeiter zu übertragen, um Datenkommunikation und -kopieren zu vermeiden.

Umgekehrt neigt der Multiprocess Scheduler dazu, Aufgabenergebnisse zum und vom Hauptprozess zu senden, was eine Menge Serialisierung und Kopieren beinhalten kann. Einige Aufgaben können miteinander verbunden werden (indem Aufgaben durch Aufrufen vieler Python-Funktionen in einer Kette kombiniert werden), einige jedoch nicht. Jegliches Serialisieren und Kopieren kostet CPU-Aufwand und, wahrscheinlich wichtiger für Sie, Speicherplatz. Wenn Ihre Originaldaten einen beträchtlichen Teil der Gesamtsumme des Systems ausmachen, füllen Sie wahrscheinlich den physischen Speicher auf, was zu einer Verlangsamung des großen Faktors führt.

+0

ja, nach einigem Wrestling und viel Nachlesen des Handbuchs kam ich tatsächlich zum selben Schluss.Viel lernen ;-) –

Verwandte Themen