2016-08-22 2 views
1

Ich habe eine Liste von Dateinamen files, die komma-getrennte Daten enthalten, die gereinigt werden müssen, sowie weiter durch Spalten mit Informationen auf der Grundlage der Dateinamen erweitert. So habe ich eine kleine read_file Funktion implementiert, die sowohl die Erstreinigung als auch die Berechnung zusätzlicher Spalten übernimmt. Unter Verwendung von db.from_sequence(files).map(read_file) mappe ich die Lesefunktion auf alle Dateien und erhalte jeweils eine Liste von Wörterbüchern.Dask Tasche aus mehreren Dateien in Dask Dataframe mit Spalten

Aber statt einer Liste von Wörterbüchern, möchte ich meine Tasche jede einzelne Zeile der Eingabedateien als Eintrag enthalten. Anschließend möchte ich die Schlüssel der Wörterbücher zu Spaltennamen in einem dask Datenrahmen zuordnen.

from dask import bag as db 

def read_file(filename): 
    ret = [] 
    with open(filename, 'r') as fp: 
     ... # reading line of file and storing result in dict 
     ret.append({'a': val_a, 'b': val_b, 'c': val_c}) 

    return ret 

from dask import bag as db 
files = ['a.txt', 'b.txt', 'c.txt'] 
my_bag = db.from_sequence(files).map(read_file) 
# a,b,c are the keys of the dictionaries returned by read_file 
my_df = my_bag.to_dataframe(columns=['a', 'b', 'c']) 

Könnte jemand mich wissen lassen, was ich ändern muss, um diesen Code laufen zu lassen? Gibt es unterschiedliche Ansätze, die besser geeignet wären?

Edit: Ich habe drei Testdateien erstellt a_20160101.txt, a_20160102.txt, a_20160103.txt. Alle von ihnen enthalten nur ein paar Zeilen mit jeweils einer einzigen Zeichenfolge.

asdf 
sadfsadf 
sadf 
fsadff 
asdf 
sadfasd 
fa 
sf 
ads 
f 

Vorher hatte ich einen kleinen Fehler in read_file, aber jetzt arbeitet my_bag.take(10) nach der Kartierung des Leser Aufruf fein:

([{'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfsadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fsadff', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfasd', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fa', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'ads', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'f', 'c': 'XY'}],) 

jedoch my_df = my_bag.to_dataframe(columns=['a', 'b', 'c']) und anschließend my_df.head(10) noch erhöht dask.async.AssertionError: 3 columns passed, passed data had 10 columns

+0

Was ist los mit einer Tasche von Wörterbüchern und dann an to_dataframe? – MRocklin

Antwort

0

Sie wahrscheinlich müssen concat

anrufen

Ihre Tasche von Dateinamen wie folgt aussieht:

['a.txt', 
'b.txt', 
'c.txt'] 

Nachdem Sie Ihre Tasche so aussieht Karte nennen:

[[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}], 
[{'a': 1, 'b': 2, 'c': 3}], 
[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}]] 

Jede Datei in eine Liste von dicts gedreht wurde. Jetzt ist deine Tasche wie eine Listen-of-Dicts.

Die .to_dataframe Methode möchte, dass Sie eine Liste von dicts haben. So lassen Sie unsere Tasche verketten, um eine einzelne abgeflachte Sammlung von dicts

my_bag = db.from_sequence(files).map(read_file).concat() 

[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}, 
{'a': 1, 'b': 2, 'c': 3}, 
{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}] 
zu sein
+0

mit 'concat()' es funktioniert perfekt, danke! Wäre es besser, eine andere Herangehensweise zu verwenden, oder ist es in Ordnung? Wenn ich mich an das [dasask-tutorial] (https://github.com/dask/dask-tutorial) richtig erinnere, sollten die Taschen für die Aufnahme/Umwandlung in Ordnung sein, ist das korrekt? – sim

+0

Das scheint mir ein vernünftiger Ansatz zu sein. Sie könnten auch versuchen [dask.delayed] (http://dask.readthedocs.io/en/latest/delayed.html). Siehe Hinweise zu [Arbeiten mit Sammlungen] (http://dask.readthedocs.io/en/latest/delayed-collections.html) – MRocklin