2017-01-15 5 views
2

Ich habe eine Menge Textverarbeitung auf einem großen Stapel von Dateien einschließlich großer CSVs und viele, viele kleine XML-Dateien. Manchmal mache ich Aggregationszählungen, aber oft mache ich NLP-artige Arbeiten, um tiefer zu untersuchen, was sich in diesen Dateien jenseits dessen befindet, was markiert oder bereits strukturiert ist.Langsame Dask-Leistung bei CSV-Datumsanalyse?

Ich habe die Multiprocessing-Bibliothek viel verwendet, um diese Berechnungen über mehrere CPUs durchzuführen, aber ich bin verliebt in die Ideen hinter Dask und es wird sowohl im Netz als auch von Kollegen wärmstens empfohlen.

fragte ich eine ähnliche Frage zu Dask Leistung hier:

Slow Performance with Python Dask bag?

und MRocklin (https://stackoverflow.com/users/616616/mrocklin) Lassen Sie mich das Laden viele kleine Dateien kennen die Leistung wahrscheinlich im Papierkorb.

Doch wenn ich es auf einzelne große Dateien (200 MB) ausführen, bekomme ich es immer noch nicht sehr gut. Hier ist ein Beispiel:

Ich habe eine CSV-Datei mit 900.000 Zeilen CSV-Tweets und ich möchte es schnell laden und analysieren das Feld "created_at". Hier sind drei Wege, wie ich es gemacht habe und die Benchmarks für jeden von ihnen. Ich habe dies auf einem neuen i7 2016 MacBook Pro mit 16 GB RAM ausgeführt.

import pandas 
import dask.dataframe as dd 
import multiprocessing 

%%time 
# Single Threaded, no chunking 
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", parse_dates = ["created_at"]) 
print(len(d)) 

CPU-Zeiten: user 2min 31s, sys: 807 ms, gesamt: 2min 32s Wandzeit: 2min 32s

%%time 
# Multithreaded chunking 
def parse_frame_dates(frame): 
    frame["created_at"] = pandas.to_datetime(frame["created_at"]) 
    return(frame) 

d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", chunksize = 100000) 
frames = multiprocessing.Pool().imap_unordered(get_count, d) 
td = pandas.concat(frames) 
print(len(td)) 

CPU-Zeiten: Benutzer 5,65 s, sys: 1,47 s, gesamt: 7,12 s Wandzeit: 1min 10s

%%time 
# Dask Load 
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
       parse_dates = ["created_at"], blocksize = 10000000).compute() 

CPU-Zeiten: user 2min 59s, sys: 26,2 s, gesamt: 3min 25s Wandzeit: 3min 12s

Ich habe diese Art von Ergebnissen auf viele verschiedene Dask-Vergleiche gefunden, aber selbst wenn es richtig funktioniert, könnte es mich in die richtige Richtung weisen.

Kurz gesagt, wie bekomme ich die beste Leistung von Dask für diese Art von Aufgaben? Warum scheint es, dass sowohl Singlethread- als auch Multi-Threading-Techniken, die auf andere Weise ausgeführt werden, hinter den Erwartungen zurückbleiben?

Antwort

2

Ich vermute, dass der Pandas read_csv datetime-Parsing-Code pure-python ist und daher nicht viel von der Verwendung von Threads profitieren wird, was standardmäßig von dask.dataframe verwendet wird.

Sie sehen möglicherweise eine bessere Leistung bei der Verwendung von Prozessen.

Ich vermute, dass die folgenden schneller funktionieren würde:

import dask.multiprocessing 
dask.set_options(get=dask.multiprocessing.get) # set processes as default 

d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
       parse_dates = ["created_at"], blocksize = 10000000) 
len(d) 

Das Problem mit Verfahren ist, dass die Kommunikation zwischen Prozessen kann teuer werden. Ich berechne explizit len(d) oben anstatt d.compute(), um zu vermeiden, dass alle Pandas-Datenrahmen in den Arbeitsprozessen in Essig einlegen und sie zum Hauptaufrufprozess verschieben müssen. In der Praxis ist dies jedoch ziemlich üblich, da die Leute selten den vollen Datenrahmen, sondern etwas Berechnung auf dem Datenrahmen wollen.

Der relevante docpage hier ist http://dask.readthedocs.io/en/latest/scheduler-choice.html

Sie können auch die distributed scheduler auf einer einzigen Maschine verwenden möchten, anstatt die Multiprocessing-Scheduler zu verwenden. Dies wird auch in den oben genannten Dokumenten beschrieben.

$ pip install dask distributed 

from dask.distributed import Client 
c = Client() # create processes and set as default 

d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
       parse_dates = ["created_at"], blocksize = 10000000) 
len(d) 
+1

Die Methoden der Zeitsteuerung, die OP sind nicht das Gleiche. Die Übergabe von '' parse_dates = ... '' ist eine ziemlich robuste Methode, aber ich muss auf langsamere Parsing (in Python) zurückgreifen. Sie möchten fast immer einfach in der CSV, DANN, Nachbearbeitung mit .to_datetime lesen, insbesondere müssen Sie möglicherweise ein Format = Argument oder andere Optionen abhängig von den Daten verwenden. , YMMV. In der Tat ist diese Methode besonders benutzerfreundlich (da diese separate, obwohl sequentielle Aufgaben sind). – Jeff

+1

Danke, Matthew! Das hat einen großen Unterschied gemacht. In meinem Beispiel wurde der Multiprozess von 2 Minuten für einen einzelnen Thread auf 30 Sekunden für Dies reduziert. Viel mehr nach dem, was ich erwartet hatte. Ich werde in die Info über die Scheduler-Auswahl und die Verwendung des verteilten Schedulers eintauchen. Ich hatte eindeutig nicht alle meine Hausaufgaben gemacht. Danke noch einmal! –

Verwandte Themen