1

Ich versuche einen Datenspeicherfluss zu schreiben, der einen Stream für Pub-Sub einliest und in eine große Abfrage schreibt.Schreiben in BigQuery aus Cloud Dataflow: Ich kann keine Side-Input-Ansicht von Eingabe erstellen

Beim Versuch, das Werkzeug, das ich die Fehlermeldung „Es kann keine Nebeneingang Ansicht von Eingang schaffen“ erhalten laufen mit dem Stack-Trace:

Exception in thread "main" java.lang.IllegalStateException: Unable to create a side-input view from input 
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:277) 
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:268) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:366) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274) 
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161) 
at com.google.cloud.dataflow.sdk.io.Write$Bound.createWrite(Write.java:214) 
at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:79) 
at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:68) 
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.apply(DirectPipelineRunner.java:247) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:290) 
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:174) 
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$Bound.apply(BigQueryIO.java:1738) 
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$Bound.apply(BigQueryIO.java:1440) 
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.apply(DirectPipelineRunner.java:247) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274) 
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161) 
at co.uk.bubblestudent.dataflow.StarterPipeline.main(StarterPipeline.java:116) 
Caused by: java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey. 
at com.google.cloud.dataflow.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:192) 
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:275) 
... 20 more 

Mein Code ist:

public class StarterPipeline { 


public static final Duration ONE_DAY = Duration.standardDays(1); 
public static final Duration ONE_HOUR = Duration.standardHours(1); 
public static final Duration TEN_SECONDS = Duration.standardSeconds(10); 
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); 

    private static TableSchema schemaGen() { 
    List<TableFieldSchema> fields = new ArrayList<>(); 
    fields.add(new TableFieldSchema().setName("facebookID").setType("STRING")); 
    fields.add(new TableFieldSchema().setName("propertyID").setType("STRING")); 
    fields.add(new TableFieldSchema().setName("time").setType("TIMESTAMP")); 
    TableSchema schema = new TableSchema().setFields(fields); 
    return schema; 
    } 

    public static void main(String[] args) { 
    LOG.info("Starting"); 
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); 
    LOG.info("Pipeline made"); 
    // For Cloud execution, set the Cloud Platform project, staging location, 
    // and specify DataflowPipelineRunner or BlockingDataflowPipelineRunner. 
    options.setProject(<project>); 
    options.setStagingLocation(<bucket>); 
    options.setTempLocation(<bucket>); 
    Pipeline p = Pipeline.create(options); 


    TableSchema schema = schemaGen(); 
    LOG.info("Schema made"); 
    try { 
    LOG.info(schema.toPrettyString()); 
} catch (IOException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
} 
    PCollection<String> input = p.apply(PubsubIO.Read.named("ReadFromPubsub").subscription(<subscription>)); 

    PCollection<TableRow> pardo = input.apply(ParDo.of(new FormatAsTableRowFn())); 
    LOG.info("Formatted Row"); 

    pardo.apply(BigQueryIO.Write.named("Write into BigQuery").to(<table>) 
     .withSchema(schema) 
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 
    LOG.info("about to run"); 
    p.run(); 

    } 


    static class FormatAsTableRowFn extends DoFn<String, TableRow> { 
    @Override 
    public void processElement(ProcessContext c) { 
     LOG.info("Formatting"); 
     String json = c.element(); 

     //HashMap<String,String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>(){}.getType()); 

     // Make a BigQuery row from the JSON object: 
     TableRow row = new TableRow() 
      .set("facebookID","324234") 
      .set("properttyID", "23423") 
      .set("time", "12312313123"); 


     /* 
     *  TableRow row = new TableRow() 
      .set("facebookID", items.get("facbookID")) 
      .set("properttyID", items.get("propertyID")) 
      .set("time", items.get("time")); 
     */ 
     c.output(row); 
    } 
    } 
} 

Beliebig Vorschläge, was das sein könnte?

+0

Welche Version des Dataflow-SDK verwenden Sie? – danielm

+0

1.1.2 für die Sonnenfinsternis –

+0

Ich glaube nicht, dass es eine Version 1.1.2 gab. Datenfluss ist jetzt bis zu 1.6.0; kannst du das versuchen? – danielm

Antwort

1

Die Standardimplementierung von BigQueryIO funktioniert nur über begrenzte PCollections und PubsubIO.Read erzeugt eine unbegrenzte PCollection.

Es gibt zwei Möglichkeiten, dieses Problem zu beheben: Sie können verpflichtet, den Eingang von maxReadTime oder maxNumElements auf Ihrem PubsubIO Aufruf verwandeln, oder Sie können die Streaming-Einsatzart von BigQueryIO verwenden, indem setStreaming (true) auf Optionen aufrufen.

+0

Ah okay, danke für die Antwort! Ich werde das morgen früh im Büro überprüfen. –

+0

Das funktionierte und mein Projekt begann zu laufen, aber es begann zu starten '04. August 2016 9:47:55 Uhr com.google.api.client.http.HttpRequest ausführen WARNUNG: Ausnahme während der Ausführung der Anfrage java.net geworfen .SocketTimeoutException: Gelesene Zeit alle 20 Sekunden oder so. Ich habe das nachgeschlagen und es sieht so aus, als müssten Sie die Socket-Timeout-Zeit bearbeiten? Einige einfache Beispiele tun dies jedoch nicht und ich gehe davon aus, dass sie funktionieren müssen - https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete /TrafficRoutes.java –

+0

Kannst du etwas mehr darüber erzählen, woher diese Ausnahme kommt? Im Allgemeinen sollten Sie sich nicht mit Timeouts herumschlagen müssen. Wenn Sie jedoch lokal arbeiten, verwenden Sie möglicherweise ein Netzwerk mit geringerer oder höherer Latenz. – danielm

Verwandte Themen