Ich möchte eine Datenfluss-Jobvorlage erstellen, die den Dateinamen in GCS verwendet und sie in einem PubSub-Thema veröffentlicht. Ich folgte dem Tutorial bei this Link, aber das scheint nicht für mich arbeiten.Dataflow-Jobvorlagen-Erstellungsfehler aus
My Klassendefinition ist die folgende -
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.TemplatingDataflowPipelineRunner;
public class PubSubOutputTest {
public static void main(String[] args) {
// Create pipeline options.
pubSubOutputOptions options = PipelineOptionsFactory.fromArgs(args).as(pubSubOutputOptions.class);
options.setRunner(TemplatingDataflowPipelineRunner.class);
options.setTempLocation("gs://staging-bucket");
Pipeline p = Pipeline.create(options);
// Read the file from the GCS Bucket.
p.apply(TextIO.Read.named("Read file from GCS.").from(options.getInputFile()).withoutValidation())
.apply(PubsubIO.Write.named("Write to Pub Sub topic.")
.topic("projects/my-project/topics/my-topic"));
// Run the pipeline.
p.run();
}
}
Die Schnittstelle, die das Valueprovider implementiert die Laufzeiteingaben zu greifen ist folgende -
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.ValueProvider;
public interface pubSubOutputOptions extends PipelineOptions {
@Default.String("gs://default-file.txt")
ValueProvider getInputFile();
void setInputFile(ValueProvider value);
}
Die Vorlagenerstellung wird der folgende Fehler gibt.
Exception in thread "main" java.lang.IllegalArgumentException: PipelineOptions specified failed to serialize to JSON.
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:408)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:146)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.run(DataflowPipelineRunner.java:570)
at com.google.cloud.dataflow.sdk.runners.TemplatingDataflowPipelineRunner.run(TemplatingDataflowPipelineRunner.java:137)
at com.google.cloud.dataflow.sdk.runners.TemplatingDataflowPipelineRunner.run(TemplatingDataflowPipelineRunner.java:44)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:181)
at com.my.project.dataflow.PubSubOutputTest.main(PubSubOutputTest.java:32)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'inputFile' with value 'RuntimeValueProvider{propertyName=inputFile, default=gs://default-file.txt, value=null}'
at com.fasterxml.jackson.databind.JsonMappingException.fromUnexpectedIOE(JsonMappingException.java:284)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3008)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:406)
Ich bin neu in Google Cloud Dataflow und Java. Ich habe alles in der Dokumentation implementiert, aber ich hätte etwas Offensichtliches übersehen können.
@Paritosh betrachten auch up-Voting eine Antwort, wenn es Ihnen geholfen, da dies in so wichtig ist. –