2016-06-29 5 views
0

Hier ist der Code:Funken RDD Aggregat Aktion verhält sich seltsam

val email1 = sc.textFile("/Users/kaiyin/IdeaProjects/learnSpark/src/main/resources/ling-spam/ham/3-378msg3.txt") 
    val email2 = sc.textFile("/Users/kaiyin/IdeaProjects/learnSpark/src/main/resources/ling-spam/ham/3-378msg4.txt") 
    val email = email1 ++ email2 
    println(s"Count check: ${email.count() == email1.count() + email2.count()}") 
    val partitionLengths = email.aggregate(Vector.empty[Int])((vec, s) => s.length +: vec, (i1, i2) => i1 ++ i2) 
    println(partitionLengths) 
    println(partitionLengths.sum == email.map(_.length).sum) 
    val partitionLengthsMax = email.aggregate(0)((i: Int, s: String) => { 
    println(s"Partition length: ${s.length}") 
    i + s.length 
    }, (i1, i2) => i1.max(i2)) 
    println(partitionLengthsMax) 

dies in einem ers Rennen gibt diese:

Count check: true 
Vector(244, 0, 31, 96, 0, 23) 
true 
Partition length: 23 
Partition length: 0 
Partition length: 96 
Partition length: 31 
Partition length: 0 
Partition length: 244 
275 

Zunächst einmal, warum gibt es 6 Partitionen, wenn das Setup local[4] ist ? Und warum ist die maximale Länge nicht 244? Anscheinend 275 = 244 + 31.

Der komplette Skript kann hier gefunden werden: https://github.com/kindlychung/learnSpark/blob/master/src/main/scala/RDDDemo.scala

Antwort

0

warum gibt es 6 Partitionen, wenn das Setup lokal ist [4]

Beachten Sie, dass, wenn rdd1 hat x Partitionen und hat y Partitionen, rdd1 ++ rdd2 wird x+y Partitionen haben.

Und warum ist die maximale Länge nicht 244?

Es scheint, dass Sie, dass aggregate ‚s letzte Argument gehen davon aus, combOp: (U, U) ⇒ U, nur zu kombinieren Ergebnisse separaten Partitionen verwendet werden. Nun, das ist nicht der Fall: combOp wird auch verwendet, um Ergebnisse innerhalb einer Partition effizient zu aggregieren.

Eigentlich RDD.aggregate ‚s implementation ist so ziemlich wie folgt:

  • für jede Partition iterator.aggregate(zeroValue)(seqOp, compOb) aufrufen, die Partition
  • Verwendung zu aggregieren compOb, um alle Partitionen zu verschmelzen

In der erster Schritt ist die aggregate Methode GenTraversableOnce.aggregate, für die die ScalaDocs explizit angeben:

die Durchführung dieser Operation auf einer beliebigen Anzahl von Sammel Partitionen arbeiten können,

eine beliebige Anzahl von Malen aufgerufen, so werden combop kann

Bottom line - compOp wird mehr als einmal pro RDD- genannt Partition, daher die Ergebnisse, die Sie sehen.

0

ich den Code Stück ein wenig ändern, um das Zwischenergebnis

val partitionLengthsMax = email.aggregate(0)(
    (i: Int, s: String) => { 
     println(s"$i Partition length: ${s.length} ${s.take(15)}") 
     i + s.length 
    }, 
    (i1, i2) => { 
     println(s"$i1, $i2") 
     i1.max(i2) 
    }) 

gemäß Funken doc zu sehen. Das Codestück sollte bedeuten, die maximale Größe der Summe der Unterpartition zu erhalten.

def aggregate[U](zeroValue: U)(
    seqOp: (U, T) ⇒ U, 
    combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U 

Aggregate die Elemente jeder Partition, und dann werden die Ergebnisse für alle Partitionen, da unter Verwendung kombinieren Funktionen und einen neutralen „Nullwert“.Diese Funktion kann einen anderen Ergebnistyp, U, als den Typ dieser RDD, T, zurückgeben. Daher benötigen wir eine Operation zum Zusammenführen eines T in ein U und eine Operation zum Zusammenführen zweier U, wie in scala.TraversableOnce. Beide Funktionen dürfen ihr erstes Argument ändern und zurückgeben, anstatt ein neues U zu erstellen, um Speicherzuweisungen zu vermeiden.

Verwandte Themen