2017-07-21 5 views
0

Ich habe eine PCollection [String] sagen "X", die ich in einer BigQuery-Tabelle ausgeben muss. Das Tabellenziel und das Schema dafür ist in einer PCollection [TableRow] sagt "Y". Wie erreicht man das auf einfachste Weise?DymanicDestinations in Apache Beam

Ich habe versucht, die Tabelle und das Schema aus "Y" zu extrahieren und in statischen globalen Variablen (tableName bzw. schema) zu speichern. Aber seltsamerweise erhält BigQueryIO.writeTableRows() immer den Wert der Variablen tabellenname als null. Aber es bekommt das Schema. Ich habe versucht, die Werte dieser Variablen zu protokollieren, und ich kann die Werte für beide sehen.

Hier ist meine Pipeline Code:

static String tableName; 
static TableSchema schema; 

PCollection<String> read = p.apply("Read from input file", 
    TextIO.read().from(options.getInputFile())); 

PCollection<TableRow> tableRows = p.apply(
    BigQueryIO.read().fromQuery(NestedValueProvider.of(
    options.getfilename(), 
    new SerializableFunction<String, String>() { 
     @Override 
     public String apply(String filename) { 
      return "SELECT table,schema FROM `BigqueryTest.configuration` WHERE file='" + filename +"'"; 
     } 
    })).usingStandardSql().withoutValidation()); 

final PCollectionView<List<String>> dataView = read.apply(View.asList()); 

tableRows.apply("Convert data read from file to TableRow", 
    ParDo.of(new DoFn<TableRow,TableRow>(){ 
    @ProcessElement 
    public void processElement(ProcessContext c) { 
     tableName = c.element().get("table").toString(); 
     String[] schemas = c.element().get("schema").toString().split(","); 
     List<TableFieldSchema> fields = new ArrayList<>(); 
     for(int i=0;i<schemas.length;i++) { 
     fields.add(new TableFieldSchema() 
      .setName(schemas[i].split(":")[0]).setType(schemas[i].split(":")[1])); 
     } 
     schema = new TableSchema().setFields(fields); 

     //My code to convert data to TableRow format. 
    }}).withSideInputs(dataView)); 


tableRows.apply("write to BigQuery", 
    BigQueryIO.writeTableRows() 
    .withSchema(schema) 
    .to("ProjectID:DatasetID."+tableName) 
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 

Alles funktioniert gut. Nur der BigQueryIO.write-Vorgang schlägt fehl und ich erhalte den Fehler, dass TableId null ist.

Ich habe auch versucht, SerializableFunction verwenden und den Wert von dort zurück, aber ich bekomme immer noch null. Hier

ist der Code, den ich für es versucht:

tableRows.apply("write to BigQuery", 
BigQueryIO.writeTableRows() 
    .withSchema(schema) 
    .to(new GetTable(tableName)) 
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 

public static class GetTable implements SerializableFunction<String,String> { 
    String table; 

    public GetTable() { 
    this.table = tableName; 
    } 

    @Override 
    public String apply(String arg0) { 
    return "ProjectId:DatasetId."+table; 
    } 
} 

Ich habe auch versucht DynamicDestinations mit, aber ich erhalte eine Fehlermeldung, Schema ist nicht vorgesehen. Ehrlich gesagt bin ich neu im Konzept von DynamicDestinations und ich bin mir nicht sicher, ob ich es richtig mache. Hier

ist der Code, den ich für es versucht:

tableRows2.apply(BigQueryIO.writeTableRows() 
    .to(new DynamicDestinations<TableRow, TableRow>() { 
    private static final long serialVersionUID = 1L; 
    @Override 
    public TableDestination getTable(TableRow dest) { 
     List<TableRow> list = sideInput(bqDataView); //bqDataView contains table and schema 
     String table = list.get(0).get("table").toString(); 
     String tableSpec = "ProjectId:DatasetId."+table; 
     String tableDescription = ""; 
     return new TableDestination(tableSpec, tableDescription); 
    } 

    public String getSideInputs(PCollectionView<List<TableRow>> bqDataView) { 
     return null; 
    } 

    @Override 
    public TableSchema getSchema(TableRow destination) { 
     return schema; //schema is getting added from the global variable 
    } 
    @Override 
    public TableRow getDestination(ValueInSingleWindow<TableRow> element) { 
     return null; 
    } 
}.getSideInputs(bqDataView))); 

Bitte lassen Sie mich wissen, was ich falsch mache und welchen Weg soll ich nehmen.

Vielen Dank.

Antwort

0

Teil des Grundes, warum Sie Probleme haben, ist wegen der zwei Phasen der Pipeline-Ausführung. Zuerst wird die Pipeline auf Ihrer Maschine erstellt. Dies ist, wenn alle Anwendungen von PTransforms auftreten. In Ihrem ersten Beispiel ist dies, wenn die folgenden Zeilen ausgeführt werden:

BigQueryIO.writeTableRows() 
    .withSchema(schema) 
    .to("ProjectID:DatasetID."+tableName) 

Der Code innerhalb eines Pardo jedoch ausgeführt wird, wenn Sie Ihre Pipeline ausgeführt wird, und es tut dies auf vielen Maschinen. So ist der folgende Code ausgeführt viel später als die Pipeline-Bau:

@ProcessElement 
public void processElement(ProcessContext c) { 
    tableName = c.element().get("table").toString(); 
    ... 
    schema = new TableSchema().setFields(fields); 
    ... 
} 

Das bedeutet, dass weder der Tabellenname noch die Schema-Felder werden eingestellt werden, wenn die BigQueryIO Senke erstellt wird.

Ihre Idee, DynamicDestinations zu verwenden, ist richtig, aber Sie müssen den Code verschieben, um das Schema tatsächlich in dieser Klasse zu generieren, anstatt sich auf globale Variablen zu verlassen, die nicht auf allen Maschinen verfügbar sind.

+0

okay ... aber das ist, was ich gesagt habe, dass das Programm seltsamerweise das Schema während bigqueryIO.write Operation erhalten kann, aber es den Tabellenname nicht bekommt ... – rish0097

+0

Sind Sie sicher, dass das Schema richtig eingestellt wurde? Wenn man sich den Code anschaut, sieht es so aus, als wäre das Schema null, und nichts würde wirklich fehlschlagen, bis die Pipeline tatsächlich ausgeführt wird. Wie bereits erwähnt, sollte dieser Pfad fehlschlagen. Wenn er also anders als erwartet ausfällt, ist die Verfolgung des DynamicDestinations-Ansatzes der Weg nach vorne. –

Verwandte Themen