Ich habe eine PCollection [String] sagen "X", die ich in einer BigQuery-Tabelle ausgeben muss. Das Tabellenziel und das Schema dafür ist in einer PCollection [TableRow] sagt "Y". Wie erreicht man das auf einfachste Weise?DymanicDestinations in Apache Beam
Ich habe versucht, die Tabelle und das Schema aus "Y" zu extrahieren und in statischen globalen Variablen (tableName bzw. schema) zu speichern. Aber seltsamerweise erhält BigQueryIO.writeTableRows() immer den Wert der Variablen tabellenname als null. Aber es bekommt das Schema. Ich habe versucht, die Werte dieser Variablen zu protokollieren, und ich kann die Werte für beide sehen.
Hier ist meine Pipeline Code:
static String tableName;
static TableSchema schema;
PCollection<String> read = p.apply("Read from input file",
TextIO.read().from(options.getInputFile()));
PCollection<TableRow> tableRows = p.apply(
BigQueryIO.read().fromQuery(NestedValueProvider.of(
options.getfilename(),
new SerializableFunction<String, String>() {
@Override
public String apply(String filename) {
return "SELECT table,schema FROM `BigqueryTest.configuration` WHERE file='" + filename +"'";
}
})).usingStandardSql().withoutValidation());
final PCollectionView<List<String>> dataView = read.apply(View.asList());
tableRows.apply("Convert data read from file to TableRow",
ParDo.of(new DoFn<TableRow,TableRow>(){
@ProcessElement
public void processElement(ProcessContext c) {
tableName = c.element().get("table").toString();
String[] schemas = c.element().get("schema").toString().split(",");
List<TableFieldSchema> fields = new ArrayList<>();
for(int i=0;i<schemas.length;i++) {
fields.add(new TableFieldSchema()
.setName(schemas[i].split(":")[0]).setType(schemas[i].split(":")[1]));
}
schema = new TableSchema().setFields(fields);
//My code to convert data to TableRow format.
}}).withSideInputs(dataView));
tableRows.apply("write to BigQuery",
BigQueryIO.writeTableRows()
.withSchema(schema)
.to("ProjectID:DatasetID."+tableName)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Alles funktioniert gut. Nur der BigQueryIO.write-Vorgang schlägt fehl und ich erhalte den Fehler, dass TableId null ist.
Ich habe auch versucht, SerializableFunction verwenden und den Wert von dort zurück, aber ich bekomme immer noch null. Hier
ist der Code, den ich für es versucht:
tableRows.apply("write to BigQuery",
BigQueryIO.writeTableRows()
.withSchema(schema)
.to(new GetTable(tableName))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
public static class GetTable implements SerializableFunction<String,String> {
String table;
public GetTable() {
this.table = tableName;
}
@Override
public String apply(String arg0) {
return "ProjectId:DatasetId."+table;
}
}
Ich habe auch versucht DynamicDestinations mit, aber ich erhalte eine Fehlermeldung, Schema ist nicht vorgesehen. Ehrlich gesagt bin ich neu im Konzept von DynamicDestinations und ich bin mir nicht sicher, ob ich es richtig mache. Hier
ist der Code, den ich für es versucht:
tableRows2.apply(BigQueryIO.writeTableRows()
.to(new DynamicDestinations<TableRow, TableRow>() {
private static final long serialVersionUID = 1L;
@Override
public TableDestination getTable(TableRow dest) {
List<TableRow> list = sideInput(bqDataView); //bqDataView contains table and schema
String table = list.get(0).get("table").toString();
String tableSpec = "ProjectId:DatasetId."+table;
String tableDescription = "";
return new TableDestination(tableSpec, tableDescription);
}
public String getSideInputs(PCollectionView<List<TableRow>> bqDataView) {
return null;
}
@Override
public TableSchema getSchema(TableRow destination) {
return schema; //schema is getting added from the global variable
}
@Override
public TableRow getDestination(ValueInSingleWindow<TableRow> element) {
return null;
}
}.getSideInputs(bqDataView)));
Bitte lassen Sie mich wissen, was ich falsch mache und welchen Weg soll ich nehmen.
Vielen Dank.
okay ... aber das ist, was ich gesagt habe, dass das Programm seltsamerweise das Schema während bigqueryIO.write Operation erhalten kann, aber es den Tabellenname nicht bekommt ... – rish0097
Sind Sie sicher, dass das Schema richtig eingestellt wurde? Wenn man sich den Code anschaut, sieht es so aus, als wäre das Schema null, und nichts würde wirklich fehlschlagen, bis die Pipeline tatsächlich ausgeführt wird. Wie bereits erwähnt, sollte dieser Pfad fehlschlagen. Wenn er also anders als erwartet ausfällt, ist die Verfolgung des DynamicDestinations-Ansatzes der Weg nach vorne. –