Ich habe eine Menge Textverarbeitung auf einem großen Stapel von Dateien einschließlich großer CSVs und viele, viele kleine XML-Dateien. Manchmal mache ich Aggregationszählungen, aber oft mache ich NLP-artige Arbeiten, um tiefer zu untersuchen, was sich in diesen Dateien jenseits dessen befindet, was markiert oder bereits strukturiert ist.Langsame Dask-Leistung bei CSV-Datumsanalyse?
Ich habe die Multiprocessing-Bibliothek viel verwendet, um diese Berechnungen über mehrere CPUs durchzuführen, aber ich bin verliebt in die Ideen hinter Dask und es wird sowohl im Netz als auch von Kollegen wärmstens empfohlen.
fragte ich eine ähnliche Frage zu Dask Leistung hier:
Slow Performance with Python Dask bag?
und MRocklin (https://stackoverflow.com/users/616616/mrocklin) Lassen Sie mich das Laden viele kleine Dateien kennen die Leistung wahrscheinlich im Papierkorb.
Doch wenn ich es auf einzelne große Dateien (200 MB) ausführen, bekomme ich es immer noch nicht sehr gut. Hier ist ein Beispiel:
Ich habe eine CSV-Datei mit 900.000 Zeilen CSV-Tweets und ich möchte es schnell laden und analysieren das Feld "created_at". Hier sind drei Wege, wie ich es gemacht habe und die Benchmarks für jeden von ihnen. Ich habe dies auf einem neuen i7 2016 MacBook Pro mit 16 GB RAM ausgeführt.
import pandas
import dask.dataframe as dd
import multiprocessing
%%time
# Single Threaded, no chunking
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", parse_dates = ["created_at"])
print(len(d))
CPU-Zeiten: user 2min 31s, sys: 807 ms, gesamt: 2min 32s Wandzeit: 2min 32s
%%time
# Multithreaded chunking
def parse_frame_dates(frame):
frame["created_at"] = pandas.to_datetime(frame["created_at"])
return(frame)
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", chunksize = 100000)
frames = multiprocessing.Pool().imap_unordered(get_count, d)
td = pandas.concat(frames)
print(len(td))
CPU-Zeiten: Benutzer 5,65 s, sys: 1,47 s, gesamt: 7,12 s Wandzeit: 1min 10s
%%time
# Dask Load
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv",
parse_dates = ["created_at"], blocksize = 10000000).compute()
CPU-Zeiten: user 2min 59s, sys: 26,2 s, gesamt: 3min 25s Wandzeit: 3min 12s
Ich habe diese Art von Ergebnissen auf viele verschiedene Dask-Vergleiche gefunden, aber selbst wenn es richtig funktioniert, könnte es mich in die richtige Richtung weisen.
Kurz gesagt, wie bekomme ich die beste Leistung von Dask für diese Art von Aufgaben? Warum scheint es, dass sowohl Singlethread- als auch Multi-Threading-Techniken, die auf andere Weise ausgeführt werden, hinter den Erwartungen zurückbleiben?
Die Methoden der Zeitsteuerung, die OP sind nicht das Gleiche. Die Übergabe von '' parse_dates = ... '' ist eine ziemlich robuste Methode, aber ich muss auf langsamere Parsing (in Python) zurückgreifen. Sie möchten fast immer einfach in der CSV, DANN, Nachbearbeitung mit .to_datetime lesen, insbesondere müssen Sie möglicherweise ein Format = Argument oder andere Optionen abhängig von den Daten verwenden. , YMMV. In der Tat ist diese Methode besonders benutzerfreundlich (da diese separate, obwohl sequentielle Aufgaben sind). – Jeff
Danke, Matthew! Das hat einen großen Unterschied gemacht. In meinem Beispiel wurde der Multiprozess von 2 Minuten für einen einzelnen Thread auf 30 Sekunden für Dies reduziert. Viel mehr nach dem, was ich erwartet hatte. Ich werde in die Info über die Scheduler-Auswahl und die Verwendung des verteilten Schedulers eintauchen. Ich hatte eindeutig nicht alle meine Hausaufgaben gemacht. Danke noch einmal! –