Zunächst möchte ich darauf hinweisen, dass ich sowohl Spark als auch Scala ziemlich neu bin. Ich versuche, die versprochene Spark-Leistung zu untersuchen, indem ich versuche, eine der Hadoop Map/Reduce-Jobs, die ich in der Vergangenheit gemacht habe, zu migrieren. Dieser Job dauert 14 Minuten auf Hadoop, wobei 3x r3.2x große Maschinen für die Eingabe von 16 komprimierten bzip-Dateien mit jeweils 170 MB verwendet werden. Ich übersetzte es Scala/Funken das Beste, was ich konnte, in etwa wie folgt:Wo ist der Leistungsengpass in diesem Spark/Scala-Code?
val conceptData = spark.textFile(inputPath)
val result = conceptData.repartition(60).cache()
.map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)})
.flatMap(metrics => metrics._2.map(t => (t._1,(1,List((metrics._1,t._2.head))))))
.reduceByKey((a,b) => combine(a,b))
.map(t => t._1 + "\t" + t._2._1 + "\t" + print(t._2._2))
result.saveAsTextFile(outputPath)
def print(tuples: List[(String, Any)]): String =
{
tuples.map(l => l._1 + "\u200e" + l._2).reduce(_ + "\u200f" + _)
}
def combine(a: (Int, List[(String, Any)]), b: (Int, List[(String, Any)])): (Int, List[(String, Any)]) =
{
(a._1 + b._1,a._2 ++ b._2)
}
object JsonUtil {
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
def fromJson[T](json: String)(implicit m : Manifest[T]): T = {
mapper.readValue[T](json)
}
}
ich am Anfang der Neuaufteilung Befehl verwendet, um die Partitionen auf 60 eingestellt, da ich irgendwo gelesen, das gut ist 2-3 zu haben Partitionen pro Kern Ich betreiben diesen Funkenjob auf den gleichen 3x r3.2xlarge Maschinen (jeweils 8 Cores und 58G verfügbar), damit ich meine Arbeit auf folgende Weise einreichen:
spark/bin/spark-submit --executor-memory 58G --total-executor-cores 24 (... other arguments ...)
Und es dauerte mehr als 1 Stunde laufen durch die gleiche Eingabe ... Ich bin mir nicht sicher, ob das Problem in der Scala- oder Spark-Konfiguration liegt, daher ist jede Hilfe willkommen.
Mit freundlichen Grüßen, Augusto
EDIT 1: Durchschnittliche mal für einige Operationen:
die Dateien von S3 Lesen: ~ 2 Minuten
flatMap: ~ 11 Minuten
reduceByKey :> 1 Stunde
Die verwendeten Schlüssel sind S3 Pfade also Sie können ziemlich lang werden, ich weiß nicht, ob das einen Unterschied macht.
EDIT 2: I ersetzt die reduceByKey
Funktion mit .reduceByKey((a,b) => a)
und der Job endet unter 10min so etwas wirklich falsch mit der combine
Funktion
paar Fragen. 1. Können Sie die von Ihnen verwendete Version von Spark angeben? 2. Ich gehe davon aus, dass Sie im Standalone-Modus arbeiten. 3. Haben Sie sich das Spark UI angesehen? Was sagt es ? Schließlich haben Sie versucht, jede dieser Operationen von der Scala REPL auszuführen? Sie können dies tun, indem Sie Ihre erste Zeile aufschlüsseln. Spark ist viel schneller als das. Ein kurzer Blick auf Ihren Code sagt, dass Sie nur 3G Daten analysieren und einen MR darauf ausführen. Wenn Sie Spark 1 verwenden.1+ können Sie 'conceptData' als SparkSQL-Tabelle laden und dann den Wert dort lesen und sehen, wie lange es dauert. –
1) Spark 1.2 2) Standalone 3) Die UI zeigte an, dass bis zur flatMap-Zeile die Ausführung ungefähr 7 Minuten dauerte, nachdem die meiste Zeit verbraucht wurde. – Augusto
Wie wäre es mit dem REPL? 'val conceptData = spark.textFile (inputPath)' gefolgt von 'conceptData.count' und sehen, wie lange es dauert. Sie sollten die gleichen Einstellungen von Ihrer Scala REPL verwenden (z. B. spark/bin/spark-shell --executor-memory 58G ..) Ich nehme auch an, dass Sie lokal von HDFS laden (auf EC2). –