2017-07-08 2 views

Antwort

0

Wenn Sie eine Nachricht erhalten, sollte sie das Thema, die Partition und den Offset enthalten, von wo sie gekommen ist (zusätzlich zu Schlüssel und Wert).

Vom example here:

consumer.OnMessage += (_, msg) 
    => Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} " + 
     $"Offset: {msg.Offset} {msg.Value}"); 

Sie auch ein Ereignis erhalten, wenn es das Ende der Partition jedes Thema erreicht

consumer.OnPartitionEOF += (_, end) 
    => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}" + 
      $" , next message will be at offset {end.Offset}"); 
1

Zusätzlich zur vorherigen Antwort, können Sie

List<TopicPartitionOffsetError> Position(IEnumerable<TopicPartition> partitions) 

Es wird den letzten Offset von librdkafka für bestimmte Thema/Partitionen abgerufen

Sie haben eine ähnliche Committed Methode, für die begangen neueste von Verbraucher-Offset


Auch können Sie die neuesten bekannten Offsets

WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout) 

er eine Anfrage an kafka Cluster Abfrage sendet. Der Anruf blockiert, legen Sie eine angemessene Zeitüberschreitung fest. Derzeit können Sie eine Anfrage nicht gleichzeitig auf mehreren Partitionen senden. Sie können es entweder zum letzten bekannten Versatz zu erhalten, entweder berechnen Verzögerung

Es gibt auch

WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition) 

, die den internen Zustand in librdkafka abfragt und konnte zurückkehren INVALID_OFFSET (-1001). Sie können damit Verzögerungen aufgrund der Verarbeitung der Daten feststellen. (Differenz zwischen Position und Ergebnis dieses Verfahrens)

0

Statt Offset-Informationen von Verbrauchern Abrufen (Ich möchte nicht Nachricht konsumieren ersten) konnte ich Thema Offsets (high und low) vom Produzenten wie folgt lesen:

var partitionOffset = _producer.QueryWatermarkOffsets(new TopicPartition("myTopic", myPartition), TimeSpan.FromSeconds(10)); 
Verwandte Themen