2

die Existenz einer RDD von Tupeln ähnlich dem folgenden Unter der Annahme:Spark RDD: Wie Statistiken am effizientesten zu berechnen?

(key1, 1) 
(key3, 9) 
(key2, 3) 
(key1, 4) 
(key1, 5) 
(key3, 2) 
(key2, 7) 
... 

Was ist der effizienteste (und idealerweise verteilt) Art und Weise Statistiken zu den einzelnen Tasten zu berechnen? (Im Moment suche ich nach Standardabweichung/Varianz zu berechnen, insbesondere.) Wie ich es verstehe, meine Optionen betragen:

  1. Verwenden Sie den colStats function in MLLib: Dieser Ansatz den Vorteil, leicht adaptierbare hat Andere Funktionen später zu verwenden, wenn andere statistische Berechnungen für notwendig erachtet werden. Es arbeitet jedoch mit einer RDD von Vector, die die Daten für jede Spalte enthält, so wie ich es verstehe, würde dieser Ansatz erfordern, dass der vollständige Satz von Werten für jeden Schlüssel auf einem einzelnen Knoten gesammelt wird, was für große nicht ideal erscheinen würde Datensätze. Bedeutet ein Spark Vector immer, dass die Daten in der Vector lokal resident sind, auf einem einzigen Knoten?
  2. Führen Sie eine , dann stats: Wahrscheinlich shuffle-schwer, as a result of the groupByKey operation.
  3. aggregateByKey Führen Sie einen neuen StatCounter Initialisierung und mit StatCounter::merge wie die Sequenz und Kombinierer Funktionen: Dies ist der Ansatz recommended by this StackOverflow answer und vermeidet die groupByKey von Option 2. Allerdings habe ich eine gute Dokumentation nicht in der Lage gewesen zu finden StatCounter in PySpark.

Ich mag Option 1, da sie den Code erweiterbar, in das macht es leicht kompliziertere Berechnungen mit anderen MLLib Funktionen mit ähnlichen Verträge aufnehmen konnte, aber wenn die Vector Eingaben erfordern von Natur aus, dass die Datensätze lokal gesammelt werden, dann begrenzt es die Datengrößen, auf denen der Code effektiv arbeiten kann. Zwischen den beiden anderen, Option 3 sieht effizienter aus, weil es die groupByKey vermeidet, aber ich hatte gehofft zu bestätigen, dass das der Fall ist.

Gibt es noch andere Optionen, die ich nicht berücksichtigt habe? (Ich verwende derzeit Python + PySpark, aber ich bin offen für Lösungen in Java/Scala auch, wenn es einen Unterschied in der Sprache gibt.)

+0

Mögliches Duplikat von [min/max mit pyspark im Einzeldurchlauf über Daten finden] (http://stackoverflow.com/questions/36559809/finding-min-max-with-pypspark-in-single-pass-over -Daten) –

Antwort

2

Sie können versuchen reduceByKey. Es ist ziemlich einfach, wenn wir nur die min() berechnen möchten:

rdd.reduceByKey(lambda x,y: min(x,y)).collect() 
#Out[84]: [('key3', 2.0), ('key2', 3.0), ('key1', 1.0)] 

Um die mean zu berechnen, müssen Sie zuerst (value, 1) Tupel erstellen müssen, die wir sowohl die sum und count im reduceByKey Betrieb zu berechnen verwenden.Schließlich teilen wir sie von einander am mean BRIEFE

meanRDD = (rdd 
      .mapValues(lambda x: (x, 1)) 
      .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) 
      .mapValues(lambda x: x[0]/x[1])) 

meanRDD.collect() 
#Out[85]: [('key3', 5.5), ('key2', 5.0), ('key1', 3.3333333333333335)] 

Für die variance, können Sie die Formel verwenden (sumOfSquares/count) - (sum/count)^2, , die wir in der folgenden Art und Weise übersetzen:

varRDD = (rdd 
      .mapValues(lambda x: (1, x, x*x)) 
      .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2])) 
      .mapValues(lambda x: (x[2]/x[0] - (x[1]/x[0])**2))) 

varRDD.collect() 
#Out[106]: [('key3', 12.25), ('key2', 4.0), ('key1', 2.8888888888888875)] 

ich verwendet, um Werte vom Typ double anstelle von int im Dummy Daten, um die Berechnung der Durchschnitt und Varianz genau zu veranschaulichen:

rdd = sc.parallelize([("key1", 1.0), 
         ("key3", 9.0), 
         ("key2", 3.0), 
         ("key1", 4.0), 
         ("key1", 5.0), 
         ("key3", 2.0), 
         ("key2", 7.0)])