Ich verwende Kafka Version0.10.2.1
und Spring Boot für mein Projekt.Kafka Duplikat lesen
Ich habe 5 Partitionen von einem Thema, das von mehrere Verbrauchern konsumiert werden kann (mit der gleicher Gruppe-Id), die auf andere Maschine ausgeführt werden.
welches Problem ich bin vor ist:
Ich Duplikat einer einzelnen Nachricht mit dieser Kafka Warnung logs
Auto offset commit failed for group my-consumer-group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Als Protokolle lesen immer zeigen, dass dieses Problem entsteht, weil Kafka Verbraucher konnte nicht übergeben werden.
Hier sind einige Details über meine Use-Case:
Ich habe mehrere Verbraucher von einem Thema
My-Topic
, die zu derselben gehört Gruppe-Idmy-consumer-group
Consumer Nachrichten von Kafka verbraucht , wenden Sie Geschäftslogik an und speichern Sie verarbeitete Daten in
Cassandra
Das Verfahren für das Verbrauchen der Mitteilung von Kafka Das Anwenden der Geschäftslogik und dann das Speichern in Cassandra dauert ungefähr 10 ms pro Nachricht, die von Kafka verbraucht wird.
ich folgenden Code verwenden Kafka-consumer bean
@Configuration
@EnableKafka
public class KafkaConsumer {
@Value("${spring.kafka.bootstrap-servers}")
private String brokerURL;
@Value("${spring.kafka.session.timeout}")
private int sessionTimeout;
@Value("${spring.kafka.consumer.my-group-id}")
private String groupId;
@Value("${spring.kafka.listener.concurrency}")
private int concurrency;
@Value("${spring.kafka.listener.poll-timeout}")
private int timeout;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean autoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String autoCommitInterval;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(timeout);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerURL);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
}
Dies sind die kafka-Konfiguration I
bin mitspring.kafka.listener.concurrency=2
spring.kafka.listener.poll-timeout=3000
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.session.timeout=50000
spring.kafka.connection.timeout=10000
spring.kafka.topic.partition=5
spring.kafka.message.replication=2
Mein Hauptanliegen doppelter Lese ist von einem erstellen Wenn mehrere Kafka-Konsumenten zu derselben Verbrauchergruppe und in meiner Anwendung gehören, muss ich doppelte Einträge in die Datenbank vermeiden.
Könnten Sie mir bitte helfen, meine obigen Kafka-Konfigurationen und den Kafka-Consumer-Code zu überprüfen, damit ich das doppelte Lesen vermeiden kann?
Das Problem ist, ich habe eine Zählerspalte in Cassandra, die basierend auf der Nachricht von Kafka-Verbraucher erhöht wird. Wenn doppelte Lesevorgänge auftreten, wird der Zähler mehr als einmal inkrementiert, was zu falschen Analysen führt. –
Willkommen in der Welt der Nachrichtenübermittlung. Für Ihr Szenario, "genau einmal" Lieferung ist unmöglich zu erreichen (google es, wenn Sie mir nicht glauben). Wie gesagt, Sie können die Wahrscheinlichkeit einer doppelten Lieferung minimieren, aber nicht eliminieren. Betrachten Sie den Fall, in dem Sie mongodb aktualisieren und dann der Server abstürzt, bevor Sie den kafka-Offset festschreiben; Ergebnis - Nachlieferung. Wenn es kritisch ist, müssen Sie zuerst mongo überprüfen, um zu sehen, ob Sie dieses Ereignis bereits gespeichert haben. –
Danke @Gary. Deine Antwort hat mir wirklich geholfen. Wenn genau einmal "Lieferung unmöglich ist, dann wie Bank und missionskritisches System funktioniert, weiß ich, dass sie RDBMS verwenden, aber welches Messaging-Tool sie verwenden –