0

Ich habe zwei High-Level-Fragen in mehr individuelle Fragen aufgeteilt, beide der High-Level-Fragen befassen sich mit einer Verbrauchergruppe eine Apache Kafka Streams API erstellen und verwenden.Kafka Stream und Consumer Group Weird Behavior

Zunächst ist die Ausgabe von kafka-consumer-group.sh Skript. Ich erhalte seltsame Ausgabe, die mir nicht wirklich sagen, wo ein bestimmte Verbraucher an ist, obwohl sie scheinen zu einer bestimmten Gruppe/Thema/Partition angeschlossen werden:

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG 
STANDARD_DATA     9   11    11    0   myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-1-consumer-4fd9dc15-d8a7-4598-85a9-3761ae6a747b/1.1.1.1     myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-1-consumer 
STANDARD_DATA     0   4    11    7   myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-5-consumer-28e1c7bf-860d-44d6-bf58-5e0ff875587c/1.1.1.1     myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-5-consumer 
STANDARD_DATA     4   -    10    -   myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer-a3023af6-eafb-4633-85f1-048c20c4dfb3/1.1.1.1     myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer 
STANDARD_DATA     5   -    10    -   myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-3-consumer-a81f1399-1fc4-4579-b24f-fa8fee01fabf/1.1.1.1     myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-3-consumer 
STANDARD_DATA     3   -    12    -   myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-2-consumer-6a83bfcc-2c6e-4e9d-a819-029ac8c6ae17/1.1.1.1     myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-2-consumer 
STANDARD_DATA     8   12    12    0   myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-4-consumer-6d46bed3-70c4-4c7f-8e53-f9591192bc3f/1.1.1.1     myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-4-consumer 
STANDARD_DATA     7   -    11    -   myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-3-consumer-5313315b-ded9-4fe7-ac9d-d8d5b20dd5b9/1.1.1.1     myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-3-consumer 
STANDARD_DATA     2   10    10    0   myConsumer-b9402faf-4b37-479f-82be-a17eaa180c62-StreamThread-1-consumer-c08a648f-548e-47a8-8bc5-7b6fa3bc1fb5/1.1.1.1     myConsumer-b9402faf-4b37-479f-82be-a17eaa180c62-StreamThread-1-consumer 
STANDARD_DATA     1   2    10    8   myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-2-consumer-08d99679-d430-4e9f-a3b9-11e558ca34a4/1.1.1.1     myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-2-consumer 
STANDARD_DATA     6   -    12    -   myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-5-consumer-666040f8-d4d0-49e9-9db6-c6efee49ebe1/1.1.1.1     myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-5-consumer 
  1. Warum ist es, dass einige CURRENT-OFFSETS (3. Spalte) und LAG (4. Spalte) erscheinen als "-", wenn ich die API von Kafka direkt abfragen kann, um zu unterscheiden, dass sie tatsächlich eingeholt sind?

(abgefragt durch golang API)

4      myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer-a3023af6-eafb-4633-85f1-048c20c4dfb3 OFFSET: 10  LOG-END: 10    LAG: 0 
  1. Auch, warum auf würde, die (auch bekannt als in den Protokollen wie dargestellt Offset nicht zu sein zeigt sich, sollte es sein eingeholt) im Allgemeinen?

Meine zweite hohe Niveau Frage ist, dass der Stromes. Wir haben einen Stream-Prozess, der zu zufälligen Zeiten (meistens während eines Neustarts) auf den frühesten Offset zurückkehrt, der in bestimmten Themen verfügbar ist. Während des gesamten Codes gibt es kein "Reset" und der OFFSET_RESET wird nicht berührt. Ich kann auch bestätigen, dass wir das "genau einmal" nicht verwenden, also bin ich mir nicht sicher, wo genau diese Offset-Resets ins Spiel kommen.

Wieder einmal seine im Grunde:

Stream Prozess wird durch die Daten am laufenden Band, etwas ~ ~ passiert und dann in unseren Offsets sind wieder Boden 0, wieder Verarbeitung. Dies kann einige Tage bis Wochen dauern, bevor es sich dazu entscheidet, ebenfalls zurückzusetzen, so dass die Versetzungen stattfinden.

Antwort

2

Über die Ausgabe von kafka-consumer-groups.sh: A - in CURRENT-OFFSET gibt an, dass es für diese Partition keinen festgeschriebenen Offset gibt. Dies bedeutet, dass die Verzögerung auch nicht berechnet werden kann (also erhalten Sie auch dort eine -).

Wenn ich Ihre Aussage richtig gelesen, wenn Sie die Offsets mit golang abfragen, es zeigt, dass Trennwand 4 auf 10 versetzt, im Gegensatz zu dem, was kafka-consumer-groups.sh zeigt und indem diejenigen - nicht sicher, warum dies der Fall ist ...

Informationen zu den zurückgesetzten Offsets: Möglicherweise müssen Sie die Broker-Konfiguration offsets.retention.minutes erhöhen - Standard ist 24h (vgl. https://docs.confluent.io/current/streams/faq.html#why-is-my-application-re-processing-data-from-the-beginning).

Beachten Sie auch, dass die Streams-API die Standard-Rücksetzrichtlinie "früheste" verwendet (im Gegensatz zur Consumer-API, die "zuletzt" als Standard verwendet). Sie können die Reset-Richtlinie in Streams API über StreamsConfig ändern: https://docs.confluent.io/current/streams/developer-guide.html#non-streams-configuration-parameters

+0

Dies macht absolut perfekten Sinn. Ich werde diese Einstellungen anpassen und sehen, ob es das Problem löst. Gibt es in der Regel eine Best Practice um die offsets.retention.minutes? Unsere Anwendungen haben eine Leerlaufzeit, so dass ich mit der Korrektur der Reset-Richtlinie nicht sicher bin, ob wir sie wirklich erhöhen müssen. Gedanken? – jbkc85

+0

Entschuldigung - Frage: Wenn ein Verbraucher dort sitzt, hören Sie ein Thema der Verbrauchergruppe und keine Nachrichten kommen bis nach den Offsets.retention.Minuten, wird der Benutzer immer noch zurückgesetzt, sobald eine neue Nachricht eingeht? Zum Beispiel, wenn Offsets bei 55 für 30 Stunden stecken bleiben, und dann die Nummer 56 kommt ... wenn der Verbraucher Teil der Verbrauchergruppe ist, verbindet er sich wieder und verbraucht automatisch bei Offset 57 (weil es zuletzt auf Offset zurückgesetzt wird) – jbkc85

+1

Wenn Sie Offset mit auto.offset.reset = latest verlieren, könnte es theoretisch passieren, dass Sie einige Datensätze überspringen und nicht verarbeiten. Daher wird eine Erhöhung der Offset-Retentionszeit empfohlen, auch wenn Sie "aktuell" verwenden. –