2017-01-21 4 views
2

Ich verwende HashPartioner, aber ein unerwartetes Ergebnis erhalten. Ich verwende 3 verschiedene String als Schlüssel, und geben Partitionsparameter als 3, so erwarte ich 3 Partitionen.Spark HashPartitioner Unerwartete Partitionierung

val cars = Array("Honda", "Toyota", "Kia") 

val carnamePrice = sc.parallelize(for { 
x <- cars 
y <- Array(100,200,300) 
} yield (x, y), 8) 
val rddEachCar = carnamePrice.partitionBy(new HashPartitioner(3)) 
val mapped = rddEachCar.mapPartitionsWithIndex{ 
       (index, iterator) => { 
        println("Called in Partition -> " + index) 
        val myList = iterator.toList 

        myList.map(x => x + " -> " + index).iterator 
       } 
      } 
mapped.take(10) 

Das Ergebnis ist unten. Es gibt nur 2 Partitionen. Ich habe Hash-Codes für String (69909220 75427 -1783892706) überprüft. Was könnte hier ein Problem sein? Wahrscheinlich habe ich den Partitionierungsalgorithmus falsch verstanden.

Array[String] = Array((Toyota,100) -> 0, (Toyota,200) -> 0, (Toyota,300) -> 0, (Honda,100) -> 1, (Honda,200) -> 1, (Honda,300) -> 1, (Kia,100) -> 1, (Kia,200) -> 1, (Kia,300) -> 1) 

Antwort

2

Hier passiert nichts Seltsames. Utils.nonNegativeMod, die von HashPartitioner verwendet wird, wird wie folgt umgesetzt:

def nonNegativeMod(x: Int, mod: Int): Int = { 
    val rawMod = x % mod 
    rawMod + (if (rawMod < 0) mod else 0) 
} 

Mit 3 Partitionen die Schlüsselverteilung wie unten dargestellt definiert:

for { car <- Seq("Honda", "Toyota", "Kia") } 
    yield (car -> nonNegativeMod(car.hashCode, 3)) 
Seq[(String, Int)] = List((Honda,1), (Toyota,0), (Kia,1)) 

das ist genau das, was Sie in Ihrem Fall erhalten . Mit anderen Worten, das Fehlen einer direkten Hash-Kollision garantiert nicht den Mangel an Kollisionsmodulo einer beliebigen Anzahl.

Verwandte Themen