2017-03-24 1 views
0

zuallererst, sorry, wenn dies eine Frage zum Ausblenden ist, bin ich ein bisschen neu mit Spark.Funken. Halten Sie den Partitionierer nach dem Ändern des Schlüssels

Ich versuche, einige Gruppenoperationen in Spark durchzuführen, und ich versuche, zusätzliche Shuffle zu vermeiden, wenn ich den Schlüssel meiner RDD modifiziere.

Original-RDDs sind json Strings

Vereinfachen der Logik mein Code wie folgt aussieht:

case class Key1 (a: String, b: String) 

val grouped1: RDD[(Key1, String)] = rdd1.keyBy(generateKey1(_)) 
val grouped2: RDD[(Key1, String)] = rdd2.keyBy(generateKey2(_)) 

val joined: RDD[(Key1, (String, String)) = groped1.join(grouped2) 

Jetzt möchte ich im Schlüssel ein neues Feld schließen und einige Operationen reduzieren zu tun. So habe ich so etwas wie:

case class key2 (a: String, b: String, c: String) 

val withNewKey: RDD[Key2, (String, String)] = joined.map{ case (key, (val1, val2)) => { 
    val newKey = Key2(key.a, key.b, extractWhatever(val2)) 
    (newKey, (val1, val2)) 
}} 

withNewKey.reduceByKey..... 

Wenn ich mich nicht falsch, da der Schlüssel geändert hat die Partition verloren, so dass der Betrieb verringern wahrscheinlich die Daten mischen, aber es macht keinen Sinn, da die Schlüssel wurde erweitert und kein Shuffle würde benötigt.

Fehle ich etwas? Wie kann ich diesen Shuffle vermeiden?

Dank

Antwort

2

Sie können mapPartitions mit preservesPartitioning Satz true verwenden:

joined.mapPartitions(
    _.map{ case (key, (val1, val2)) => ... }, 
    true 
) 
Verwandte Themen