2017-12-22 14 views
0

Ich befolge die Dokumentation zu configure a TableSource with a rowtime attribute.Flink 1.4 Spalte 'rowtime' in keiner Tabelle gefunden

Ich registrieren timestamp Feld als

KafkaTableSource source = Kafka08JsonTableSource.builder()// set Kafka topic 
      .forTopic("alerting") 
      // set Kafka consumer properties 
      .withKafkaProperties(getKafkaProperties()) 
      // set Table schema 
      .withSchema(TableSchema.builder() 
        .field("tenant", Types.STRING()) 
        .field("message", Types.STRING()) 
        .field("frequency", Types.LONG()) 
        .field("timestamp", Types.SQL_TIMESTAMP()).build()) 
      .failOnMissingField(true) 
      .withRowtimeAttribute(
        // "timestamp" is rowtime attribute 
        "timestamp", 
        // value of "timestamp" is extracted from existing field with same name 
        new ExistingField("timestamp"), 
        // values of "timestamp" are at most out-of-order by 30 seconds 
        new BoundedOutOfOrderTimestamps(TimeUnit.DAYS.toMillis(1))) 
      .build(); 

    //register the alerting topic as kafka 
    tEnv.registerTableSource("kafka", source); 

    Table results = tEnv.sqlQuery("SELECT tenant, message, SUM(frequency) " + 
      "FROM kafka " + 
      "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message"); 

    tEnv.toAppendStream(results, Row.class).print(); 

und die folgende Fehler folgt:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93) at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561) at oracle.flink.demo.KafkaSQLStream.main(KafkaSQLStream.java:62) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)

Antwort

0

Das Feld in der Tabelle kafka ist timestamp und nicht rowtime genannt. Sie sollten also das Attribut mit dem Namen timestamp statt rowtime aufrufen.

Beachten Sie, dass TIMESTAMP ein Schlüsselwort in SQL ist, so sollten Sie entweder das timestamp Attribut umbenennen oder den Attributnamen mit Backticks entkommen (`):

tEnv.sqlQuery(
    "SELECT tenant, message, SUM(frequency) " + 
    "FROM kafka " + 
    "GROUP BY HOP(`timestamp`, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message"); 

Btw. BoundedOutOfOrderTimestamps von einem Tag ist eigentlich ziemlich viel. Dies kann zu erheblichen Verarbeitungswartezeiten und Zustandsgrößen führen, da die Abfrage Daten für einen Tag sammelt, bevor sie beginnt, Ergebnisse zu emittieren und den Status zu verwerfen.

+0

Ich bekomme jetzt eine "Ausnahme im Thread" main "org.apache.flink.runtime.client.JobSubmissionException: Konnte die BlobServer Adresse nicht abrufen." nach dem Ändern des Zeitstempels auf aktualisiert? – Xuan

+0

"SELECT Mieter, Nachricht, SUM (Frequenz)" + "VON Kafka" + "GROUP BY HOP (aktualisiert, INTERVAL '1' zweite, INTERVAL '5' zweite), Mieter, Nachricht" Ausnahme im Thread " main "org.apache.flink.runtime.client.JobSubmissionException: Die BlobServer-Adresse konnte nicht abgerufen werden. \t bei org.apache.flink.runtime.client.JobSubmissionClientActor $ 1.call (JobSubmissionClientActor.java:166) \t bei akka.dispatch.Futures $$ anonfun $ Zukunft $ 1.Apply (Future.scala: 97) \t bei scala.concurrent.impl.Future $ PromiseCompletingRunnable.liftedTree1 $ 1 (Future.scala: 24) – Xuan

+0

Wenn es ein Beispiel/github von Kafka ## JsonTableSource + SQL + windowing gibt, dem ich einfach folgen kann, wäre das fantastisch! – Xuan

Verwandte Themen