Ich baue einen Frühling Kafka Verbraucher. Ich habe einen Wiederholungsmechanismus eingestellt. Nachdem die Wiederholungen erschöpft sind, möchte ich die fehlgeschlagene Nachricht in ein letchtes Thema verschieben.Kafka Consumer - Holen Sie Parameter empfangen von Listener in der Wiederherstellungsmethode
Die hören das Verfahren die folgenden Parameter
public void listen(@Payload Map<String, Object> conciseMap,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
Acknowledgment ack) throws JsonProcessingException {
Im Rahmen der Wiederherstellung Methode hat, möchte ich die conciseMap als Eingabe an Zuhörers Karte weitergegeben holen oder die ursprüngliche Nachricht, die von meinem Thema empfangen wurde. Gibt es einen Weg, es zu tun?
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(conncurrency);
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
// TODO Auto-generated method stub
logger.debug(" In recovery callback method !!");
((Acknowledgment)context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).acknowledge();
return null;
}
});
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
/*
* Retry template.
*/
protected RetryPolicy retryPolicy() {
SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions);
return policy;
}
protected BackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(initialRetryInterval);
policy.setMultiplier(retryMultiplier);
return policy;
}
protected RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(retryPolicy());
template.setBackOffPolicy(backOffPolicy());
return template;
}
}
Dank Artem. Stellt ConsumerRecord.value die Bytes bereit, die wir in der Listener-Methode erhalten? –
M-m-m. Könnte sein. Es ist ein Ergebnis des mitgelieferten 'Deserializer' –