Ich habe versucht, einige POC Arbeit für Spring Kafka zu tun. Insbesondere wollte ich mit den Best Practices im Umgang mit Fehlern experimentieren, während ich Nachrichten innerhalb von Kafka konsumiere.Kafka Consumer Ausnahme und Offset Commit
Ich frage mich, ob jemand in der Lage ist, mit helfen:
- Austausch bewährter Praktiken rund um was Kafka Verbraucher sollten tun, wenn es zu einem Ausfall ist
- Helfen Sie mir zu verstehen, wie AckMode Nehmen funktioniert und wie man Verhindern Sie, dass die Kafka-Offset-Warteschlange festgeschrieben wird, wenn in der Listener-Methode eine Ausnahme ausgelöst wird.
Das Codebeispiel 2 wird unten gegeben:
Da AckMode auf RECORD gesetzt, die die documentation nach:
den Offset begehen, wenn der Hörer nach dem Verarbeitung kehrt Aufzeichnung.
Ich hätte gedacht, dass der Offset nicht inkrementiert würde, wenn die Listener-Methode eine Ausnahme auslöst. Dies war jedoch nicht der Fall, als ich es mit der folgenden code/config/command-Kombination getestet habe. Der Offset wird weiterhin aktualisiert, und die nächste Nachricht wird weiterhin verarbeitet.
Meine config:
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
return factory;
}
Mein Code:
@Component
public class KafkaMessageListener{
@KafkaListener(topicPartitions = {@TopicPartition(topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))})
public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException {
throw new RuntimeException("Oops!");
}
Befehl zum verify-Offset:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
Ich kafka_2.12-0.10.2.0 und org verwenden. springframework.kafka: spring-kafka: 1.1.3.RELEASE
Danke für den Tipp @Gary. Wissen Sie, ob es Best Practices für den Umgang mit Fehlern bei Kafka-Konsumenten gibt? Es scheint, als würde ein Fehler beim Lesen einer Nachricht einfach protokolliert und dann verschluckt. – yfl
Außerdem habe ich festgestellt, dass die Kommentare für den Code widersprüchlich zu sein scheint: "ist nur wirksam, wenn auto ack ist falsch; es ist nicht anwendbar auf manuelle acks.". Ich schätze, der erste Teil sollte wahr sein, nicht falsch? – yfl
Wir sollten das beheben; sie sind verschiedene Dinge; In diesem Zusammenhang bezieht sich auto ack auf eine kafka-Eigenschaft 'enable.auto.commit' (wobei der kafka-Bibliotheks-Consumer-Client intern seine eigenen acks ausführt). 'AckMode' wird verwendet, um dem Container mitzuteilen, wann/ob Commits ausgeführt werden sollen, wenn' enable.auto.commit' 'false' ist. Wenn das für manuelle Acks gesetzt ist, dann wird der Container nie gepickt. –