2017-08-02 3 views
1

Ich verwende Kafka Version0.10.2.1 und Spring Boot für mein Projekt.Kafka Duplikat lesen

Ich habe 5 Partitionen von einem Thema, das von mehrere Verbrauchern konsumiert werden kann (mit der gleicher Gruppe-Id), die auf andere Maschine ausgeführt werden.

welches Problem ich bin vor ist:

Ich Duplikat einer einzelnen Nachricht mit dieser Kafka Warnung logs

Auto offset commit failed for group my-consumer-group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Als Protokolle lesen immer zeigen, dass dieses Problem entsteht, weil Kafka Verbraucher konnte nicht übergeben werden.

Hier sind einige Details über meine Use-Case:

  • Ich habe mehrere Verbraucher von einem Thema My-Topic, die zu derselben gehört Gruppe-Id my-consumer-group

  • Consumer Nachrichten von Kafka verbraucht , wenden Sie Geschäftslogik an und speichern Sie verarbeitete Daten in Cassandra

  • Das Verfahren für das Verbrauchen der Mitteilung von Kafka Das Anwenden der Geschäftslogik und dann das Speichern in Cassandra dauert ungefähr 10 ms pro Nachricht, die von Kafka verbraucht wird.

ich folgenden Code verwenden Kafka-consumer bean

@Configuration 
@EnableKafka 
public class KafkaConsumer { 
    @Value("${spring.kafka.bootstrap-servers}") 
    private String brokerURL; 

    @Value("${spring.kafka.session.timeout}") 
    private int sessionTimeout; 

    @Value("${spring.kafka.consumer.my-group-id}") 
    private String groupId; 

    @Value("${spring.kafka.listener.concurrency}") 
    private int concurrency; 

    @Value("${spring.kafka.listener.poll-timeout}") 
    private int timeout; 

    @Value("${spring.kafka.consumer.enable-auto-commit}") 
    private boolean autoCommit; 

    @Value("${spring.kafka.consumer.auto-commit-interval}") 
    private String autoCommitInterval; 

    @Value("${spring.kafka.consumer.auto-offset-reset}") 
    private String autoOffsetReset; 

    @Bean 
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(concurrency); 
     factory.getContainerProperties().setPollTimeout(timeout); 
     return factory; 
    } 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
    } 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> propsMap = new HashMap<>(); 
     propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerURL); 
     propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); 
     propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); 
     propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); 
     propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 
     propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); 
     return propsMap; 
    } 
} 

Dies sind die kafka-Konfiguration I

bin mit
spring.kafka.listener.concurrency=2 
spring.kafka.listener.poll-timeout=3000 
spring.kafka.consumer.auto-commit-interval=1000 
spring.kafka.consumer.enable-auto-commit=true 
spring.kafka.consumer.auto-offset-reset=earliest 
spring.kafka.session.timeout=50000 
spring.kafka.connection.timeout=10000 
spring.kafka.topic.partition=5 
spring.kafka.message.replication=2 

Mein Hauptanliegen doppelter Lese ist von einem erstellen Wenn mehrere Kafka-Konsumenten zu derselben Verbrauchergruppe und in meiner Anwendung gehören, muss ich doppelte Einträge in die Datenbank vermeiden.

Könnten Sie mir bitte helfen, meine obigen Kafka-Konfigurationen und den Kafka-Consumer-Code zu überprüfen, damit ich das doppelte Lesen vermeiden kann?

Antwort

0

Die einfache Antwort ist, verwenden Sie nicht autoCommit - es verpflichtet sich nach einem Zeitplan.

Lassen Sie den Container stattdessen die Commits ausführen; mit AckModeRECORD.

Allerdings sollten Sie Ihren Code immer noch Idempotent machen - es gibt immer eine Möglichkeit der Neulieferung; Es ist nur so, dass die Wahrscheinlichkeit mit einer zuverlässigeren Commit-Strategie kleiner wird.

+0

Das Problem ist, ich habe eine Zählerspalte in Cassandra, die basierend auf der Nachricht von Kafka-Verbraucher erhöht wird. Wenn doppelte Lesevorgänge auftreten, wird der Zähler mehr als einmal inkrementiert, was zu falschen Analysen führt. –

+0

Willkommen in der Welt der Nachrichtenübermittlung. Für Ihr Szenario, "genau einmal" Lieferung ist unmöglich zu erreichen (google es, wenn Sie mir nicht glauben). Wie gesagt, Sie können die Wahrscheinlichkeit einer doppelten Lieferung minimieren, aber nicht eliminieren. Betrachten Sie den Fall, in dem Sie mongodb aktualisieren und dann der Server abstürzt, bevor Sie den kafka-Offset festschreiben; Ergebnis - Nachlieferung. Wenn es kritisch ist, müssen Sie zuerst mongo überprüfen, um zu sehen, ob Sie dieses Ereignis bereits gespeichert haben. –

+0

Danke @Gary. Deine Antwort hat mir wirklich geholfen. Wenn genau einmal "Lieferung unmöglich ist, dann wie Bank und missionskritisches System funktioniert, weiß ich, dass sie RDBMS verwenden, aber welches Messaging-Tool sie verwenden –