Ich bin neu bei Apache Beam und ich versuche, eine Verbindung zu Google Cloud-Instanz von MySQL-Datenbank. Wenn ich das folgende Code-Snippet ausführe, wird die folgende Ausnahme ausgelöst.Apache Beam wirft kann nicht setCoder (null): Java
Logger logger = LoggerFactory.getLogger(GoogleSQLPipeline.class);
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline dataflowPipeline = Pipeline.create();
dataflowPipeline.apply(JdbcIO.<KV<Integer, String>>read().withCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
.create("com.mysql.jdbc.Driver", "jdbc:mysql://<ip from google instance>:3306/foodmart")
.withUsername("root").withPassword("root"))
.withQuery("select accouont_id,account_description from account")
.withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
@Override
public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
// TODO Auto-generated method stub
return KV.of(resultSet.getInt(1), resultSet.getString(2));
}
}));
dataflowPipeline.run();
Exception in thread "main" java.lang.IllegalArgumentException: Can not setCoder (null) bei org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument (Voraussetzungen. java: 122) bei org.apache.beam.sdk.values.PCollection.setCoder (PCollection.java:265) bei org.apache.beam.sdk.io.jdbc.JdbcIO $ Read.expand (JdbcIO.java: 325) bei org.apache.beam.sdk.io.jdbc.JdbcIO $ Read.expand (JdbcIO.java:272) bei org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:482) um org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:422) bei org.apache.beam.s dk.values.PBegin.apply (PBegin.java:44) bei org.apache.beam.sdk.Pipeline.apply (Pipeline.java:164) bei com.neudesic.com.GoogleSQLPipeline.main (GoogleSQLPipeline.java: 24)