Ich versuche, Apache Spark SQL zu verwenden, um JSL-Protokolldaten in S3 in Parquet-Dateien auch auf S3 zu schreiben. Mein Code ist im Grunde:Spark SQL konnte das Schreiben von Parquet-Daten mit einer großen Anzahl von Shards nicht abschließen
import org.apache.spark._
val sqlContext = sql.SQLContext(sc)
val data = sqlContext.jsonFile("s3n://...", 10e-6)
data.saveAsParquetFile("s3n://...")
Dieser Code funktioniert, wenn ich bis 2000 Partitionen haben und nicht für 5000 oder mehr beträgt, unabhängig von der Datenmenge. Normalerweise könnte man nur die Partitionen zu einer akzeptablen Anzahl verschmelzen, aber dies ist ein sehr großer Datensatz und bei 2000 Partitionen schlug ich das Problem in diesen question
Ich verwende diese auf Funken 1.1.0 beschreibt
14/10/10 00:34:32 INFO scheduler.DAGScheduler: Stage 1 (runJob at ParquetTableOperations.scala:318) finished in 759.274 s
14/10/10 00:34:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/10/10 00:34:32 INFO spark.SparkContext: Job finished: runJob at ParquetTableOperations.scala:318, took 759.469302077 s
14/10/10 00:34:34 WARN hadoop.ParquetOutputCommitter: could not write summary file for ...
java.io.IOException: Could not read footer: java.lang.NullPointerException
at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:190)
at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:203)
at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:49)
at org.apache.spark.sql.parquet.InsertIntoParquetTable.saveAsHadoopFile(ParquetTableOperations.scala:319)
at org.apache.spark.sql.parquet.InsertIntoParquetTable.execute(ParquetTableOperations.scala:246)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77)
at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
at $line37.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
at $line37.$read$$iwC$$iwC$$iwC.<init>(<console>:56)
at $line37.$read$$iwC$$iwC.<init>(<console>:58)
at $line37.$read$$iwC.<init>(<console>:60)
at $line37.$read.<init>(<console>:62)
at $line37.$read$.<init>(<console>:66)
at $line37.$read$.<clinit>(<console>)
at $line37.$eval$.<init>(<console>:7)
at $line37.$eval$.<clinit>(<console>)
at $line37.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.close(NativeS3FileSystem.java:106)
at java.io.BufferedInputStream.close(BufferedInputStream.java:472)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:298)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
auf einem R3.xlarge in ec2. Ich verwende die Spark-Shell-Konsole, um den obigen Code auszuführen. Ich bin in der Lage, nicht-triviale Abfragen auf dem
data
SchemaRDD Objekt danach durchzuführen, so scheint es nicht ein Ressourcenproblem zu sein. Es ist auch möglich, die resultierende Parquet-Datei zu lesen und abzufragen, sie dauert aufgrund des Fehlens von Zusammenfassungsdateien nur sehr lange.
ich einen Fehler zu dieser Datei würde. https://issues.apache.org/jira/browse/SPARK/ –