Ich möchte den Zeitstempel der Nachrichten extrahieren, die von FlinkKafkaConsumer010
als Werte im Datenstrom erzeugt werden.Flink + Kafka 0.10: Wie erstelle ich eine Tabelle mit dem Zeitstempel der Kafka-Nachricht als Feld?
Ich kenne die AssignerWithPeriodicWatermarks
Klasse, aber dies scheint nur den Zeitstempel für die Zwecke der Zeit Aggregate über die DataStream
API zu extrahieren.
Ich möchte, dass Kafka Nachricht Timestamp in einer Table
so später zur Verfügung stellen, kann ich SQL darauf verwenden.
EDIT: dies versucht:
val consumer = new FlinkKafkaConsumer010("test", new SimpleStringSchema, properties)
consumer.setStartFromEarliest()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tenv = TableEnvironment.getTableEnvironment(env)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
class KafkaAssigner[T] extends AssignerWithPeriodicWatermarks[T] {
var maxTs = 0L
override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
maxTs = Math.max(maxTs, previousElementTimestamp)
previousElementTimestamp
}
override def getCurrentWatermark: Watermark = new Watermark(maxTs - 1L)
}
val stream = env
.addSource(consumer)
.assignTimestampsAndWatermarks(new KafkaAssigner[String])
.flatMap(_.split("\\W+"))
val tbl = tenv.fromDataStream(stream, 'w, 'ts.rowtime)
Es kompiliert, sondern wirft:
Exception in thread "main" org.apache.flink.table.api.TableException: Field reference expression requested.
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:630)
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
in der allerletzten Zeile des obigen Codes.
EDIT2: Danke an @ fabian-hueske für mich auf einen Workaround. Vollständiger Code bei https://github.com/andrey-savov/flink-kafka
Hallo @fabian, danke für die Antwort. Ich folgte dem Scala-Beispiel von der Verknüpfung, die Sie hatten, aber mit einer Laufzeitausnahme (Flink 1.3.2), wenn ich versuche, die ".rowtime" als "fieldExpr" zu registrieren. Die Frage wurde mit einem Beispiel aktualisiert. – bfair
Hallo, ich habe mir den Code angeschaut und einen Fehler gefunden, der sich auf den Typ des DataStreams bezieht. DataStreams mit atomaren Typen (Einzelwerte wie String in Ihrem Fall) werden intern mit einem separaten Codepfad behandelt. Wenn Sie eine MapFunction hinzufügen, um die Zeichenfolge in ein Tupel ('Tuple1 [String]') zu umbrechen, sollte es wie erwartet funktionieren. –
Es wäre toll, hier einen Verweis auf den Fehler zu veröffentlichen, damit andere ihn verfolgen können, wenn er gelöst wird. Danke für Ihre Hilfe. – bfair