Ich arbeite mit einfachen Kafka Consumer in einem meiner Projekte und meine gewünschte Logik ist, wenn der Verbraucher einige Nachrichten nicht verarbeiten konnte, wird es die letzte korrekt verarbeitete Nachricht übernehmen und dann bei der nächsten Umfrage wird es fortgesetzt von fehlgeschlagener Nachricht.Kafka-Consumer-Wiederherstellung nach fehlgeschlagener Nachrichtenverarbeitung
ich versuchte, jede Nachricht manuell mit folgendem Code zu begehen:
public void fetchMessages() {
ConsumerRecords<String, MyObject> messages = kafkaConsumer.poll(10000);
for (ConsumerRecord message : messages) {
logger.info("Reading kafka message, topic ["+kafkaTopic+"], partition ["+message.partition()+"], offset ["+message.offset()+"]");
try {
MyObject myObject = (MyObject) message.value();
logger.info("Handling message," + myObject);
handleMessage(myObject);
commitMessage(message);
} catch (Exception e) {
logger.error("Error handling message"); throw e;
}
}
}
private void commitMessage(ConsumerRecord message) {
long nextOffset = message.offset() + 1;
TopicPartition topicPartition = new TopicPartition(kafkaTopic,message.partition());
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(nextOffset);
Map<TopicPartition,OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();
offsetAndMetadataMap.put(topicPartition,offsetAndMetadata);
logger.info("Commiting processed kafka message, topic ["+kafkaTopic+"], partition ["+message.partition()+"], next offset ["+nextOffset+"]");
kafkaConsumer.commitSync(offsetAndMetadataMap);
}
Aber zum Beispiel, wenn I-3-Nachrichten holen, von denen jedem von anderer Partition, I den ersten erfolgreichen Handhabung und dann gescheitert zweite Nachricht zu verarbeiten, Ich verlasse gerade ConsumerRecord
s für Schleife und ich erwarte, um 2 Nachrichten zu erhalten, die ich noch nicht in der nächsten poll
Iteration begangen habe. Stattdessen empfängt der Verbraucher einfach weiterhin neue Nachrichten und kehrt nie zu fehlgeschlagenen Nachrichten zurück.
Auch versucht, seek
auf fehlgeschlagene Nachricht anzuwenden und dann die Schleife zu beenden, aber es funktioniert auf 1 Partition und nicht auf viele arbeiten.
kafkaConsumer.seek(new TopicPartition(kafkaTopic,message.partition()),message.offset());
Einige Details:
- Thema hat 12 Partitionen
- Ein Verbraucher für alle Partitionen
- Verbraucherumfrage Schleife ein in Minute
- enable.auto.commit ausführt: false
Was ist falsch mit meinem Code oder mit meine Logik?
Ich glaube, Sie wollen Sie nicht hinzufügen ‚1‘ auf den Offset Ich engagiere mich. Stattdessen möchten Sie den korrekt verbrauchten Offset festschreiben. Das würde erklären, dass eine der fehlgeschlagenen Nachrichten in Ihrem Beispiel nicht wiedergegeben wird ... – jimijazz
@jimijazz Ich glaube nicht, dass Sie Recht haben. Bitte werfen Sie einen Blick auf die Kafka Consumer API - commitSync-Methode. ** Der festgeschriebene Offset sollte die nächste Nachricht sein, die Ihre Anwendung konsumiert, zB lastProcessedMessageOffset + 1. ** [link] (https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/ Kunden/Verbraucher/KafkaConsumer.html) – mixermt
Ich denke, du hast Recht @mixermt, danke für das darauf hin ... Ich sehe nicht die Logik davon. Würden nachfolgende Aufrufe von 'poll()' den ersten Offset in jedem Stapel vermissen, da er bereits von der vorherigen Iteration als festgeschrieben markiert wurde? – jimijazz