2016-08-15 3 views
4

Ich habe zwei Tabellen, die ich gerne zusammenfügen würde. Einer von ihnen hat einen sehr schlechten Datenschräglauf. Dies führt dazu, dass mein Funke-Job nicht parallel läuft, da ein Großteil der Arbeit auf einer Partition ausgeführt wird.Apache Spark Handling Skewed Daten

Ich habe gehört und gelesen und versucht, meine Schlüssel zu salzen, um die Verteilung zu erhöhen. https://www.youtube.com/watch?v=WyfHUNnMutg um 12:45 Sekunden ist genau das, was ich gerne tun würde.

Jede Hilfe oder Tipps wären willkommen. Vielen Dank!

Antwort

2

Ja sollten Sie gesalzen Tasten auf dem größeren Tisch (über Randomisierung) verwenden und dann replizieren die kleinere/kartesisch an die neue gesalzen ein beitreten:

Hier sind ein paar Vorschläge:

Tresata Skew RDD beitreten https://github.com/tresata/spark-skewjoin

python Skew beitreten: https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/

Die tresata Bibliothek sieht wie folgt aus:

import com.tresata.spark.skewjoin.Dsl._ // for the implicits 

// skewjoin() method pulled in by the implicits 
rdd1.skewJoin(rdd2, defaultPartitioner(rdd1, rdd2), 
DefaultSkewReplication(1)).sortByKey(true).collect.toLis 
+0

Gibt es eine scala-Bibliothek, die eine Schrägstellung kommen tut? Auch im zweiten Link zur Verfügung gestellt. Sie nehmen den ersten Schlüssel des Datenrahmens und fügen ihm eine Zufallszahl hinzu. Im zweiten Datenrahmen replizieren sie den Schlüssel n Mal, wobei n der Bereich der Zufälligkeit ist, den Sie dem ersten Datenrahmen hinzugefügt haben. Dies scheint machbar, wenn der zweite Datenrahmen klein ist. Ist dies der genaue und einzige Weg, meine obige Frage zu stellen? –

+0

Ich habe Import com.tresata.spark.skewjoin.Dsl._ erfolgreich importiert Aber ich kann nicht die .skewJoin Methode auf meinem rdd von RDD [(String, row)] –

+0

ich folgendes ausführen war finden: rdd1.skewJoin (RDD2, defaultPartitioner (RDD1, RDD2), DefaultSkewReplication (1)). sortByKey (true) .collect.toList Aber müssen drei Importe Import com.twitter.algebird.CMSHasherImplicits._ Import org.apache.spark.Partitioner .defaultPartitioner import com.tresata.spark.skewjoin.Dsl._ –