2016-09-30 2 views
0

ich einen Funken Anwendung, die eine Datei mit 100 Millionen Zeilen liest (jede Zeile einen Code hat, wie US1.234.567B1) und bekommt einige Muster aus ihm heraus, wie folgt:Funken OutOfMemoryError wenn eine große Eingabedatei unter

val codes = sc.textFile("/data/codes.txt") 

    def getPattern(code: String) = code.replaceAll("\\d", "d") 

    val patterns: RDD[(String, Int)] = codes 
    .groupBy(getPattern) 
    .mapValues(_.size) 
    .sortBy(- _._2) 

    patterns 
    .map { case (pattern, size) => s"$size\t$pattern" } 
    .saveAsTextFile("/tmp/patterns") 

Ich führe dies auf Master = local [*], und es schlägt mit java.lang.OutOfMemoryError: GC overhead limit exceeded.

Warum ist das?

Ich dachte, dass Spark mit jeder Größe der Eingabe umgehen kann, solange es genug Speicherplatz auf der Festplatte hat.

+1

zwei Dokumente zu erklären, warum groupBy vermieden werden sollte: https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html https://github.com/awesome -spark/Funken-Gotchas –

Antwort

2

Lange kurze Sie versuchen, anti-Muster verwenden Funken:

.groupBy(getPattern) 
.mapValues(_.size) 

, die leicht zum Beispiel ausgedrückt werden kann als:

codes.keyBy(getPattern).mapValues(_ => 1L).reduceByKey(_ + _).sortBy(_._2, false) 

ich, dass Funken dachte jeder Größe verarbeiten kann der Eingabe.

Es kann normalerweise skalieren, solange Sie es nicht unmöglich machen. group/groupByKey auf RDDs erstellen lokale Sammlungen für jeden Schlüssel. Jeder von ihnen muss sich im Gedächtnis eines einzelnen Executors befinden.

1

Ja Funken kann sehr große Dateien verarbeiten, aber die Einheit der Parallelität ist der Executor. Der Fehler "Nicht genügend Speicher" liegt vor, weil der Funkenausführungsspeicher oder der Funkentreiberspeicher nicht ausreicht. Versuchen Sie, spark.executor.memory und spark.driver.memory zu erhöhen und die Anzahl der Executoren zu optimieren, bevor Sie den Job senden.

Sie können diese Werte in einer Eigenschaftendatei oder in SparkConf oder direkt in der Befehlszeile während der Spark-Submission einstellen. Link http://spark.apache.org/docs/latest/configuration.html

Verwandte Themen