2017-05-05 3 views
3

Ich möchte Funke verwenden, um eine große (51 GB) XML-Datei (auf einer externen Festplatte) in einen Datenrahmen zu lesen (mit spark-xml plugin), einfache Zuordnung/Filterung, neu anordnen und dann als CSV-Datei auf die Festplatte schreiben.Nicht genügend Speicherfehler beim Lesen einer großen Datei in Spark 2.1.0

Aber ich bekomme immer eine java.lang.OutOfMemoryError: Java heap space egal wie ich das zwicke.

ich verstehen, warum wollen aufhören

Sollte es nicht die OOM Fehler in mehr die Aufgabe, Teile aufgeteilt nicht die Anzahl der Partitionen zu erhöhen, so dass jeder einzelne Teil kleiner ist und Speicher nicht dazu führt, Probleme?

(Spark can't possibly be trying to stuff everything in memory and crashing if it doesn't fit, right??)

Dinge, die ich versucht habe:

  • Repartitionierung/Koaleszieren zu (5.000 und 10.000 Partitionen) der Datenrahmen beim Lesen und beim Schreiben (Anfangswert ist 1604)
  • eine kleinere Anzahl von mit Testamentsvollstrecker (6, 4, auch mit Testamentsvollstrecker bekomme ich OOM Fehler!)
  • die Größe der Split-Dateien verringern (Standard sieht aus wie es 33MB ist)
  • give Tonnen RAM (alles, was ich)
  • Anstieg spark.memory.fraction zu 0,8
  • Abnahme spark.memory.storageFraction bis 0.2 (Standard-0,6) (Standard: 0,5)
  • gesetzt spark.default.parallelism auf 30 und 40 (Standard: 8 für mich)
  • gesetzt spark.files.maxPartitionBytes zu 64M (Standard 128M)

All mein Code hier ist (man beachte ich bin das Caching nichts):

val df: DataFrame = spark.sqlContext.read 
    .option("mode", "DROPMALFORMED") 
    .format("com.databricks.spark.xml") 
    .schema(customSchema) // defined previously 
    .option("rowTag", "row") 
    .load(s"$pathToInputXML") 

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n") 
// prints 1604 

// i pass `numPartitions` as cli arguments 
val df2 = df.coalesce(numPartitions) 

// filter and select only the cols i'm interested in 
val dsout = df2 
    .where(df2.col("_TypeId") === "1") 
    .select(
    df("_Id").as("id"), 
    df("_Title").as("title"), 
    df("_Body").as("body"), 
).as[Post] 

// regexes to clean the text 
val tagPat = "<[^>]+>".r 
val angularBracketsPat = "><|>|<" 
val whitespacePat = """\s+""".r 


// more mapping 
dsout 
.map{ 
    case Post(id,title,body,tags) => 

    val body1 = tagPat.replaceAllIn(body,"") 
    val body2 = whitespacePat.replaceAllIn(body1," ") 

    Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(",")) 

} 
.orderBy(rand(SEED)) // random sort 
.write // write it back to disk 
.option("quoteAll", true) 
.mode(SaveMode.Overwrite) 
.csv(output) 

NOTES

  • der Eingang Split sind wirklich klein (33MB nur), also warum kann ich nicht 8 Threads jede Verarbeitung einer Spaltung? es sollte wirklich nicht mein Gedächtnis blasen (ich habe se

UPDATE Ich habe des Codes eine kürzere Version geschrieben, die nur die Datei liest und dann forEachPartition (println).

ich erhalte die gleichen OOM-Fehler:

val df: DataFrame = spark.sqlContext.read 
    .option("mode", "DROPMALFORMED") 
    .format("com.databricks.spark.xml") 
    .schema(customSchema) 
    .option("rowTag", "row") 
    .load(s"$pathToInputXML") 
    .repartition(numPartitions) 

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n") 

df 
    .where(df.col("_PostTypeId") === "1") 
    .select(
    df("_Id").as("id"), 
    df("_Title").as("title"), 
    df("_Body").as("body"), 
    df("_Tags").as("tags") 
).as[Post] 
    .map { 
    case Post(id, title, body, tags) => 
     Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase)) 
    } 
    .foreachPartition { rdd => 
    if (rdd.nonEmpty) { 
     println(s"HI! I'm an RDD and I have ${rdd.size} elements!") 
    } 
    } 

PS: ich bin mit Funken v 2.1.0 Meine Maschine hat 8 Kern und 16 GB rAM..

+0

Haben Sie die Größe der erstellten Partitionen in Spark UI inspiziert? – Khozzy

+0

@Khozzy Das habe ich bekommen, als ich die App mit 1604 Partitionen für den Lese-DF und 50 Partitionen für den DF ausgeführt habe: [screenshot-spark-ui] (http://i.imgur.com/a5LjEmc. png) –

+0

Ja, aber schauen Sie in die Benutzeroberfläche während der Jobausführung. Sie werden sehen, wie lange jede Aufgabe ausgeführt wird und wie Ihre CPU genutzt wird (vielleicht gibt es Nachzügler). – Khozzy

Antwort

0

Da Sie Ihre RDD zweimal gelagert und Ihre Logik muss so sein ändern oder Filter mit SparkSql

val df: DataFrame = SparkFactory.spark.read 
     .option("mode", "DROPMALFORMED") 
     .format("com.databricks.spark.xml") 
     .schema(customSchema) // defined previously 
     .option("rowTag", "row") 
     .load(s"$pathToInputXML") 
     .coalesce(numPartitions) 

    println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n") 
    // prints 1604 


    // regexes to clean the text 
    val tagPat = "<[^>]+>".r 
    val angularBracketsPat = "><|>|<" 
    val whitespacePat = """\s+""".r 

    // filter and select only the cols i'm interested in 
    df 
     .where(df.col("_TypeId") === "1") 
     .select(
     df("_Id").as("id"), 
     df("_Title").as("title"), 
     df("_Body").as("body"), 
    ).as[Post] 
     .map{ 
     case Post(id,title,body,tags) => 

      val body1 = tagPat.replaceAllIn(body,"") 
      val body2 = whitespacePat.replaceAllIn(body1," ") 

      Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(",")) 

     } 
     .orderBy(rand(SEED)) // random sort 
     .write // write it back to disk 
     .option("quoteAll", true) 
     .mode(SaveMode.Overwrite) 
     .csv(output) 
+0

Alles ein einziger DF zu machen, hat nicht wirklich geholfen .. Ich habe immer noch 'java.lang.OutOfMemoryError: Java heap space' –

-2

Sie können die Heap-Größe ändern, um die folgenden in Ihrer Umgebungsvariable durch Zugabe:

  1. Umwelt Variablenname: _JAVA_OPTIONS
  2. Variable Wert Umwelt: -Xmx512M -Xms512M
Verwandte Themen