2017-02-21 4 views
1

Mein dask Datenrahmen in CSV hat etwa 120 mm Zeilen und 4 Spalten:Fehler, während ein dask Datenrahmen Export

df_final.dtypes 

cust_id  int64 
score   float64 
total_qty  float64 
update_score float64 
dtype: object 

und ich mache diese Operation auf jupyter Notebooks Linux-Maschine angeschlossen:

%time df_final.to_csv('/path/claritin-files-*.csv') 

und wirft diesen Fehler:

--------------------------------------------------------------------------- 
ValueError        Traceback (most recent call last) 
<ipython-input-24-46468ae45023> in <module>() 
----> 1 get_ipython().magic(u"time df_final.to_csv('path/claritin-files-*.csv')") 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s) 
    2334   magic_name, _, magic_arg_s = arg_s.partition(' ') 
    2335   magic_name = magic_name.lstrip(prefilter.ESC_MAGIC) 
-> 2336   return self.run_line_magic(magic_name, magic_arg_s) 
    2337 
    2338  #------------------------------------------------------------------------- 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line) 
    2255     kwargs['local_ns'] = sys._getframe(stack_depth).f_locals 
    2256    with self.builtin_trap: 
-> 2257     result = fn(*args,**kwargs) 
    2258    return result 
    2259 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns) 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k) 
    191  **# but it's overkill for just that one bit of state.** 
    192  def magic_deco(arg): 
--> 193   call = lambda f, *a, **k: f(*a, **k) 
    194 
    195   if callable(arg): 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns) 
    1161   if mode=='eval': 
    1162    st = clock2() 
-> 1163    out = eval(code, glob, local_ns) 
    1164    end = clock2() 
    1165   else: 

<timed eval> in <module>() 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/core.pyc in to_csv(self, filename, **kwargs) 
    936   """ See dd.to_csv docstring for more information """ 
    937   from .io import to_csv 
--> 938   return to_csv(self, filename, **kwargs) 
    939 
    940  def to_delayed(self): 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.pyc in to_csv(df, filename, name_function, compression, compute, get, **kwargs) 
    411  if compute: 
    412   from dask import compute 
--> 413   compute(*values, get=get) 
    414  else: 
    415   return values 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs) 
    177   dsk = merge(var.dask for var in variables) 
    178  keys = [var._keys() for var in variables] 
--> 179  results = get(dsk, keys, **kwargs) 
    180 
    181  results_iter = iter(results) 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/threaded.pyc in get(dsk, result, cache, num_workers, **kwargs) 
    74  results = get_async(pool.apply_async, len(pool._pool), dsk, result, 
    75       cache=cache, get_id=_thread_get_id, 
---> 76       **kwargs) 
    77 
    78  # Cleanup pools associated to dead threads 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.pyc in get_async(apply_async, num_workers, dsk, result, cache, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, dumps, loads, **kwargs) 
    491      _execute_task(task, data) # Re-execute locally 
    492     else: 
--> 493      raise(remote_exception(res, tb)) 
    494    state['cache'][key] = res 
    495    finish_task(dsk, key, state, results, keyorder.get) 

**ValueError: invalid literal for long() with base 10: 'total_qty'** 

Traceback 
--------- 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 268, in execute_task 
    result = _execute_task(task, data) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task 
    return func(*args2) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 55, in pandas_read_text 
    coerce_dtypes(df, dtypes) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 83, in coerce_dtypes 
    df[c] = df[c].astype(dtypes[c]) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 3054, in astype 
    raise_on_error=raise_on_error, **kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3189, in astype 
    return self.apply('astype', dtype=dtype, **kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3056, in apply 
    applied = getattr(b, f)(**kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 461, in astype 
    values=values, **kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 504, in _astype 
    values = _astype_nansafe(values.ravel(), dtype, copy=True) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/types/cast.py", line 534, in _astype_nansafe 
    return lib.astype_intsafe(arr.ravel(), dtype).reshape(arr.shape) 
    File "pandas/lib.pyx", line 980, in pandas.lib.astype_intsafe (pandas/lib.c:17409) 
    File "pandas/src/util.pxd", line 93, in util.set_value_at_unsafe (pandas/lib.c:72777) 

ich habe ein paar Fragen:

1) Zuerst war dieser Export am Freitag in Ordnung, es spuckte 100 csv-Dateien aus (da es 100 Partitionen hat), die ich später aggregierte. Also, was ist heute falsch - alles aus dem Fehlerprotokoll?

2) Vielleicht ist diese Frage für die Ersteller dieses Pakets, was ist der zeiteffizienteste Weg, um einen CSV-Extrakt aus einem dask Datenrahmen dieser Größe zu bekommen, da es etwa 1,5 bis 2 Stunden dauerte, das letzte Mal, als es funktionierte.

Ich verwende nicht dask verteilt und das ist auf einem einzelnen Kern eines Linux-Clusters.

Antwort

1

Dieser Fehler hat wahrscheinlich wenig mit to_csv zu tun und mehr mit etwas anderes in Ihrer Berechnung zu tun. Der Aufruf an df.to_csv war nur das erste Mal, dass Sie die Berechnung erzwangen, durch alle Daten zu rollen.

Angesichts des Fehlers vermute ich tatsächlich, dass dies in read_csv scheitert. Dask.dataframe liest die ersten paar hundert Kilobytes Ihrer ersten Datei, um die Datentypen zu erraten, aber es scheint falsch geraten zu haben. Sie können versuchen, dtypes explizit im Aufruf von read_csv anzugeben.

In Bezug auf die zweite Frage über das Schreiben in CSV schnell, meine erste Antwort wäre "Parkett oder HDF5 statt verwenden". Sie sind in fast jeder Hinsicht viel schneller und genauer.

+0

Danke !!, ja, vermutete früher, da ich den Datenrahmen aus einem CSV-Format lesen. Nicht sicher, warum es nicht richtig liest. Was Ihren Vorschlag zur 2. Frage anbelangt, ist das Lesen und Schreiben im Parkettformat (ich kenne Parkett). –

+0

Eine häufige Ursache ist, dass eine Integer-Spalte einige fehlende Werte hat, so dass Pandas entscheidet, dass sie float durchgehen muss. Ich verstehe Ihren Kommentar zu Parkett nicht. – MRocklin

+0

Ich meinte, wenn Sie sagten, verwenden Sie Parkett oder HDF5, meinst du die Parkett-Dateien zu lesen, um Datascripts zu konvertieren und dann auf Parkett-Format anstelle von CSV-Format zu schreiben? Können die CSV-Dateien schneller exportiert werden (mein Dataframe ist 130 mm x 4 Spalten), wenn ich dask über einen Cluster von Maschinen verteilt habe? –