2016-11-02 2 views
1

Ich versuche, eine benutzerdefinierte Senke zum Entpacken von Dateien zu erstellen.Google Dataflow: java.lang.IllegalArgumentException: Kann nicht setCoder (null)

Mit diesem einfachen Code:

public static class ZipIO{  
    public static class Sink extends com.google.cloud.dataflow.sdk.io.Sink<String> { 

    private static final long serialVersionUID = -7414200726778377175L; 
    private final String unzipTarget; 

     public Sink withDestinationPath(String s){ 
     if(s!=""){ 
      return new Sink(s); 
     } 
     else { 
      throw new IllegalArgumentException("must assign destination path"); 
     } 

     } 

     protected Sink(String path){ 
      this.unzipTarget = path; 
     } 

     @Override 
     public void validate(PipelineOptions po){ 
      if(unzipTarget==null){ 
       throw new RuntimeException(); 
      } 
     } 

     @Override 
     public ZipFileWriteOperation createWriteOperation(PipelineOptions po){ 
      return new ZipFileWriteOperation(this); 
     } 

    } 

    private static class ZipFileWriteOperation extends WriteOperation<String, UnzipResult>{ 

    private static final long serialVersionUID = 7976541367499831605L; 
    private final ZipIO.Sink sink; 

     public ZipFileWriteOperation(ZipIO.Sink sink){ 
      this.sink = sink; 
     } 



     @Override 
     public void initialize(PipelineOptions po) throws Exception{ 

     } 

     @Override 
     public void finalize(Iterable<UnzipResult> writerResults, PipelineOptions po) throws Exception { 
     long totalFiles = 0; 
     for(UnzipResult r:writerResults){ 
      totalFiles +=r.filesUnziped; 
     } 
     LOG.info("Unzipped {} Files",totalFiles); 
     } 

     @Override 
     public ZipIO.Sink getSink(){ 
      return sink; 
     } 

     @Override 
     public ZipWriter createWriter(PipelineOptions po) throws Exception{ 
      return new ZipWriter(this); 
     } 

    } 

    private static class ZipWriter extends Writer<String, UnzipResult>{ 
     private final ZipFileWriteOperation writeOp; 
     public long totalUnzipped = 0; 

     ZipWriter(ZipFileWriteOperation writeOp){ 
      this.writeOp = writeOp; 
     } 

     @Override 
     public void open(String uID) throws Exception{ 
     } 

     @Override 
     public void write(String p){ 
      System.out.println(p); 
     } 

     @Override 
     public UnzipResult close() throws Exception{ 
      return new UnzipResult(this.totalUnzipped); 
     } 

     @Override 
     public ZipFileWriteOperation getWriteOperation(){ 
      return writeOp; 
     } 


    } 

    private static class UnzipResult implements Serializable{ 
    private static final long serialVersionUID = -8504626439217544799L; 
    public long filesUnziped=0;  
     public UnzipResult(long filesUnziped){ 
      this.filesUnziped=filesUnziped; 
     } 
    } 
} 

}

Die Verarbeitung mit Fehlern fehlschlägt:

Exception in thread "main" java.lang.IllegalArgumentException: Cannot setCoder(null) at com.google.cloud.dataflow.sdk.values.TypedPValue.setCoder(TypedPValue.java:67) at com.google.cloud.dataflow.sdk.values.PCollection.setCoder(PCollection.java:150) at com.google.cloud.dataflow.sdk.io.Write$Bound.createWrite(Write.java:380) at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:112) at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$BatchWrite.apply(DataflowPipelineRunner.java:2118) at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$BatchWrite.apply(DataflowPipelineRunner.java:2099) at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:75) at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:465) at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.apply(BlockingDataflowPipelineRunner.java:169) at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:368) at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:275) at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:463) at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.apply(BlockingDataflowPipelineRunner.java:169) at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:368) at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:291) at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:174) at com.mcd.de.tlogdataflow.StarterPipeline.main(StarterPipeline.java:93)

Jede Hilfe sehr geschätzt wird.

Dank & BR Philipp

Antwort

0

Dieser Absturz wird durch einen Fehler in dem Datenfluss Java SDK (specifically, this line), die auch in der Apache Beam (Inkubation) Java SDK vorlag verursacht wird.

Die Methode Sink.WriterOperation#getWriterResultCoder() muss immer außer Kraft gesetzt werden, aber wir haben sie nicht markiert abstract. Es ist in Beam behoben, aber im Dataflow-SDK unverändert. Sie sollten diese Methode überschreiben und einen geeigneten Coder zurückgeben.

Sie haben einige Optionen mit dem Coder zu kommen:

  1. Ihre eigene kleine Coder Klasse schreiben, von VarLongCoder oder BigEndianLongCoder Einwickeln
  2. einfach ein long anstelle der Struktur UnzipResult, so dass Sie verwenden können, diejenigen wie sie sind.
  3. Weniger ratsam aufgrund der überschüssigen Größe, könnten Sie SerializableCoder.of(UnzipResult.class)
+0

Hallo Kenn, vielen Dank für Ihre Hilfe. Ich habe es in eine Long-Struktur geändert, wobei die Operation close() einen Long-Wert zurückgegeben hat. Trotzdem bekomme ich das selbe nicht kannCoder (null) Exception setzen. – bigdataclown

+0

Hallo Kenn, hast du eine Lösung dafür gefunden? Danke & BR Philipp – bigdataclown

+0

Hast du 'getWriterResultCoder()' überschrieben? Das müssen Sie tun. –

Verwandte Themen