1

Ich baue einen Frühling Kafka Verbraucher. Ich habe einen Wiederholungsmechanismus eingestellt. Nachdem die Wiederholungen erschöpft sind, möchte ich die fehlgeschlagene Nachricht in ein letchtes Thema verschieben.Kafka Consumer - Holen Sie Parameter empfangen von Listener in der Wiederherstellungsmethode

Die hören das Verfahren die folgenden Parameter

public void listen(@Payload Map<String, Object> conciseMap, 
     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
     Acknowledgment ack) throws JsonProcessingException { 

Im Rahmen der Wiederherstellung Methode hat, möchte ich die conciseMap als Eingabe an Zuhörers Karte weitergegeben holen oder die ursprüngliche Nachricht, die von meinem Thema empfangen wurde. Gibt es einen Weg, es zu tun?

@Bean 
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(conncurrency); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.setRetryTemplate(retryTemplate()); 
    factory.setRecoveryCallback(new RecoveryCallback<Object>() { 
     @Override 
     public Object recover(RetryContext context) throws Exception { 
      // TODO Auto-generated method stub 
      logger.debug(" In recovery callback method !!"); 
      ((Acknowledgment)context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).acknowledge(); 

      return null; 
     } 
    }); 
    factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
    return factory; 
} 

     factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
     return factory; 
    } 

    /* 
    * Retry template. 
    */ 

    protected RetryPolicy retryPolicy() { 
     SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions); 
     return policy; 
    } 

    protected BackOffPolicy backOffPolicy() { 
     ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy(); 
     policy.setInitialInterval(initialRetryInterval); 
     policy.setMultiplier(retryMultiplier); 
     return policy; 
    } 

    protected RetryTemplate retryTemplate() { 
     RetryTemplate template = new RetryTemplate(); 
     template.setRetryPolicy(retryPolicy()); 
     template.setBackOffPolicy(backOffPolicy()); 
     return template; 
    } 
} 

Antwort

1

Sie können nicht bekommen, dass conciseMap in dem RecoveryCallback ‚s umgewandelt RetryContext, aber Sie können ein ConsumerRecord abrufen, die ein Original aus dem Thema vor der Konvertierung ist:

(ConsumerRecord) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD) 
+0

Dank Artem. Stellt ConsumerRecord.value die Bytes bereit, die wir in der Listener-Methode erhalten? –

+0

M-m-m. Könnte sein. Es ist ein Ergebnis des mitgelieferten 'Deserializer' –