2016-04-04 2 views
0

Ich bin neu mit Spark und ich muss mit großen Zeitreihen umgehen. Für einen Benchmark muss ich mehrere Implementierungen eines Rollmeans vergleichen. Im Iterationsmodus mit numpy ist es sehr schnell (0,055970s für 1.000.000 Punkte und Fenster = 3). Ich schrieb eine New-comer-in-pyspark-Version eines Rollmeans, und die Ergebnisse sind schrecklich (mehrere Sekunden für den gleichen Vektor). Zum Beispiel, ich habeWie würdest du ein Rollmittel in Funken schreiben?

ts_list = ["key1", "key2",...,"keyN"] 
seq = sc.parallelize(ts_list) 
d = {"key1": [1, 2, 3, ...], "key2": [1, 2, 3, ...]} 

Meine Verarbeitung:

s = seq.map(lambda s: (s, d[s]))\ 
.flatMap(lambda s: [(s[0], sum(elem)/k) for elem in rolling_window(np.array(s[1]), k)])\ 
.groupByKey().mapValues(lambda x: list(x))\ 
.collect() 

Ich denke, dass ein Arbeitnehmer für k Punkte mit wertlos. Ich vermute, dass die Kommunikation zwischen Meister und Arbeitern zeitaufwendig ist. Auch ich frage mich, ob ich Daten in den Treiber mit Parallelisierung oder innerhalb von Arbeitern laden sollte?

Was ist, wenn meine Vektoren Billionen von Punkten haben?

+0

Nun, Sie verwenden 'groupBy' und das Verfahren verbraucht viel von Zeit, weil es das Bewegen der Daten zwischen allen Knoten erfordert. –

Antwort

1

Eine subjektive Liste der Probleme:

  • Verwendung flatMap von groupByKey von mapValues gefolgt gefolgt Sinn überhaupt nicht macht. Es ist teurer als erforderlich und im Allgemeinen (nicht hier) gibt keine Garantien für die Bestellung. Statt nur mapValues verwenden erforderliche Logik anwenden:

    def rolling_mean(xs): 
        ... 
    
    rdd.mapValues(rolling_mean) 
    
  • Erstellen NumPy Arrays teuer ist. Wenn diese klein sind, kann der Overhead ziemlich beträchtlich werden. Da Sie haben eine Implementierung der rolling_window nicht bieten es möglich ist, sinnvolle Tests durchführen, aber in der Regel, wenn Sie nicht verwenden NumPy Arrays von Anfang an sliding_window über normalem list oder Einbau-array sollte schneller

  • Daten werden geladen sein :

    ob ich Daten im Treiber mit Parallelisierung oder innerhalb der Arbeiter laden sollte?

    Wenn es möglich ist, sollten Sie immer Daten über die Arbeiter laden, die nicht vom Fahrer übergeben werden. Die letztgenannte Option ist vor allem beim Testen von Formularen und beim Prototyping nützlich und führt zu einem schwerwiegenden IO-Engpass in Ihrem Programm. Wenn darüber hinaus Daten in den Speicher einer einzelnen Maschine passen, ist es eher unwahrscheinlich, dass triviale Berechnungen verteilt werden.

    Das sagte, wenn Sie sich entscheiden, parallelize tun es klug.

    sc.parallelize(d) 
    

    Es erfordert keine vollständige Kopie d zu jedem Vollstrecker übergeben.

  • last but not least haben realistische Erwartungen. Wenn Aufgabe ist relativ billig wie hier, dann werden die Gesamtkosten getrieben werden, sondern auch andere Faktoren wie Netzwerk-IO, lokale Socket-Kommunikation, Serialisierung, Deserialisierung und allgemeine Buchhaltung

+0

Vielen Dank für diese konstruktive Antwort. Ich kenne meine Unwissenheit in Spark, ich werde daran arbeiten!Am Ende habe ich folgendes gemacht: 'result = seq.map (Lambda s: (s, load_vector (s))). MapValues ​​(Lambda x: rollmean_calc (x, 3)). Collect()' – GwydionFR

Verwandte Themen