2017-10-02 1 views
0

Ich habe einen Google Dataflow-Job mit Bibliothek Version 1.9.1 Job, der Job dauerte Laufzeitargumente. Wir haben die TextIO.read(). From(). OhneValidation() verwendet. Seit wir zu google dataflow 2.0.0 migriert haben, wird in 2.0.0 die Option ohne Validierung entfernt. Die Versionshinweis-Seite spricht nicht darüber https://cloud.google.com/dataflow/release-notes/release-notes-java-2.Konstruieren einer Pipline mit ValueProivder.RuntimeProvider

Wir haben versucht, die Eingabe als ValueProvider.RuntimeProvider zu übergeben. Aber während des Pipelinebaus erhalten wir den folgenden Fehler. Wenn es als ValueProvider übergeben wird, versucht die Pipeline-Erstellung, den Wertanbieter zu validieren. Wie stelle ich einen Laufzeitwertanbieter für eine TextIO-Eingabe in google cloud dataflow 2.0.0 zur Verfügung?

java.lang.RuntimeException: Die Methode getInputFile sollte keinen Rückgabetyp RuntimeValueProvider haben, stattdessen ValueProvider verwenden. bei org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault (ProxyInvocationHandler.java:505)

+0

RuntimeValueProvider eine extrem niedrige Ebene interne Implementierung-Detail-Klasse, die nie von den Benutzern verwendet werden kann; Die Tatsache, dass es öffentlich sichtbar ist, ist ein unglücklicher Zufall des Java-Sichtbarkeitssystems. Was meinen Sie mit "die Pipeline-Erstellung versucht, den Wertanbieter zu validieren"? Ich denke nicht, dass das passieren sollte. Können Sie Ihren Code und einen vollständigen Ausdruck des Fehlers, den Sie erhalten, einfügen? – jkff

+0

Wie @jkff gesagt hat, bitte posten Sie Ihren Code. Ich nehme an, Sie verwenden Templat-Pipelines? Ich verwende '2.1.0' mit Parameter für Vorlagen ohne Probleme. –

Antwort

0

Ich werde Sie verwenden Templat-Pipelines zu übernehmen, und dass Ihre Pipeline Laufzeitparameter verbraucht. Er ist ein funktionierendes Beispiel, das das Cloud Dataflow SDK Version 2.1.0 verwendet. Es liest eine Datei aus GCS (zur Laufzeit an die Vorlage übergeben), wandelt jede Zeile in eine TableRow um und schreibt nach BigQuery. Es ist ein triviales Beispiel, aber es funktioniert mit 2.1.0.

Programm args sind wie folgt:

--project=<your_project_id> --runner=DataflowRunner --templateLocation=gs://<your_bucket>/dataflow_pipeline --stagingLocation=gs://<your_bucket>/jars --tempLocation=gs://<your_bucket>/tmp

public class TemplatePipeline { 
    public static void main(String[] args) { 
     PipelineOptionsFactory.register(TemplateOptions.class); 
     TemplateOptions options = PipelineOptionsFactory 
       .fromArgs(args) 
       .withValidation() 
       .as(TemplateOptions.class); 
     Pipeline pipeline = Pipeline.create(options); 
     pipeline.apply("READ", TextIO.read().from(options.getInputFile()).withCompressionType(TextIO.CompressionType.GZIP)) 
       .apply("TRANSFORM", ParDo.of(new WikiParDo())) 
       .apply("WRITE", BigQueryIO.writeTableRows() 
         .to(String.format("%s:dataset_name.wiki_demo", options.getProject())) 
         .withCreateDisposition(CREATE_IF_NEEDED) 
         .withWriteDisposition(WRITE_TRUNCATE) 
         .withSchema(getTableSchema())); 
     pipeline.run(); 
    } 

    private static TableSchema getTableSchema() { 
     List<TableFieldSchema> fields = new ArrayList<>(); 
     fields.add(new TableFieldSchema().setName("year").setType("INTEGER")); 
     fields.add(new TableFieldSchema().setName("month").setType("INTEGER")); 
     fields.add(new TableFieldSchema().setName("day").setType("INTEGER")); 
     fields.add(new TableFieldSchema().setName("wikimedia_project").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("language").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("title").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("views").setType("INTEGER")); 
     return new TableSchema().setFields(fields); 
    } 

    public interface TemplateOptions extends DataflowPipelineOptions { 
     @Description("GCS path of the file to read from") 
     ValueProvider<String> getInputFile(); 

     void setInputFile(ValueProvider<String> value); 
    } 

    private static class WikiParDo extends DoFn<String, TableRow> { 
     @ProcessElement 
     public void processElement(ProcessContext c) throws Exception { 
      String[] split = c.element().split(","); 
      TableRow row = new TableRow(); 
      for (int i = 0; i < split.length; i++) { 
       TableFieldSchema col = getTableSchema().getFields().get(i); 
       row.set(col.getName(), split[i]); 
      } 
      c.output(row); 
     } 
    } 
} 
Verwandte Themen