2017-06-07 4 views
1

Ich habe einen Beispieldatensatz in meinem lokalen und ich versuche, einige grundlegende Operationen auf einem Cluster zu tun.Fehler - Fehler von Worker Keine Datei oder Verzeichnis: 'Dateipfad'

import dask.dataframe as ddf 
    from dask.distributed import Client 
    client = Client('Ip address of the scheduler') 
    import dask.dataframe as ddf 
    csvdata = ddf.read_csv('Path to the CSV file') 

Der Client ist mit einem Scheduler verbunden, der wiederum mit zwei Arbeitern (auf anderen Maschinen) verbunden ist.

Meine Fragen können ziemlich trivial sein.

  1. Sollte diese CSV-Datei auf anderen Worker-Knoten vorhanden sein?

    Ich scheine Datei nicht gefunden Fehler zu bekommen.

  2. Verwendung,

    futures=client.scatter(csvdata) 
    x = ddf.from_delayed([future], meta=df) 
    #Price is a column in the data 
    df.Price.sum().compute(get=client.get) #returns" dd.Scalar<series-..., dtype=float64>" How do I access it? 
    client.submit(sum, x.Price) #returns "distributed.utils - ERROR - 6dc5a9f58c30954f77913aa43c792cc8" 
    

Auch ich tat dies Loading local file from client onto dask distributed cluster und http://distributed.readthedocs.io/en/latest/manage-computation.html

verweise ich denke, ich bin Mischen hier eine Menge Dinge, und mein Verständnis ist durcheinander. Jede Hilfe würde wirklich geschätzt werden.

Antwort

1

Ja, hier_data.dataframe geht davon aus, dass die Dateien, auf die Sie in Ihrem Client-Code verweisen, auch für Ihre Mitarbeiter zugänglich sind. Ist dies nicht der Fall, werden Sie Ihre Daten explizit auf Ihrem lokalen Rechner einlesen und an Ihre Mitarbeiter verteilen.

Es sieht so aus, als ob Sie genau das versuchen, nur dass Sie Datasframes statt Pandas Datenframes streuen. Sie müssen tatsächlich Pandas Daten von der Festplatte laden, bevor Sie es streuen. Wenn Sie Ihre Daten in den Speicher passt, dann sollten Sie in der Lage sein, genau das zu tun, was Sie jetzt tun, aber die dd.read_csv Anruf mit pd.read_csv ersetzen

csvdata = pandas.read_csv('Path to the CSV file') 
[future] = client.scatter([csvdata]) 
x = ddf.from_delayed([future], meta=df).repartition(npartitions=10).persist() 
#Price is a column in the data 
df.Price.sum().compute(get=client.get) # Should return an integer 

Wenn Ihre Daten zu groß ist, dann könnten Sie mit Führen Sie eine lokale Suche durch, um Daten Stück für Stück in Ihrem Cluster zu lesen und zu verteilen.

import dask.dataframe as dd 
ddf = dd.read_csv('filename') 
futures = ddf.map_partitions(lambda part: c.scatter([part])[0]).compute(get=dask.get) # single threaded local scheduler 

ddf = dd.from_delayed(list(futures), meta=ddf.meta) 
Verwandte Themen