0

Ich schreibe einen Cloud-Datenfluss, der Nachrichten aus Pubsub liest und diese in BigQuery speichert. Ich möchte die partitionierte Tabelle (nach Datum) verwenden und verwende Timestamp, die mit der Nachricht verknüpft ist, um festzustellen, auf welche Partition die Nachricht gehen soll. Unten ist mein Code:Apache Beam: Programmatisch partitionierte Tabellen erstellen

 BigQueryIO.writeTableRows() 
     .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() { 
      private static final long serialVersionUID = 1L; 

      @Override 
       public TableDestination apply(ValueInSingleWindow<TableRow> value) { 
       log.info("Row value : {}", value.getValue()); 
       Instant timestamp = value.getTimestamp(); 
       String partition = DateTimeFormat.forPattern("yyyyMMdd").print(timestamp); 
       TableDestination td = new TableDestination(
        "<project>:<dataset>.<table>" + "$" + partition, null); 
       log.info("Table Destination : {}", td); 
       return td; 
       } 
      })    
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)   
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
    .withSchema(tableSchema); 

Wenn ich den Datenfluß bereitstellen, ich die Log-Anweisungen in Stackdriver sehen kann, aber die Nachrichten werden nicht in BigQuery Tabellen eingefügt und ich erhalte die folgenden Fehler:

Request failed with code 400, will NOT retry: https://www.googleapis.com/bigquery/v2/projects/<project_id>/datasets/<dataset_id>/tables 
severity: "WARNING" 

Es sieht also so aus, als ob es nicht in der Lage ist, eine Tabelle zu erstellen, was zu einem Fehler beim Einfügen führt. Muss ich die Datenflussdefinition ändern, damit dies funktioniert? Wenn nicht, gibt es eine andere Möglichkeit, die partitionierten Tabellen programmgesteuert zu erstellen?

Ich benutze Apache Beam 2.0.0.

Antwort

1

Dies war a bug in BigQueryIO und es wurde in Beam 2.2 behoben. Sie können eine Snapshot-Version von Beam verwenden oder warten, bis Release 2.2 fertig gestellt ist (der Release-Prozess wird gerade ausgeführt).

+0

danke für das Update. Einige weitere Fragen (Entschuldigung): 1. Gibt es eine andere Möglichkeit, Tabellen in der Datenfluss-Pipeline zu erstellen (als mit Apache Beam) und 2. Wann erwarten wir, dass Beam 2.2 freigegeben wird? –

+0

Sie können Tabellen mithilfe der BigQuery-API direkt https://cloud.google.com/bigquery/docs/reference/libraries erstellen. Beam release ist ein Prozess der Apache-Community, daher kann es keine festen Garantien geben, aber es scheint, dass dies in der nächsten ein oder zwei Wochen der Fall sein wird. Sie können dem Thread folgen https://lists.apache.org/thread.html/[email protected]%3Cdev.beam.apache.org%3E – jkff

+0

Danke für Ihre Eingaben @jkff. –