2017-03-29 4 views
1

Ich habe versucht, einige POC Arbeit für Spring Kafka zu tun. Insbesondere wollte ich mit den Best Practices im Umgang mit Fehlern experimentieren, während ich Nachrichten innerhalb von Kafka konsumiere.Kafka Consumer Ausnahme und Offset Commit

Ich frage mich, ob jemand in der Lage ist, mit helfen:

  1. Austausch bewährter Praktiken rund um was Kafka Verbraucher sollten tun, wenn es zu einem Ausfall
  2. ist
  3. Helfen Sie mir zu verstehen, wie AckMode Nehmen funktioniert und wie man Verhindern Sie, dass die Kafka-Offset-Warteschlange festgeschrieben wird, wenn in der Listener-Methode eine Ausnahme ausgelöst wird.

Das Codebeispiel 2 wird unten gegeben:

Da AckMode auf RECORD gesetzt, die die documentation nach:

den Offset begehen, wenn der Hörer nach dem Verarbeitung kehrt Aufzeichnung.

Ich hätte gedacht, dass der Offset nicht inkrementiert würde, wenn die Listener-Methode eine Ausnahme auslöst. Dies war jedoch nicht der Fall, als ich es mit der folgenden code/config/command-Kombination getestet habe. Der Offset wird weiterhin aktualisiert, und die nächste Nachricht wird weiterhin verarbeitet.

Meine config:

private Map<String, Object> producerConfigs() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092"); 
    props.put(ProducerConfig.RETRIES_CONFIG, 0); 
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); 
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1); 
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    return props; 
} 

    @Bean 
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = 
      new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); 
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD); 
    return factory; 
} 

Mein Code:

@Component 
public class KafkaMessageListener{ 
    @KafkaListener(topicPartitions = {@TopicPartition(topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))}) 
    public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException { 
      throw new RuntimeException("Oops!"); 
    } 

Befehl zum verify-Offset:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group 

Ich kafka_2.12-0.10.2.0 und org verwenden. springframework.kafka: spring-kafka: 1.1.3.RELEASE

Antwort

2

Der Behälter (via ContainerProperties) hat eine Eigenschaft, ackOnError die standardmäßig wahr ist ...

/** 
* Set whether or not the container should commit offsets (ack messages) where the 
* listener throws exceptions. This works in conjunction with {@link #ackMode} and is 
* effective only when the kafka property {@code enable.auto.commit} is {@code false}; 
* it is not applicable to manual ack modes. When this property is set to {@code true} 
* (the default), all messages handled will have their offset committed. When set to 
* {@code false}, offsets will be committed only for successfully handled messages. 
* Manual acks will be always be applied. Bear in mind that, if the next message is 
* successfully handled, its offset will be committed, effectively committing the 
* offset of the failed message anyway, so this option has limited applicability. 
* Perhaps useful for a component that starts throwing exceptions consistently; 
* allowing it to resume when restarted from the last successfully processed message. 
* @param ackOnError whether the container should acknowledge messages that throw 
* exceptions. 
*/ 
public void setAckOnError(boolean ackOnError) { 
    this.ackOnError = ackOnError; 
} 

Beachten Sie, dass, obwohl, wenn die nächste Nachricht erfolgreich ist, wird seine Offset ohnehin verpflichtet werden, die effektiv Begeht auch den fehlgeschlagenen Offset.

+0

Danke für den Tipp @Gary. Wissen Sie, ob es Best Practices für den Umgang mit Fehlern bei Kafka-Konsumenten gibt? Es scheint, als würde ein Fehler beim Lesen einer Nachricht einfach protokolliert und dann verschluckt. – yfl

+0

Außerdem habe ich festgestellt, dass die Kommentare für den Code widersprüchlich zu sein scheint: "ist nur wirksam, wenn auto ack ist falsch; es ist nicht anwendbar auf manuelle acks.". Ich schätze, der erste Teil sollte wahr sein, nicht falsch? – yfl

+0

Wir sollten das beheben; sie sind verschiedene Dinge; In diesem Zusammenhang bezieht sich auto ack auf eine kafka-Eigenschaft 'enable.auto.commit' (wobei der kafka-Bibliotheks-Consumer-Client intern seine eigenen acks ausführt). 'AckMode' wird verwendet, um dem Container mitzuteilen, wann/ob Commits ausgeführt werden sollen, wenn' enable.auto.commit' 'false' ist. Wenn das für manuelle Acks gesetzt ist, dann wird der Container nie gepickt. –