2014-07-17 7 views
28

Angenommen, ich habe ein Verteilungssystem auf 3 Knoten und meine Daten werden unter diesen Knoten verteilt. zum Beispiel habe ich eine test.csv-Datei, die auf alle drei Knoten vorhanden ist, und es enthält zwei Spalten:Wie aggregiert Spark Funktion aggregateByKey?

**row | id, c.** 
--------------- 
row1 | k1 , c1 
row2 | k1 , c2 
row3 | k1 , c3 
row4 | k2 , c4 
row5 | k2 , c5 
row6 | k2 , c6 
row7 | k3 , c7 
row8 | k3 , c8 
row9 | k3 , c9 
row10 | k4 , c10 
row11 | k4 , c11 
row12 | k4 , c12 

Dann benutze ich SparkContext.textFile die Datei aus als rdd und so zu lesen. Soweit ich weiß, liest jeder Funke-Arbeiter-Knoten den a-Teil aus der Datei. So jetzt lassen Sie uns jeder Knoten sagen speichert:

  • Knoten 1: Reihe 1 ~ 4
  • Knoten 2: Zeile 5 ~ 8
  • Knoten 3: Zeile 9 ~ 12

Meine Frage ist, dass wir sagen, dass ich eine Berechnung für diese Daten machen möchte, und es gibt einen Schritt, den ich brauche, um den Schlüssel zusammen zu gruppieren, so dass das Schlüsselwertpaar [k1 [{k1 c1} {k1 c2} {k1 c3}]].. und so weiter wäre.

Es gibt eine Funktion namens groupByKey(), die sehr teuer zu verwenden ist, und es wird empfohlen, aggregateByKey() zu verwenden. Also frage ich mich, wie funktioniert groupByKey() und aggregateByKey() unter der Haube funktioniert? Kann jemand das oben angegebene Beispiel verwenden, um es zu erklären? Nach dem Mischen, wo befinden sich die Zeilen auf jedem Knoten?

Antwort

40

aggregateByKey() ist fast identisch mit reduceByKey() (beide Aufruf combineByKey() hinter den Kulissen), außer Sie einen Startwert für aggregateByKey() geben. Die meisten Leute sind vertraut mit reduceByKey(), also werde ich das in der Erklärung verwenden.

Der Grund reduceByKey() ist so viel besser ist, weil es eine MapReduce Feature namens Combiner verwendet. Jede Funktion wie + oder * kann auf diese Weise verwendet werden, da die Reihenfolge der Elemente, auf die sie sich bezieht, keine Rolle spielt. Dadurch kann Spark Werte mit demselben Schlüssel "reduzieren", auch wenn sie noch nicht alle in derselben Partition sind.

Auf der anderen Seite gibt Ihnen groupByKey() mehr Vielseitigkeit, da Sie eine Funktion schreiben, die ein Iterable nimmt, was bedeutet, dass Sie sogar alle Elemente in ein Array ziehen können. Es ist jedoch ineffizient, weil für die Arbeit der gesamte Satz von (K,V,) Paare in einer Partition sein müssen.

Der Schritt, der die Daten um auf einer Verringerung der Aktivität Betrieb bewegt sich im Allgemeinen der mische genannt wird, am einfachsten Ebene die Daten an jedem Knoten partitioniert ist (häufig mit einem Hash Partitionierer) und dann auf jedem Knoten sortiert .

+2

ok, zu meinem Beispiel so können zurückgehen, wenn node1 ~ row3 hat Knoten2 row4 ~ row6 Row1 hat, und node3 zu row12 hat row7. und wenn ich groupByKey mache, werden sich Daten umherbewegen oder nichts bewegt, da sich rdd mit demselben Schlüssel bereits auf dem gleichen Knoten befindet? danke – EdwinGuo

+1

@EdwinGuo nein die Daten könnten immer noch herum bewegen, nehmen wir an, Sie verwenden einen Hash-Partitionierer, wenn alle k1 ist auf Knoten 1, aber k1's Hash-Partitionierer Ergebnis ist 3, es wird immer noch auf den dritten Knoten gehen – aaronman

+0

Aber wenn ich egal, um die Reihenfolge, ich möchte nur ein Array mit allen Werten zurückgeben, wie groupByKey tut. Ist es mit anderer Syntax als groupbykey möglich? –

40

aggregateByKey() ist ganz anders als reduceByKey. Was passiert, ist, dass reduceByKey eine Art von einem speziellen Fall von aggregateByKey ist.

aggregateByKey() kombiniert die Werte für einen bestimmten Schlüssel, und das Ergebnis einer solchen Kombination kann ein beliebiges von Ihnen angegebenes Objekt sein. Sie müssen angeben, wie die Werte innerhalb einer Partition (die im selben Knoten ausgeführt wird) kombiniert und wie Sie das Ergebnis von verschiedenen Partitionen (die sich möglicherweise in verschiedenen Knoten befinden) kombiniert ("hinzugefügt"). reduceByKey ist ein spezieller Fall, in dem Sinne, dass das Ergebnis der Kombination (z. B. eine Summe) vom selben Typ ist wie die Werte und dass die Operation bei Kombination aus verschiedenen Partitionen auch die gleiche ist wie die Operation beim Kombinieren von Werten innerhalb von Partition.

Ein Beispiel: Stellen Sie sich vor Sie haben eine Liste von Paaren. Sie parallelisieren es:

val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5))) 

Jetzt möchten Sie sie "kombinieren" durch Schlüssel, der eine Summe produziert. In diesem Fall reduceByKey und aggregateByKey gleich ist:

val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything 
resReduce.collect 
res3: Array[(String, Int)] = Array((b,7), (a,9)) 

//0 is initial value, _+_ inside partition, _+_ between partitions 
val resAgg = pairs.aggregateByKey(0)(_+_,_+_) 
resAgg.collect 
res4: Array[(String, Int)] = Array((b,7), (a,9)) 

nun vorstellen, dass Sie die Aggregation wollen ein Set von Werten zu sein, ist, dass eine andere Art, dass die Werte, die ganzen Zahlen (die Summe der ganzen Zahlen sind auch ganze Zahlen):

import scala.collection.mutable.HashSet 
//the initial value is a void Set. Adding an element to a set is the first 
//_+_ Join two sets is the _++_ 
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_) 
sets.collect 
res5: Array[(String, scala.collection.mutable.HashSet[Int])] =Array((b,Set(7)), (a,Set(1, 5, 3))) 
+0

Sehr gründliche Antwort auf, wie die beiden funktionieren, schätze es! – SparkleGoat

+0

kannst du bitte auch einen Java Code posten, es ist schwer scala zu verstehen – rohanagarwal

Verwandte Themen