2017-07-07 1 views
0

Ich habe 5 Nachrichten an Kafka zu topic1 erstellt und diese erfolgreich konsumiert. Wenn ich die 6. Nachricht sendete und versuchte zu konsumieren, erhielt ich alle 6 Nachrichten statt der letzten (6.) Nachricht.Warum liest mein Konsument jedes Mal alle Nachrichten vom Thema, auch wenn auto.offset.reset = am größten ist?

Bitte beachten Sie, dass ich Consumer-Befehlszeile, eher von einem Datenbank-Connector (Access-Modul) ausführen. Und der Stecker hat die Config Eigenschaft auto.offset.reset auf „größten“ (bitte unten alle Konfigurationseigenschaften von log sehen)

Auch finden Sie in der OffsetChecker Ausgabe unter:.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ 
    --group testjob --zookeeper localhost:2181 --topic topic1 

[2017-07-06 21:57:46,707] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) 
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/testjob/offsets/topic1/0. 

Könnte jemand Bitte lassen Sie mich wissen, wo das Problem liegt?

Hier ist das Protokoll, das Config-Eigenschaften zeigt:

***Global config Properties*** 
*    client.id = rdkafka 
*    message.max.bytes = 1200 
*    receive.message.max.bytes = 100000000 
*    metadata.request.timeout.ms = 60000 
*    topic.metadata.refresh.interval.ms = 600000 
*    topic.metadata.refresh.fast.cnt = 10 
*    topic.metadata.refresh.fast.interval.ms = 250 
*    topic.metadata.refresh.sparse = false 
*    socket.timeout.ms = 60000 
*    socket.send.buffer.bytes = 0 
*    socket.receive.buffer.bytes = 0 
*    socket.keepalive.enable = false 
*    socket.max.fails = 3 
*    broker.address.ttl = 300000 
*    broker.address.family = any 
*    statistics.interval.ms = 0 
*    log_cb = 0x7fecb80c6dd0 
*    log_level = 6 
*    socket_cb = 0x7fecb80cd2f0 
*    open_cb = 0x7fecb80ddd30 
*    opaque = 0x2641280 
*    internal.termination.signal = 0 
*    queued.min.messages = 100000 
*    queued.max.messages.kbytes = 1000000 
*    fetch.wait.max.ms = 100 
*    fetch.message.max.bytes = 1049776 
*    fetch.min.bytes = 1 
*    fetch.error.backoff.ms = 500 
*    group.id = testjob 
*    queue.buffering.max.messages = 100000 
*    queue.buffering.max.ms = 1000 
*    message.send.max.retries = 2 
*    retry.backoff.ms = 100 
*    compression.codec = none 
*    batch.num.messages = 1000 
*    delivery.report.only.error = false 
*    request.required.acks = 1 
*    enforce.isr.cnt = 0 
*    request.timeout.ms = 5000 
*    message.timeout.ms = 300000 
*    produce.offset.report = false 
*    auto.commit.enable = true 
*    auto.commit.interval.ms = 60000 
*    auto.offset.reset = largest <<<<-------- 
*    offset.store.path = . 
*    offset.store.sync.interval.ms = 0 
*    offset.store.method = file 
*    consume.callback.max.messages = 0 
+2

funktionieren wird, wie Sie die Verbraucher ausgeführt haben? Eine vollständige Befehlszeile könnte helfen zu diagnostizieren, was falsch läuft. – amethystic

+0

Ohne Ihren Kunden zu kennen, kann man nur raten. Vielleicht läuft Ihr Kunde weniger als 60s (auto.commit.interval) und wurde getötet, anstatt ordnungsgemäß herunterzufahren. Zu den fehlenden Nodes auf zoekeeper: Es kann sein, dass Sie einen "neuen Consumer" betreiben, der die Offsets nicht an ZK übermittelt. Oder du schreibst nicht in den Wurzelpfad von ZK (was ich empfehlen würde). Überprüfen Sie Ihre Broker-Konfiguration (zookee.connect). Dies könnte folgendermaßen aussehen: 'localhost: 2181/kafka' - in diesem Fall müssen Sie den Pfad zu Ihrer ZK-Verbindungszeichenfolge hinzufügen, wenn Sie den Offset-Checker ausführen. – TobiSH

Antwort

0

Fügen Sie diese Eigenschaften AUTO_OFFSET_RESET_CONFIG = „früheste“ es

Verwandte Themen