Ich habe eine google-cloud-dataflow
Prozess läuft auf der App-engine
. Es hört Nachrichten ab, die über pubsub
gesendet werden, und streamt an big-query
.Wie zu aktualisieren Google-Cloud-Datenfluss in App-Engine ausgeführt, ohne bigquery Tabellen zu löschen
Ich habe meinen Code aktualisiert und ich versuche, die App erneut auszuführen.
Aber ich erhalte diesen Fehler:
Exception in thread "main" java.lang.IllegalArgumentException: BigQuery table is not empty
Gibt es trotzdem Datenfluss zu aktualisieren, ohne die Tabelle zu löschen? Da sich mein Code sehr oft ändern kann und ich Daten in der Tabelle nicht löschen möchte.
Hier ist mein Code:
public class MyPipline {
private static final Logger LOG = LoggerFactory.getLogger(BotPipline.class);
private static String name;
public static void main(String[] args) {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("a").setType("string"));
fields.add(new TableFieldSchema().setName("b").setType("string"));
fields.add(new TableFieldSchema().setName("c").setType("string"));
TableSchema tableSchema = new TableSchema().setFields(fields);
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("my-data-analysis");
options.setStagingLocation("gs://my-bucket/dataflow-jars");
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> input = pipeline
.apply(PubsubIO.Read.subscription(
"projects/my-data-analysis/subscriptions/myDataflowSub"));
input.apply(ParDo.of(new DoFn<String, Void>() {
@Override
public void processElement(DoFn<String, Void>.ProcessContext c) throws Exception {
LOG.info("json" + c.element());
}
}));
String fileName = UUID.randomUUID().toString().replaceAll("-", "");
input.apply(ParDo.of(new DoFn<String, String>() {
@Override
public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
JSONObject firstJSONObject = new JSONObject(c.element());
firstJSONObject.put("a", firstJSONObject.get("a").toString()+ "1000");
c.output(firstJSONObject.toString());
}
}).named("update json")).apply(ParDo.of(new DoFn<String, TableRow>() {
@Override
public void processElement(DoFn<String, TableRow>.ProcessContext c) throws Exception {
JSONObject json = new JSONObject(c.element());
TableRow row = new TableRow().set("a", json.get("a")).set("b", json.get("b")).set("c", json.get("c"));
c.output(row);
}
}).named("convert json to table row"))
.apply(BigQueryIO.Write.to("my-data-analysis:mydataset.mytable").withSchema(tableSchema)
);
pipeline.run();
}
}