2017-06-14

java.lang.NullPointerException: writeSupportClass should not be null while writing to a Parquet file in a Spark Streaming job

In a Spark Streaming job, I save my RDD data to a Parquet file in HDFS with the following Hadoop code snippet:

readyToSave.foreachRDD((VoidFunction<JavaPairRDD<Void, MyProtoRecord>>) rdd -> {   
      Configuration configuration = rdd.context().hadoopConfiguration(); 
      Job job = Job.getInstance(configuration); 
      ParquetOutputFormat.setWriteSupportClass(job, ProtoWriteSupport.class); 
      ProtoParquetOutputFormat.setProtobufClass(job, MyProtoRecord.class); 
      rdd.saveAsNewAPIHadoopFile("path-to-hdfs", Void.class, MyProtoRecord.class, ParquetOutputFormat.class, configuration); 
    }); 

and I get the exception below:

java.lang.NullPointerException: writeSupportClass should not be null 
at parquet.Preconditions.checkNotNull(Preconditions.java:38) 
at parquet.hadoop.ParquetOutputFormat.getWriteSupport(ParquetOutputFormat.java:326) 
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:272) 
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1112) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1095) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
at org.apache.spark.scheduler.Task.run(Task.scala:86) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

How can I solve this problem?

Answer


Figured out the problem! When calling the saveAsNewAPIHadoopFile() method, you should pass your Job's configuration (job.getConfiguration()):

readyToSave.foreachRDD((VoidFunction<JavaPairRDD<Void, MyProtoRecord>>) rdd -> { 
      Configuration configuration = rdd.context().hadoopConfiguration(); 
      Job job = Job.getInstance(configuration); 
      ParquetOutputFormat.setWriteSupportClass(job, ProtoWriteSupport.class); 
      ProtoParquetOutputFormat.setProtobufClass(job, MyProtoRecord.class); 
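      // pass job.getConfiguration() (which carries the write support class), not the original configuration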
      rdd.saveAsNewAPIHadoopFile("path-to-hdfs", Void.class, MyProtoRecord.class, ParquetOutputFormat.class, job.getConfiguration()); 
    }); 
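The reason the first version fails is that Job.getInstance(configuration) works on a copy of the Configuration, so ParquetOutputFormat.setWriteSupportClass(job, ...) only updates the Job's copy; the original rdd.context().hadoopConfiguration() passed to saveAsNewAPIHadoopFile never receives the write support class, which is exactly what getWriteSupport() complains about. Below is a minimal standalone sketch illustrating this; the property key "parquet.write.support.class" is an assumption based on the pre-Apache "parquet." package names visible in the stack trace.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    // Sketch: Job.getInstance(conf) operates on a copy of the Configuration,
    // so settings applied through the Job never reach the original object.
    public class WriteSupportConfigCheck {
      public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Roughly what ParquetOutputFormat.setWriteSupportClass(job, ProtoWriteSupport.class) does
        // (the key name is an assumption for illustration)
        job.getConfiguration().set("parquet.write.support.class", "parquet.proto.ProtoWriteSupport");

        // The Job's copy carries the class ...
        System.out.println(job.getConfiguration().get("parquet.write.support.class"));
        // ... but the original Configuration does not, so saveAsNewAPIHadoopFile(..., configuration)
        // handed ParquetOutputFormat no write support class, hence the NullPointerException.
        System.out.println(configuration.get("parquet.write.support.class")); // prints null
      }
    }

Passing job.getConfiguration() to saveAsNewAPIHadoopFile therefore hands Spark the copy that actually contains the write support and protobuf class settings.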