Ich habe eine Liste von Dateinamen files
, die komma-getrennte Daten enthalten, die gereinigt werden müssen, sowie weiter durch Spalten mit Informationen auf der Grundlage der Dateinamen erweitert. So habe ich eine kleine read_file
Funktion implementiert, die sowohl die Erstreinigung als auch die Berechnung zusätzlicher Spalten übernimmt. Unter Verwendung von db.from_sequence(files).map(read_file)
mappe ich die Lesefunktion auf alle Dateien und erhalte jeweils eine Liste von Wörterbüchern.Dask Tasche aus mehreren Dateien in Dask Dataframe mit Spalten
Aber statt einer Liste von Wörterbüchern, möchte ich meine Tasche jede einzelne Zeile der Eingabedateien als Eintrag enthalten. Anschließend möchte ich die Schlüssel der Wörterbücher zu Spaltennamen in einem dask Datenrahmen zuordnen.
from dask import bag as db
def read_file(filename):
ret = []
with open(filename, 'r') as fp:
... # reading line of file and storing result in dict
ret.append({'a': val_a, 'b': val_b, 'c': val_c})
return ret
from dask import bag as db
files = ['a.txt', 'b.txt', 'c.txt']
my_bag = db.from_sequence(files).map(read_file)
# a,b,c are the keys of the dictionaries returned by read_file
my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
Könnte jemand mich wissen lassen, was ich ändern muss, um diesen Code laufen zu lassen? Gibt es unterschiedliche Ansätze, die besser geeignet wären?
Edit: Ich habe drei Testdateien erstellt a_20160101.txt
, a_20160102.txt
, a_20160103.txt
. Alle von ihnen enthalten nur ein paar Zeilen mit jeweils einer einzigen Zeichenfolge.
asdf
sadfsadf
sadf
fsadff
asdf
sadfasd
fa
sf
ads
f
Vorher hatte ich einen kleinen Fehler in read_file
, aber jetzt arbeitet my_bag.take(10)
nach der Kartierung des Leser Aufruf fein:
([{'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfsadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fsadff', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfasd', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fa', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'ads', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'f', 'c': 'XY'}],)
jedoch my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
und anschließend my_df.head(10)
noch erhöht dask.async.AssertionError: 3 columns passed, passed data had 10 columns
Was ist los mit einer Tasche von Wörterbüchern und dann an to_dataframe? – MRocklin