2014-10-10 8 views
9

Ich versuche, Apache Spark SQL zu verwenden, um JSL-Protokolldaten in S3 in Parquet-Dateien auch auf S3 zu schreiben. Mein Code ist im Grunde:Spark SQL konnte das Schreiben von Parquet-Daten mit einer großen Anzahl von Shards nicht abschließen

import org.apache.spark._ 
val sqlContext = sql.SQLContext(sc) 
val data = sqlContext.jsonFile("s3n://...", 10e-6) 
data.saveAsParquetFile("s3n://...") 

Dieser Code funktioniert, wenn ich bis 2000 Partitionen haben und nicht für 5000 oder mehr beträgt, unabhängig von der Datenmenge. Normalerweise könnte man nur die Partitionen zu einer akzeptablen Anzahl verschmelzen, aber dies ist ein sehr großer Datensatz und bei 2000 Partitionen schlug ich das Problem in diesen question

Ich verwende diese auf Funken 1.1.0 beschreibt

14/10/10 00:34:32 INFO scheduler.DAGScheduler: Stage 1 (runJob at ParquetTableOperations.scala:318) finished in 759.274 s 
14/10/10 00:34:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
14/10/10 00:34:32 INFO spark.SparkContext: Job finished: runJob at ParquetTableOperations.scala:318, took 759.469302077 s 
14/10/10 00:34:34 WARN hadoop.ParquetOutputCommitter: could not write summary file for ... 
java.io.IOException: Could not read footer: java.lang.NullPointerException 
     at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:190) 
     at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:203) 
     at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:49) 
     at org.apache.spark.sql.parquet.InsertIntoParquetTable.saveAsHadoopFile(ParquetTableOperations.scala:319) 
     at org.apache.spark.sql.parquet.InsertIntoParquetTable.execute(ParquetTableOperations.scala:246) 
     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409) 
     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409) 
     at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77) 
     at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54) 
     at $line37.$read$$iwC$$iwC$$iwC.<init>(<console>:56) 
     at $line37.$read$$iwC$$iwC.<init>(<console>:58) 
     at $line37.$read$$iwC.<init>(<console>:60) 
     at $line37.$read.<init>(<console>:62) 
     at $line37.$read$.<init>(<console>:66) 
     at $line37.$read$.<clinit>(<console>) 
     at $line37.$eval$.<init>(<console>:7) 
     at $line37.$eval$.<clinit>(<console>) 
     at $line37.$eval.$print(<console>) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) 
     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) 
     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) 
     at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) 
     at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) 
     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) 
     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616) 
     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624) 
     at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629) 
     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954) 
     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997) 
     at org.apache.spark.repl.Main$.main(Main.scala:31) 
     at org.apache.spark.repl.Main.main(Main.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.NullPointerException 
     at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.close(NativeS3FileSystem.java:106) 
     at java.io.BufferedInputStream.close(BufferedInputStream.java:472) 
     at java.io.FilterInputStream.close(FilterInputStream.java:181) 
     at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:298) 
     at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180) 
     at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 
auf einem R3.xlarge in ec2. Ich verwende die Spark-Shell-Konsole, um den obigen Code auszuführen. Ich bin in der Lage, nicht-triviale Abfragen auf dem data SchemaRDD Objekt danach durchzuführen, so scheint es nicht ein Ressourcenproblem zu sein. Es ist auch möglich, die resultierende Parquet-Datei zu lesen und abzufragen, sie dauert aufgrund des Fehlens von Zusammenfassungsdateien nur sehr lange.

+1

ich einen Fehler zu dieser Datei würde. https://issues.apache.org/jira/browse/SPARK/ –

Antwort

0

Versuchen Sie, diese Eigenschaft als falsch gesetzt:

sparkContext.hadoopConfiguration().set("parquet.enable.summary-metadata", "false"); 
Verwandte Themen