Ich habe eine Apache Beam-Task, die aus einer MySQL-Quelle mit JDBC liest und die Daten so schreiben soll, wie sie in einer BigQuery-Tabelle sind. An diesem Punkt, der später kommt, wird keine Umwandlung durchgeführt, für den Moment möchte ich nur, dass die Datenbankausgabe direkt in BigQuery geschrieben wird.MySQL als Eingabequelle verwenden und in Google BigQuery schreiben
Dies ist die wichtigste Methode versucht, diesen Vorgang auszuführen:
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
// Build the table schema for the output table.
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("phone").setType("STRING"));
fields.add(new TableFieldSchema().setName("url").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
p.apply(JdbcIO.<KV<String, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://host:3306/db_name")
.withUsername("user")
.withPassword("pass"))
.withQuery("SELECT phone_number, identity_profile_image FROM scraper_caller_identities LIMIT 100")
.withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getString(1), resultSet.getString(2));
}
})
.apply(BigQueryIO.Write
.to(options.getOutput())
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)));
p.run();
}
Aber wenn ich die Vorlage mit Maven ausführen, ich die folgende Fehlermeldung erhalten:
Test.java:[184,6] cannot find symbol symbol: method apply(com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.Bound)
location: class org.apache.beam.sdk.io.jdbc.JdbcIO.Read<com.google.cloud.dataflow.sdk.values.KV<java.lang.String,java.lang.String>>
Es scheint, dass ich bin BigQueryIO wird nicht übergeben.Schreibe die erwartete Datensammlung und das ist es, womit ich gerade zu kämpfen habe.
Wie kann ich sicherstellen, dass die von MySQL stammenden Daten in diesem Fall die Erwartungen von BigQuery erfüllen?