2017-11-03 5 views
0

Ich implementiere einen Mikro-Dienst, der Nachrichten aus der Kafka-Warteschlange liest und in eine Datenbank schreibt. Ich benutze spring-boot 1.5.6.RELEASE und spring-kafka 1.3.0.RELEASE. Um Datenverlust zu vermeiden, musste ich sicherstellen, dass die Nachrichten in der Datenbank gespeichert wurden, bevor Sie den Offset festsetzten, also setze ich die enable.auto.commit auf false und die AckMode auf MANUAL_IMMEDIATE. Hier ist meine Kafka-Konfiguration:Offset wird ausgeführt, obwohl acknowledge() nie aufgerufen wird

@Configuration 
@EnableKafka 
public class KafkaConfiguration { 

    ... 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
    return new HashMap<String, Object>() { 
     { 
     put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); 
     put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig); 
     put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
     put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
     } 
    }; 
    } 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
    } 

    @Bean 
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
     new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); 

    return factory; 
    } 

    ... 
} 

Für die Umsetzung des Zuhörers Ich bin mit der @KafkaListener Anmerkung. Nachdem die Nachricht in der Datenbank gespeichert wurde, verwende ich die Methode acknowledge(), um den Offset zu bestätigen. Hier ist, wie meine Zuhörer aussehen:

@KafkaListener(topics = "${kafka.myTopic}") 
    public void receive(ConsumerRecord<String, String> payload, Acknowledgment acknowledgment) { 
    // persist message here 

    acknowledgment.acknowledge(); 

    latch.countDown(); 
    } 

Zu meiner Anwendung testen ich die Datenbank gestoppt, so dass, wenn die Business-Logik versucht, die Nachricht zu beharren, eine Laufzeitausnahme vor dem Offset geworfen werden würde, wird durch die acknowledge() Verfahren begangen werden:

1) Gestoppt die Datenbank.

2) Senden Sie eine Nachricht mit Inhalt MESSAGE_1.

3) Starten Sie die Datenbank.

4) Senden Sie eine weitere Nachricht mit Inhalt MESSAGE_2. Das Endergebnis war, dass die Datenbank nur MESSAGE_2 enthielt, sodass die erste Nachricht verloren ging. Der einzige Weg, wie ich beide Nachrichten in der Datenbank bekommen konnte, war, als ich den Mikro-Dienst nach dem Start der Datenbank neu startete:

1) Gestoppt die Datenbank.

2) Senden Sie eine Nachricht mit Inhalt MESSAGE_1.

3) Starten Sie die Datenbank.

4) Starten Sie den Micro-Service neu.

5) Senden Sie eine weitere Nachricht mit Inhalt MESSAGE_2.

Diesmal waren beide Nachrichten in der Datenbank. Meine Frage ist, warum im ersten Szenario der Offset Commit-Ereignis war, obwohl eine Laufzeit Ausnahme geworfen wurde und acknowledge() wurde nie aufgerufen? Und wie kann ich meinen Kafka-Listener richtig implementieren, damit bei der Verarbeitung der empfangenen Nachricht keine Daten verloren gehen?

Vielen Dank im Voraus!

Antwort

2

Sie müssen studieren, wie Apache Kafka funktioniert.

Der Commit-Offset ist genau für neue Verbraucher in der gleichen Gruppe oder für den gleichen Neustart. Für den aktuell ausgeführten Consumer ist dies nicht sinnvoll, und Broker verfolgt den aktuellen Offset im Speicher, so dass all dies unabhängig vom Datensatzabrufprozess erfolgt.

Sie müssen seek Verbraucher zurück auf die Position, die Sie interessiert sind zu beachten: https://docs.spring.io/spring-kafka/docs/2.0.0.RELEASE/reference/html/_reference.html#seek

Siehe auch diese GH Ausgabe: https://github.com/spring-projects/spring-kafka/issues/470

+0

ich verwendet, um die 'seek' Methode und es hat funktioniert, danke. –

Verwandte Themen