2016-05-04 14 views
9

Ich bin ein Apache Spark Lerner und habe eine Aktion aggregate die ich habe keine Ahnung, wie es funktioniert. Kann jemand im Detail Schritt für Schritt buchstabieren und erklären, wie haben wirRDD Aggregat in Funke

RDD input = {1,2,3,3} 

RDD Aggregate function : 

rdd.aggregate((0, 0)) 
((x, y) => 
(x._1 + y, x._2 + 1), 
(x, y) => 
(x._1 + y._1, x._2 + y._2)) 

output : {9,4} 

Dank

Antwort

18

Wenn Sie nicht sicher sind, was es wird für den Code hier am besten am folgende Ergebnis kommen folgen die Typen. Das Weglassen implizite ClassTag der Kürze beginnen wir mit so etwas wie dieses

abstract class RDD[T] extends Serializable with Logging 

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U 

Wenn Sie alle zusätzlichen Parameter ignorieren werden Sie sehen, dass aggregate eine Funktion ist, die RDD[T]-U abbildet. Dies bedeutet, dass der Typ der Werte in der Eingabe nicht mit dem Typ des Ausgabewerts übereinstimmen muss. So ist es deutlich anders als zum Beispiel reduce:

def reduce(func: (T, T) ⇒ T): T 

oder fold:

def fold(zeroValue: T)(op: (T, T) => T): T 

Das gleiche wie fold, aggregate ein zeroValue erfordert. Wie wähle ich es aus? Es sollte ein identitätsneutrales Element in Bezug auf combOp sein.

Sie haben auch zwei Funktionen zur Verfügung:

  • seqOp, die von (U, T) Karten zu U
  • combOp, die von (U, U) Karten zu U

basiert einfach auf diese Signaturen Sie bereits sehen sollten dass nur seqOp auf die Rohdaten zugreifen kann. Es nimmt einen Wert vom Typ U eines anderen Typs vom Typ T und gibt einen Wert vom Typ U zurück. In Ihrem Fall ist es eine Funktion mit einer folgenden Signatur

((Int, Int), Int) => (Int, Int) 

An dieser Stelle werden Sie wahrscheinlich vermuten, es für eine Art von faltenartigen Betrieb verwendet wird. Die zweite Funktion akzeptiert zwei Argumente vom Typ U und gibt einen Wert vom Typ U zurück. Wie bereits erwähnt, sollte klar sein, dass die Originaldaten nicht berührt werden und nur mit den bereits von seqOp verarbeiteten Werten arbeiten kann. In Ihrem Fall hat diese Funktion eine Signatur wie folgt:

((Int, Int), (Int, Int)) => (Int, Int) 

Also, wie können wir all das zusammen bekommen?

  1. Zuerst jede Partition aggregiert unter Verwendung von Standard Iterator.aggregate mit zeroValue, seqOp und combOp geleitet, wie z, seqop und combop respectivelly.Da InterruptibleIterator intern verwendet nicht aggregate außer Kraft setzen sollte es als ein einfaches foldLeft(zeroValue)(seqOp)

  2. Nächste Teilergebnisse aus jeder Partition gesammelt ausgeführt werden aggregiert werden mit combOp

vermuten lässt, dass die Eingabe RDD hat drei Partitionen mit folgenden Verteilung der Werte:

  • Iterator(1, 2)
  • Iterator(2, 3)
  • Iterator()

können Sie, dass die Ausführung erwarten, absolute Ordnung zu ignorieren, wird so etwas wie dies gleichwertig sein:

val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1) 
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2) 

Seq(Iterator(1, 2), Iterator(3, 3), Iterator()) 
    .map(_.foldLeft((0, 0))(seqOp)) 
    .reduce(combOp) 

foldLeft für eine einzelne Partition wie folgt aussehen:

Iterator(1, 2).foldLeft((0, 0))(seqOp) 
Iterator(2).foldLeft((1, 1))(seqOp) 
(3, 2) 

und über alle Partitionen

Seq((3,2), (6,2), (0,0)) 

, die Sie Ergebnis beobachtet geben kombiniert:

(3 + 6 + 0, 2 + 2 + 0) 
(9, 4) 

Im Allgemeinen ist dies ein gemeinsames Muster, das Sie überall Funken finden, wo Sie neutral Wert übergeben, verwendet eine Funktionswerte pro Partition und eine Funktion zur Verarbeitung wird verwendet, um partielle Aggregate aus verschiedenen Partitionen zusammenzuführen. Einige andere Beispiele sind:

  • aggregateByKey
  • Benutzerdefinierte Aggregatfunktionen
  • Aggregators auf Spark-Datasets.
1

Hier ist mein Verständnis für Ihre Referenz:

Stellen Sie sich zwei Knoten haben, nehmen Sie eine der Eingabe der ersten beiden Listenelemente {1,2}, und ein anderer nimmt {3, 3}. (Die Trennwand ist hier nur für eine bequeme)

an dem ersten Knoten: "(x, y) => (x._1 + y, x._2 + 1)", das erste x (0 , 0) wie angegeben, und y ist dein erstes Element 1, und du wirst ausgeben (0 + 1, 0 + 1), dann kommt dein zweites Element y = 2 und Ausgabe (1 + 2, 1 + 1), Das ist (3, 2)

Am zweiten Knoten passiert die gleiche Prozedur parallel, und Sie haben (6, 2).

"(x, y) => (x._1 + y._1, x._2 + y._2)", fordert Sie auf, zwei Knoten zusammenzuführen, und Sie erhalten (9,4)


eine Sache beachtenswert ist (0,0) zugegeben, tatsächlich zu dem Ergebnis Länge (RDD) + 1 mal.

"scala> rdd.aggregate ((1,1)) ((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x. _1 + y._1, x._2 + y._2)) res1: (Int, Int) = (14,9) "