0

Angenommen, wir haben 3 Kafka-Partitionen für ein Thema, und ich möchte, dass meine Ereignisse mit der Ereigniszeit stundenweise gefenstert werden.Wie verwendet Flink Nachrichten von einem Kafka-Thema mit mehreren Partitionen, ohne dass es zu Verzerrungen kommt?

Wird der Kafka-Consumer aufhören, von einer Partition zu lesen, wenn sie sich außerhalb des aktuellen Fensters befindet? Oder öffnet es ein neues Fenster? Wenn es neue Fenster öffnet, wäre es dann theoretisch nicht möglich, eine unbegrenzte Anzahl von Fenstern zu öffnen und somit nicht genug Speicher zu haben, wenn die Ereigniszeit einer Partition im Vergleich zu den anderen sehr verzerrt wäre? Dieses Szenario wäre insbesondere möglich, wenn wir etwas Geschichte wiedergeben.

Ich habe versucht, diese Antwort aus dem Lesen von Dokumentation zu bekommen, kann aber nicht viel über die Interna von Flink mit Kafka auf Partitionen finden. Eine gute Dokumentation zu diesem speziellen Thema wäre sehr willkommen.

Danke!

Antwort

0

Also zunächst alle Veranstaltungen von Kafka werden ständig und die weiteren Windowing-Operationen gelesen haben keinen Einfluss darauf. Es gibt mehr Dinge zu beachten, wenn es darum geht, nicht genügend Arbeitsspeicher zu haben.

  • Sie in der Regel nicht speichern jedes Ereignis für ein Fenster, sondern nur einige Aggregate für das Ereignis
  • , wenn Fenster des entsprechenden Speicher freigegeben werden geschlossen.

Einige mehr darüber, wie Kafka Verbraucher interagiert mit Eventtime (Wasserzeichen, insbesondere können Sie here

+0

Das ist sehr nützlich, danke. Wird die Reduzierung in diesem Beispiel ausgeführt, während das Fenster Ereignisse anhäuft? whateverSource.windowByEventTime(). reduce (someReduceFunc) .toSomeSink (foo) – RoyB

+0

Ja, und nur die Ergebnisse der 'reduce'-Funktion werden gespeichert. –

0

Sie könnten versuchen, diese Art von Stil zu verwenden

public void runStartFromLatestOffsets() throws Exception { 
     // 50 records written to each of 3 partitions before launching a latest-starting consuming job 
     final int parallelism = 3; 
     final int recordsInEachPartition = 50; 

     // each partition will be written an extra 200 records 
     final int extraRecordsInEachPartition = 200; 

     // all already existing data in the topic, before the consuming topology has started, should be ignored 
     final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1); 

     // the committed offsets should be ignored 
     KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); 
     kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); 
     kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); 
kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); 
0

überprüfen dies Sie Klarheit, dass jeder

bekommen helfen sein kann

https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java

A Partitionierungs Gewährleistung Die interne Flink-Partition landet in einer Kafka-Partition

Gehen Sie zwar vollständig kommentierte Teil.Sie haben erklärt var auch in den Fällen.

+0

Dies ist für die Produktion von Veranstaltungen in Richtung Kafka? Nicht zum Konsumieren? Dies scheint meinem Anwendungsfall keinen Wert zu geben. – RoyB

Verwandte Themen