Wenn Sie nicht sicher sind, was es wird für den Code hier am besten am folgende Ergebnis kommen folgen die Typen. Das Weglassen implizite ClassTag
der Kürze beginnen wir mit so etwas wie dieses
abstract class RDD[T] extends Serializable with Logging
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U
Wenn Sie alle zusätzlichen Parameter ignorieren werden Sie sehen, dass aggregate
eine Funktion ist, die RDD[T]
-U
abbildet. Dies bedeutet, dass der Typ der Werte in der Eingabe nicht mit dem Typ des Ausgabewerts übereinstimmen muss. So ist es deutlich anders als zum Beispiel reduce
:
def reduce(func: (T, T) ⇒ T): T
oder fold
:
def fold(zeroValue: T)(op: (T, T) => T): T
Das gleiche wie fold
, aggregate
ein zeroValue
erfordert. Wie wähle ich es aus? Es sollte ein identitätsneutrales Element in Bezug auf combOp
sein.
Sie haben auch zwei Funktionen zur Verfügung:
seqOp
, die von (U, T)
Karten zu U
combOp
, die von (U, U)
Karten zu U
basiert einfach auf diese Signaturen Sie bereits sehen sollten dass nur seqOp
auf die Rohdaten zugreifen kann. Es nimmt einen Wert vom Typ U
eines anderen Typs vom Typ T
und gibt einen Wert vom Typ U
zurück. In Ihrem Fall ist es eine Funktion mit einer folgenden Signatur
((Int, Int), Int) => (Int, Int)
An dieser Stelle werden Sie wahrscheinlich vermuten, es für eine Art von faltenartigen Betrieb verwendet wird. Die zweite Funktion akzeptiert zwei Argumente vom Typ U
und gibt einen Wert vom Typ U
zurück. Wie bereits erwähnt, sollte klar sein, dass die Originaldaten nicht berührt werden und nur mit den bereits von seqOp
verarbeiteten Werten arbeiten kann. In Ihrem Fall hat diese Funktion eine Signatur wie folgt:
((Int, Int), (Int, Int)) => (Int, Int)
Also, wie können wir all das zusammen bekommen?
Zuerst jede Partition aggregiert unter Verwendung von Standard Iterator.aggregate
mit zeroValue
, seqOp
und combOp
geleitet, wie z
, seqop
und combop
respectivelly.Da InterruptibleIterator
intern verwendet nicht aggregate
außer Kraft setzen sollte es als ein einfaches foldLeft(zeroValue)(seqOp)
Nächste Teilergebnisse aus jeder Partition gesammelt ausgeführt werden aggregiert werden mit combOp
vermuten lässt, dass die Eingabe RDD hat drei Partitionen mit folgenden Verteilung der Werte:
Iterator(1, 2)
Iterator(2, 3)
Iterator()
können Sie, dass die Ausführung erwarten, absolute Ordnung zu ignorieren, wird so etwas wie dies gleichwertig sein:
val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1)
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
Seq(Iterator(1, 2), Iterator(3, 3), Iterator())
.map(_.foldLeft((0, 0))(seqOp))
.reduce(combOp)
foldLeft
für eine einzelne Partition wie folgt aussehen:
Iterator(1, 2).foldLeft((0, 0))(seqOp)
Iterator(2).foldLeft((1, 1))(seqOp)
(3, 2)
und über alle Partitionen
Seq((3,2), (6,2), (0,0))
, die Sie Ergebnis beobachtet geben kombiniert:
(3 + 6 + 0, 2 + 2 + 0)
(9, 4)
Im Allgemeinen ist dies ein gemeinsames Muster, das Sie überall Funken finden, wo Sie neutral Wert übergeben, verwendet eine Funktionswerte pro Partition und eine Funktion zur Verarbeitung wird verwendet, um partielle Aggregate aus verschiedenen Partitionen zusammenzuführen. Einige andere Beispiele sind:
aggregateByKey
- Benutzerdefinierte Aggregatfunktionen
Aggregators
auf Spark-Datasets
.