2017-10-28 1 views
1

Ich möchte den Zeitstempel der Nachrichten extrahieren, die von FlinkKafkaConsumer010 als Werte im Datenstrom erzeugt werden.Flink + Kafka 0.10: Wie erstelle ich eine Tabelle mit dem Zeitstempel der Kafka-Nachricht als Feld?

Ich kenne die AssignerWithPeriodicWatermarks Klasse, aber dies scheint nur den Zeitstempel für die Zwecke der Zeit Aggregate über die DataStream API zu extrahieren.

Ich möchte, dass Kafka Nachricht Timestamp in einer Table so später zur Verfügung stellen, kann ich SQL darauf verwenden.

EDIT: dies versucht:

val consumer = new FlinkKafkaConsumer010("test", new SimpleStringSchema, properties) 
    consumer.setStartFromEarliest() 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val tenv = TableEnvironment.getTableEnvironment(env) 

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

    class KafkaAssigner[T] extends AssignerWithPeriodicWatermarks[T] { 
    var maxTs = 0L 
    override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = { 
     maxTs = Math.max(maxTs, previousElementTimestamp) 
     previousElementTimestamp 
    } 
    override def getCurrentWatermark: Watermark = new Watermark(maxTs - 1L) 
    } 

    val stream = env 
    .addSource(consumer) 
    .assignTimestampsAndWatermarks(new KafkaAssigner[String]) 
    .flatMap(_.split("\\W+")) 

    val tbl = tenv.fromDataStream(stream, 'w, 'ts.rowtime) 

Es kompiliert, sondern wirft:

Exception in thread "main" org.apache.flink.table.api.TableException: Field reference expression requested. 
    at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:630) 
    at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) 
    at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624) 
    at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398) 
    at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85) 

in der allerletzten Zeile des obigen Codes.

EDIT2: Danke an @ fabian-hueske für mich auf einen Workaround. Vollständiger Code bei https://github.com/andrey-savov/flink-kafka

Antwort

0

Flink Kafka 0,10 Verbraucher stellen automatisch den Zeitstempel eines Kafkas Nachricht als den Ereigniszeit-Zeitstempel der erzeugten Aufzeichnung, wenn die Zeitcharakteristik Eventtime konfiguriert ist (siehe docs).

Nachdem Sie das Kafka Thema in ein DataStream mit Zeitstempel aufgenommen haben (noch nicht sichtbar) und Wasserzeichen zugewiesen, können Sie es in ein Table mit der StreamTableEnvironment.fromDataStream(stream, fieldExpr*) Methode konvertieren. Der Parameter fieldExpr* ist eine Liste von Ausdrücken, die das Schema der generierten Tabelle beschreiben. Sie können ein Feld hinzufügen, das den Datensatz-Zeitstempel des Streams mit einem Ausdruck mytime.rowtime enthält, wobei mytime der Name des neuen Felds ist und rowtime angibt, dass der Wert aus dem Datensatz-Zeitstempel extrahiert wird. Bitte überprüfen Sie die docs for details.

HINWEIS: Wie @bfair wies darauf hin, die Umwandlung eines DataStream eines atomaren Typs (wie DataStream[String]) schlägt mit einer Ausnahme in Flink 1.3.2 und frühere Versionen. Der Fehler wurde als FLINK-7939 gemeldet und wird in den nächsten Versionen behoben.

+0

Hallo @fabian, danke für die Antwort. Ich folgte dem Scala-Beispiel von der Verknüpfung, die Sie hatten, aber mit einer Laufzeitausnahme (Flink 1.3.2), wenn ich versuche, die ".rowtime" als "fieldExpr" zu registrieren. Die Frage wurde mit einem Beispiel aktualisiert. – bfair

+0

Hallo, ich habe mir den Code angeschaut und einen Fehler gefunden, der sich auf den Typ des DataStreams bezieht. DataStreams mit atomaren Typen (Einzelwerte wie String in Ihrem Fall) werden intern mit einem separaten Codepfad behandelt. Wenn Sie eine MapFunction hinzufügen, um die Zeichenfolge in ein Tupel ('Tuple1 [String]') zu umbrechen, sollte es wie erwartet funktionieren. –

+0

Es wäre toll, hier einen Verweis auf den Fehler zu veröffentlichen, damit andere ihn verfolgen können, wenn er gelöst wird. Danke für Ihre Hilfe. – bfair

Verwandte Themen