2015-01-03 6 views
7

Zunächst möchte ich darauf hinweisen, dass ich sowohl Spark als auch Scala ziemlich neu bin. Ich versuche, die versprochene Spark-Leistung zu untersuchen, indem ich versuche, eine der Hadoop Map/Reduce-Jobs, die ich in der Vergangenheit gemacht habe, zu migrieren. Dieser Job dauert 14 Minuten auf Hadoop, wobei 3x r3.2x große Maschinen für die Eingabe von 16 komprimierten bzip-Dateien mit jeweils 170 MB verwendet werden. Ich übersetzte es Scala/Funken das Beste, was ich konnte, in etwa wie folgt:Wo ist der Leistungsengpass in diesem Spark/Scala-Code?

val conceptData = spark.textFile(inputPath) 
val result = conceptData.repartition(60).cache() 
    .map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)}) 
    .flatMap(metrics => metrics._2.map(t => (t._1,(1,List((metrics._1,t._2.head)))))) 
    .reduceByKey((a,b) => combine(a,b)) 
    .map(t => t._1 + "\t" + t._2._1 + "\t" + print(t._2._2)) 
result.saveAsTextFile(outputPath) 

def print(tuples: List[(String, Any)]): String = 
{ 
    tuples.map(l => l._1 + "\u200e" + l._2).reduce(_ + "\u200f" + _) 
} 

def combine(a: (Int, List[(String, Any)]), b: (Int, List[(String, Any)])): (Int, List[(String, Any)]) = 
{ 
    (a._1 + b._1,a._2 ++ b._2) 
} 

object JsonUtil { 
    val mapper = new ObjectMapper() with ScalaObjectMapper 
    mapper.registerModule(DefaultScalaModule) 
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) 

    def fromJson[T](json: String)(implicit m : Manifest[T]): T = { 
    mapper.readValue[T](json) 
    } 
} 

ich am Anfang der Neuaufteilung Befehl verwendet, um die Partitionen auf 60 eingestellt, da ich irgendwo gelesen, das gut ist 2-3 zu haben Partitionen pro Kern Ich betreiben diesen Funkenjob auf den gleichen 3x r3.2xlarge Maschinen (jeweils 8 Cores und 58G verfügbar), damit ich meine Arbeit auf folgende Weise einreichen:

spark/bin/spark-submit --executor-memory 58G --total-executor-cores 24 (... other arguments ...) 

Und es dauerte mehr als 1 Stunde laufen durch die gleiche Eingabe ... Ich bin mir nicht sicher, ob das Problem in der Scala- oder Spark-Konfiguration liegt, daher ist jede Hilfe willkommen.

Mit freundlichen Grüßen, Augusto

EDIT 1: Durchschnittliche mal für einige Operationen:

die Dateien von S3 Lesen: ~ 2 Minuten

flatMap: ~ 11 Minuten

reduceByKey :> 1 Stunde

Die verwendeten Schlüssel sind S3 Pfade also Sie können ziemlich lang werden, ich weiß nicht, ob das einen Unterschied macht.

EDIT 2: I ersetzt die reduceByKey Funktion mit .reduceByKey((a,b) => a) und der Job endet unter 10min so etwas wirklich falsch mit der combine Funktion

+0

paar Fragen. 1. Können Sie die von Ihnen verwendete Version von Spark angeben? 2. Ich gehe davon aus, dass Sie im Standalone-Modus arbeiten. 3. Haben Sie sich das Spark UI angesehen? Was sagt es ? Schließlich haben Sie versucht, jede dieser Operationen von der Scala REPL auszuführen? Sie können dies tun, indem Sie Ihre erste Zeile aufschlüsseln. Spark ist viel schneller als das. Ein kurzer Blick auf Ihren Code sagt, dass Sie nur 3G Daten analysieren und einen MR darauf ausführen. Wenn Sie Spark 1 verwenden.1+ können Sie 'conceptData' als SparkSQL-Tabelle laden und dann den Wert dort lesen und sehen, wie lange es dauert. –

+0

1) Spark 1.2 2) Standalone 3) Die UI zeigte an, dass bis zur flatMap-Zeile die Ausführung ungefähr 7 Minuten dauerte, nachdem die meiste Zeit verbraucht wurde. – Augusto

+0

Wie wäre es mit dem REPL? 'val conceptData = spark.textFile (inputPath)' gefolgt von 'conceptData.count' und sehen, wie lange es dauert. Sie sollten die gleichen Einstellungen von Ihrer Scala REPL verwenden (z. B. spark/bin/spark-shell --executor-memory 58G ..) Ich nehme auch an, dass Sie lokal von HDFS laden (auf EC2). –

Antwort

0

Dies kam zu meinen noobish Scala Programmierkenntnisse - es dauert nur 15 Minuten, wenn auf die folgenden performant Scala geändert:

val conceptData = spark.textFile(inputPath).repartition(24) 

val result = conceptData.map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)}) 
    .flatMap(metrics => metrics._2.map(t => (t._1,(1, List(metrics._1+"\u200e"+ t._2.head))))) 
    .reduceByKey((a,b) => (a._1 + b._1, a._2:::b._2)) 
    .map(t=> t._1 + "\t" + t._2._1 + "\t" + t._2._2.mkString("\u200f")) 

Es kann wohl noch weiter verbessert werden. Wie auch immer, danke für die Hilfe alle.

Mit freundlichen Grüßen,

Augusto

0

Basierend auf der Tatsache sein, dass die meiste Zeit nach der ausgegeben wird, flatMap, würde ich vermuten, dass die Shuffle ist, was Sie verlangsamt und nicht CPU-Auslastung. Sie können versuchen, den Job mit weniger Partitionen auszuführen. Eine andere Sache, die Sie versuchen können, ist, die reduceByKey() durch foldByKey() zu ersetzen, die assoziativ, aber nicht kommutativ ist, was bedeutet, dass sie die Reihenfolge der RDDs beim Ausführen des Mähdreschers beibehalten muss, und dies könnte zu weniger Netzwerkverkehr während des Mischens führen.

+0

Ich habe versucht mit foldByKey und der Job stirbt, wenn ich das benutze. Ich habe es zweimal ausprobiert und es geschah zum selben Zeitpunkt (nach 27 Minuten (- die einzige Fehlermeldung, die ich finden kann ist: "executor.CoarseGrainedExecutorBackend: Driver Disassociated" – Augusto