2017-02-21 4 views
0

Ich habe eine Apache Beam-Task, die aus einer MySQL-Quelle mit JDBC liest und die Daten so schreiben soll, wie sie in einer BigQuery-Tabelle sind. An diesem Punkt, der später kommt, wird keine Umwandlung durchgeführt, für den Moment möchte ich nur, dass die Datenbankausgabe direkt in BigQuery geschrieben wird.MySQL als Eingabequelle verwenden und in Google BigQuery schreiben

Dies ist die wichtigste Methode versucht, diesen Vorgang auszuführen:

public static void main(String[] args) { 
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); 

     Pipeline p = Pipeline.create(options); 

     // Build the table schema for the output table. 
     List<TableFieldSchema> fields = new ArrayList<>(); 
     fields.add(new TableFieldSchema().setName("phone").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("url").setType("STRING")); 
     TableSchema schema = new TableSchema().setFields(fields); 

     p.apply(JdbcIO.<KV<String, String>>read() 
     .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
      "com.mysql.jdbc.Driver", "jdbc:mysql://host:3306/db_name") 
      .withUsername("user") 
      .withPassword("pass")) 
      .withQuery("SELECT phone_number, identity_profile_image FROM scraper_caller_identities LIMIT 100") 
      .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() { 
       public KV<String, String> mapRow(ResultSet resultSet) throws Exception { 
       return KV.of(resultSet.getString(1), resultSet.getString(2)); 
      } 
      }) 
     .apply(BigQueryIO.Write 
      .to(options.getOutput()) 
      .withSchema(schema) 
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE))); 

     p.run(); 
    } 

Aber wenn ich die Vorlage mit Maven ausführen, ich die folgende Fehlermeldung erhalten:

Test.java:[184,6] cannot find symbol symbol: method apply(com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.Bound)
location: class org.apache.beam.sdk.io.jdbc.JdbcIO.Read<com.google.cloud.dataflow.sdk.values.KV<java.lang.String,java.lang.String>>

Es scheint, dass ich bin BigQueryIO wird nicht übergeben.Schreibe die erwartete Datensammlung und das ist es, womit ich gerade zu kämpfen habe.

Wie kann ich sicherstellen, dass die von MySQL stammenden Daten in diesem Fall die Erwartungen von BigQuery erfüllen?

Antwort

1

Ich glaube, dass Sie eine PCollection <TableRow> BigQueryIO.Write anstelle des PCollection < KV < String, String > > Typ zur Verfügung stellen müssen, dass die RowMapper Ausgeben.

Verwenden Sie auch die richtigen Spaltennamen und Wertepaare beim Einstellen von TableRow. Hinweis: Ich denke, dass Ihre KVs die Telefon- und URL-Werte sind (zB {"555-555-1234": "http://www.url.com"}), nicht die Spaltennamen und Wertpaare (zB {"phone": "555-555- 1234" , ‚url‘: „http://www.url.com“})

das Beispiel Siehe hier: https://beam.apache.org/documentation/sdks/javadoc/0.5.0/

bitte Sie diesem einen Versuch geben und lassen Sie mich wissen, ob es für Sie arbeitet? Hoffe das hilft.

Verwandte Themen