Ich möchte Funke verwenden, um eine große (51 GB) XML-Datei (auf einer externen Festplatte) in einen Datenrahmen zu lesen (mit spark-xml plugin), einfache Zuordnung/Filterung, neu anordnen und dann als CSV-Datei auf die Festplatte schreiben.Nicht genügend Speicherfehler beim Lesen einer großen Datei in Spark 2.1.0
Aber ich bekomme immer eine java.lang.OutOfMemoryError: Java heap space
egal wie ich das zwicke.
ich verstehen, warum wollen aufhören
Sollte es nicht die OOM Fehler in mehr die Aufgabe, Teile aufgeteilt nicht die Anzahl der Partitionen zu erhöhen, so dass jeder einzelne Teil kleiner ist und Speicher nicht dazu führt, Probleme?
(Spark can't possibly be trying to stuff everything in memory and crashing if it doesn't fit, right??)
Dinge, die ich versucht habe:
- Repartitionierung/Koaleszieren zu (5.000 und 10.000 Partitionen) der Datenrahmen beim Lesen und beim Schreiben (Anfangswert ist 1604)
- eine kleinere Anzahl von mit Testamentsvollstrecker (6, 4, auch mit Testamentsvollstrecker bekomme ich OOM Fehler!)
- die Größe der Split-Dateien verringern (Standard sieht aus wie es 33MB ist)
- give Tonnen RAM (alles, was ich)
- Anstieg
spark.memory.fraction
zu 0,8 - Abnahme
spark.memory.storageFraction
bis 0.2 (Standard-0,6) (Standard: 0,5) - gesetzt
spark.default.parallelism
auf 30 und 40 (Standard: 8 für mich) - gesetzt
spark.files.maxPartitionBytes
zu 64M (Standard 128M)
All mein Code hier ist (man beachte ich bin das Caching nichts):
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema) // defined previously
.option("rowTag", "row")
.load(s"$pathToInputXML")
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604
// i pass `numPartitions` as cli arguments
val df2 = df.coalesce(numPartitions)
// filter and select only the cols i'm interested in
val dsout = df2
.where(df2.col("_TypeId") === "1")
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
).as[Post]
// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r
// more mapping
dsout
.map{
case Post(id,title,body,tags) =>
val body1 = tagPat.replaceAllIn(body,"")
val body2 = whitespacePat.replaceAllIn(body1," ")
Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))
}
.orderBy(rand(SEED)) // random sort
.write // write it back to disk
.option("quoteAll", true)
.mode(SaveMode.Overwrite)
.csv(output)
NOTES
- der Eingang Split sind wirklich klein (33MB nur), also warum kann ich nicht 8 Threads jede Verarbeitung einer Spaltung? es sollte wirklich nicht mein Gedächtnis blasen (ich habe se
UPDATE Ich habe des Codes eine kürzere Version geschrieben, die nur die Datei liest und dann forEachPartition (println).
ich erhalte die gleichen OOM-Fehler:
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema)
.option("rowTag", "row")
.load(s"$pathToInputXML")
.repartition(numPartitions)
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
df
.where(df.col("_PostTypeId") === "1")
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
df("_Tags").as("tags")
).as[Post]
.map {
case Post(id, title, body, tags) =>
Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase))
}
.foreachPartition { rdd =>
if (rdd.nonEmpty) {
println(s"HI! I'm an RDD and I have ${rdd.size} elements!")
}
}
PS: ich bin mit Funken v 2.1.0 Meine Maschine hat 8 Kern und 16 GB rAM..
Haben Sie die Größe der erstellten Partitionen in Spark UI inspiziert? – Khozzy
@Khozzy Das habe ich bekommen, als ich die App mit 1604 Partitionen für den Lese-DF und 50 Partitionen für den DF ausgeführt habe: [screenshot-spark-ui] (http://i.imgur.com/a5LjEmc. png) –
Ja, aber schauen Sie in die Benutzeroberfläche während der Jobausführung. Sie werden sehen, wie lange jede Aufgabe ausgeführt wird und wie Ihre CPU genutzt wird (vielleicht gibt es Nachzügler). – Khozzy