Ich implementiere einen Mikro-Dienst, der Nachrichten aus der Kafka-Warteschlange liest und in eine Datenbank schreibt. Ich benutze spring-boot 1.5.6.RELEASE
und spring-kafka 1.3.0.RELEASE
. Um Datenverlust zu vermeiden, musste ich sicherstellen, dass die Nachrichten in der Datenbank gespeichert wurden, bevor Sie den Offset festsetzten, also setze ich die enable.auto.commit
auf false und die AckMode
auf MANUAL_IMMEDIATE. Hier ist meine Kafka-Konfiguration:Offset wird ausgeführt, obwohl acknowledge() nie aufgerufen wird
@Configuration
@EnableKafka
public class KafkaConfiguration {
...
@Bean
public Map<String, Object> consumerConfigs() {
return new HashMap<String, Object>() {
{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
}
};
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
...
}
Für die Umsetzung des Zuhörers Ich bin mit der @KafkaListener
Anmerkung. Nachdem die Nachricht in der Datenbank gespeichert wurde, verwende ich die Methode acknowledge()
, um den Offset zu bestätigen. Hier ist, wie meine Zuhörer aussehen:
@KafkaListener(topics = "${kafka.myTopic}")
public void receive(ConsumerRecord<String, String> payload, Acknowledgment acknowledgment) {
// persist message here
acknowledgment.acknowledge();
latch.countDown();
}
Zu meiner Anwendung testen ich die Datenbank gestoppt, so dass, wenn die Business-Logik versucht, die Nachricht zu beharren, eine Laufzeitausnahme vor dem Offset geworfen werden würde, wird durch die acknowledge()
Verfahren begangen werden:
1) Gestoppt die Datenbank.
2) Senden Sie eine Nachricht mit Inhalt MESSAGE_1.
3) Starten Sie die Datenbank.
4) Senden Sie eine weitere Nachricht mit Inhalt MESSAGE_2. Das Endergebnis war, dass die Datenbank nur MESSAGE_2 enthielt, sodass die erste Nachricht verloren ging. Der einzige Weg, wie ich beide Nachrichten in der Datenbank bekommen konnte, war, als ich den Mikro-Dienst nach dem Start der Datenbank neu startete:
1) Gestoppt die Datenbank.
2) Senden Sie eine Nachricht mit Inhalt MESSAGE_1.
3) Starten Sie die Datenbank.
4) Starten Sie den Micro-Service neu.
5) Senden Sie eine weitere Nachricht mit Inhalt MESSAGE_2.
Diesmal waren beide Nachrichten in der Datenbank. Meine Frage ist, warum im ersten Szenario der Offset Commit-Ereignis war, obwohl eine Laufzeit Ausnahme geworfen wurde und acknowledge()
wurde nie aufgerufen? Und wie kann ich meinen Kafka-Listener richtig implementieren, damit bei der Verarbeitung der empfangenen Nachricht keine Daten verloren gehen?
Vielen Dank im Voraus!
ich verwendet, um die 'seek' Methode und es hat funktioniert, danke. –