2016-11-03 3 views
0

Ich habe eine google-cloud-dataflow Prozess läuft auf der App-engine. Es hört Nachrichten ab, die über pubsub gesendet werden, und streamt an big-query.Wie zu aktualisieren Google-Cloud-Datenfluss in App-Engine ausgeführt, ohne bigquery Tabellen zu löschen

Ich habe meinen Code aktualisiert und ich versuche, die App erneut auszuführen.

Aber ich erhalte diesen Fehler:

Exception in thread "main" java.lang.IllegalArgumentException: BigQuery table is not empty 

Gibt es trotzdem Datenfluss zu aktualisieren, ohne die Tabelle zu löschen? Da sich mein Code sehr oft ändern kann und ich Daten in der Tabelle nicht löschen möchte.

Hier ist mein Code:

public class MyPipline { 
    private static final Logger LOG = LoggerFactory.getLogger(BotPipline.class); 
    private static String name; 

    public static void main(String[] args) { 

     List<TableFieldSchema> fields = new ArrayList<>(); 
     fields.add(new TableFieldSchema().setName("a").setType("string")); 
     fields.add(new TableFieldSchema().setName("b").setType("string")); 
     fields.add(new TableFieldSchema().setName("c").setType("string")); 
     TableSchema tableSchema = new TableSchema().setFields(fields); 

     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); 
     options.setRunner(BlockingDataflowPipelineRunner.class); 
     options.setProject("my-data-analysis"); 
     options.setStagingLocation("gs://my-bucket/dataflow-jars"); 
     options.setStreaming(true); 

     Pipeline pipeline = Pipeline.create(options); 

     PCollection<String> input = pipeline 
       .apply(PubsubIO.Read.subscription(
         "projects/my-data-analysis/subscriptions/myDataflowSub")); 

     input.apply(ParDo.of(new DoFn<String, Void>() { 

      @Override 
      public void processElement(DoFn<String, Void>.ProcessContext c) throws Exception { 
       LOG.info("json" + c.element()); 
      } 

     })); 
     String fileName = UUID.randomUUID().toString().replaceAll("-", ""); 


     input.apply(ParDo.of(new DoFn<String, String>() { 
      @Override 
      public void processElement(DoFn<String, String>.ProcessContext c) throws Exception { 
       JSONObject firstJSONObject = new JSONObject(c.element()); 
       firstJSONObject.put("a", firstJSONObject.get("a").toString()+ "1000"); 
       c.output(firstJSONObject.toString()); 

      } 

     }).named("update json")).apply(ParDo.of(new DoFn<String, TableRow>() { 

      @Override 
      public void processElement(DoFn<String, TableRow>.ProcessContext c) throws Exception { 
       JSONObject json = new JSONObject(c.element()); 
       TableRow row = new TableRow().set("a", json.get("a")).set("b", json.get("b")).set("c", json.get("c")); 
       c.output(row); 
      } 

     }).named("convert json to table row")) 
       .apply(BigQueryIO.Write.to("my-data-analysis:mydataset.mytable").withSchema(tableSchema) 
     ); 

     pipeline.run(); 
    } 
} 

Antwort

2

Sie müssen withWriteDisposition angeben auf BigQueryIO.Write - siehe Dokumentation of the method und of its argument. Je nach Ihren Anforderungen benötigen Sie entweder WRITE_TRUNCATE oder WRITE_APPEND.

Verwandte Themen