2017-02-17 3 views
1

ich in einer Klasse an einer Aufgabe arbeite, die ich jetzt erkennen, ein wenig außerhalb meiner Reichweite sein kann (das erste sememster ist ich jede Programmierung getan haben)Paralell-Eingang mit Python

Die Bedingung ist, dass ich paralell Programmierung mit MPI.

Ich muss eine CSV-Datei von bis zu einem Terabyte an Tick-Daten (jede Mikrosekunde) eingeben, die lokal unordentlich sein können. Führen Sie einen Prozess mit den Daten durch, um Rauschen zu identifizieren, und geben Sie eine bereinigte Datendatei aus.

ich ein serielles Programm mit Pandas geschrieben habe, dass die Daten bestimmt erhebliche Ausreißer und schreibt sie in einen Datensatz markiert Rauschen nimmt, erstellen Sie dann die endgültige Datensatz von Original minus Rauschen zu tun, basierend auf dem Index (Zeit)

Ich habe keine Ahnung, wo ich anfangen soll, um das Programm zu paralisieren. Ich verstehe, dass, weil meine Berechnungen alle lokal sind, ich von CSV in parallel importieren und den Prozess ausführen sollte, um Rauschen zu identifizieren.

Ich glaube, der beste Weg, dies zu tun (und ich kann völlig falsch sein) ist zu streuen, führen Sie die Berechnung und sammeln Sie mit einem hdf5. Aber ich weiß nicht, wie ich das umsetzen soll.

Ich möchte nicht, dass jemand einen ganzen Code schreibt, aber vielleicht ein konkretes Beispiel für den Import in CSV-Parallels und das Sammeln der Daten oder eine bessere Herangehensweise an das Problem.

+1

vielleicht hilft dies: http://stackoverflow.com/questions/8424771/parallel-processing-of-a-large-csv-file-in-python – dahrens

+1

Wenn Sie sagen, Sie haben ein Programm geschrieben, "um die zu nehmen Daten, "was genau meinst du: eine Zeile der CSV-Datei? mehrere Reihen? alle Zeilen? Dies ist entscheidend für die Entwicklung eines Ansatzes, um Ihre Lösungen zu paralellisieren. – gregory

+0

Ich benutze read_csv in Chunks, um es in einen Pandas Datenrahmen zu setzen. –

Antwort

1

Wenn Sie Ihr Programm auf eine Funktion gegen eine Liste von Zeilen laufen lassen können, dann wäre ja ein einfacher Multiprocessing-Ansatz einfach und effektiv. Zum Beispiel:

from multiprocessing import Pool 

def clean_tickData(filename): 
    <your code> 

pool = Pool() 
    pool.map(clean_tickData, cvs_row) 
    pool.close() 
    pool.join() 

map von Pool läuft parallel. Man kann steuern, wie viele parallele Prozesse, aber der Standard, der mit einem leeren Aufruf von Pool() gesetzt wird, startet so viele Prozesse wie CPU-Kerne. Wenn Sie also Ihre Aufräumarbeiten auf eine Funktion reduzieren, die über die verschiedenen Zeilen in Ihren cvs ausgeführt werden kann, wäre die Verwendung von pool.map eine einfache und schnelle Implementierung.

+1

danke. Das ist genau die Art von Gliederung, nach der ich gesucht habe. Und ja, mein Code ist ziemlich einfach zu definieren als eine Funktion. Ich werde das heute Abend versuchen! –