0

Ich habe ein Thema mit 40 Partitionen. Die Einstellungen sind so:Confluence Kafka: Consumer liest nicht von Anfang an für alle Partitionen in einem Thema

def on_assign (c,ps): 
for p in ps: 
    p.offset=0 
print ps 
c.assign(ps) 

conf = {'bootstrap.servers': 'localhost:9092' 
     'enable.auto.commit' : False, 
     'group.id' : 'confluent_consumer', 
     'default.topic.config': {'auto.offset.reset': 'earliest'} 
     } 
consumer = Consumer(**conf) 
consumer.subscribe(['topic.source'], on_assign=on_assign) 

msg = consumer.poll(timeout=100000) 
print "Topic is %s: | Partition is %d: | Offset is : %d | key is :%s " % (msg.topic(), msg.partition(), msg.offset(), msg.key()) 

I von 0 topic.source für alle Partitionen des Themas Offset lesen möchten. Aber ich sehe es nicht für alle Partitionen. Für einige Partitionen liest es von einem bestimmten Offset, von dem ich annehme, dass es sich um den Committed-Offset handelt, das Ändern der group.id jedes Mal hilft auch nicht. Wie kann ich von Anfang an für alle Partitionen dieses Themas lesen, unabhängig von den festgelegten Offsets?

Ich druckte ps in on_assign() und es gedruckt etwas wie dies für alle 40 Partitionen:

[TopicPartition{topic=topic.source,partition=0,offset=0,error=None},TopicPartition{topic=topic.source,partition=1,offset=0,error=None}....] and so on 

Antwort

1

Wenn Sie group.id auf einen neuen Wert gesetzt verwenden oder eine Gruppe verwenden, die nicht begangen hat, jede mit auto.offset.reset Satz versetzt earliest dann beginnt der Consumer am Anfang der Partition.

Der Anfang ist möglicherweise nicht Offset 0. Abhängig von den Aufbewahrungseinstellungen Ihres Brokers kann Kafka Nachrichten löschen, daher könnte die erste verfügbare Nachricht in Ihren Partitionen einen beliebigen Offset haben.

+0

Hallo Mickael, danke für deine Antwort. Gibt es einen Befehl/ein Werkzeug, um zu wissen, was der Anfang der Verschiebung einer Themenpartition ist (Falls die vorherigen Nachrichten aufgrund der Aufbewahrungsrichtlinie gelöscht werden)? – NoName

+0

Wenn Sie nach dem Broker suchen möchten, können Sie in kafkas 'log.dirs' gehen und das Verzeichnis für die Partitionen finden. Im Inneren sollte eine '* .log' Datei sein. Der Name der Datei sollte den ersten Offset anzeigen. Wenn Sie beispielsweise '00000000000000000216.log' sehen, ist 216 der erste Offset. Abhängig von Ihren Einstellungen können mehrere Protokolldateien vorhanden sein, nehmen Sie den kleinsten Namen. –

+0

Hallo Mickael! Danke, ich habe die log.dirs gefunden und kann die * .log Dateien sehen – NoName

Verwandte Themen