2017-02-01 3 views
0

Ich möchte eine Datenfluss-Jobvorlage erstellen, die den Dateinamen in GCS verwendet und sie in einem PubSub-Thema veröffentlicht. Ich folgte dem Tutorial bei this Link, aber das scheint nicht für mich arbeiten.Dataflow-Jobvorlagen-Erstellungsfehler aus

My Klassendefinition ist die folgende -

import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.io.PubsubIO; 
import com.google.cloud.dataflow.sdk.io.TextIO; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.runners.TemplatingDataflowPipelineRunner; 

public class PubSubOutputTest { 
    public static void main(String[] args) { 
    // Create pipeline options. 
    pubSubOutputOptions options = PipelineOptionsFactory.fromArgs(args).as(pubSubOutputOptions.class); 
    options.setRunner(TemplatingDataflowPipelineRunner.class); 
    options.setTempLocation("gs://staging-bucket"); 
    Pipeline p = Pipeline.create(options); 
    // Read the file from the GCS Bucket. 
    p.apply(TextIO.Read.named("Read file from GCS.").from(options.getInputFile()).withoutValidation()) 
     .apply(PubsubIO.Write.named("Write to Pub Sub topic.") 
     .topic("projects/my-project/topics/my-topic")); 
    // Run the pipeline. 
    p.run(); 
    } 
} 

Die Schnittstelle, die das Valueprovider implementiert die Laufzeiteingaben zu greifen ist folgende -

import com.google.cloud.dataflow.sdk.options.Default; 
import com.google.cloud.dataflow.sdk.options.PipelineOptions; 
import com.google.cloud.dataflow.sdk.options.ValueProvider; 

public interface pubSubOutputOptions extends PipelineOptions { 
    @Default.String("gs://default-file.txt") 
    ValueProvider getInputFile(); 
    void setInputFile(ValueProvider value); 
} 

Die Vorlagenerstellung wird der folgende Fehler gibt.

Exception in thread "main" java.lang.IllegalArgumentException: PipelineOptions specified failed to serialize to JSON. 
    at com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:408) 
    at com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:146) 
    at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.run(DataflowPipelineRunner.java:570) 
    at com.google.cloud.dataflow.sdk.runners.TemplatingDataflowPipelineRunner.run(TemplatingDataflowPipelineRunner.java:137) 
    at com.google.cloud.dataflow.sdk.runners.TemplatingDataflowPipelineRunner.run(TemplatingDataflowPipelineRunner.java:44) 
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:181) 
    at com.my.project.dataflow.PubSubOutputTest.main(PubSubOutputTest.java:32) 
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'inputFile' with value 'RuntimeValueProvider{propertyName=inputFile, default=gs://default-file.txt, value=null}' 
    at com.fasterxml.jackson.databind.JsonMappingException.fromUnexpectedIOE(JsonMappingException.java:284) 
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3008) 
    at com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:406) 

Ich bin neu in Google Cloud Dataflow und Java. Ich habe alles in der Dokumentation implementiert, aber ich hätte etwas Offensichtliches übersehen können.

Antwort

2

Es scheint ein Fehler bei der Deklaration der Option zu bestehen. Ich denke, dass Sie ein Template-Parameter zu ValueProvider zur Verfügung stellen möchten, etwa so:

@Default.String("gs://default-file.txt") 
    ValueProvider<String> getInputFile(); 
    void setInputFile(ValueProvider<String> value); 
+0

@Paritosh betrachten auch up-Voting eine Antwort, wenn es Ihnen geholfen, da dies in so wichtig ist. –

Verwandte Themen