2016-10-17 4 views
1

Wie kann ich mit der Nachricht umgehen, die im Frühjahr Integration zu Kafka zu produzieren fehlgeschlagen?Frühling Integration Kafka Outbound Adapter Fehler Handle

Ich habe nicht 'error-channel' ist eine Option bei 'int-kafka: outbound-channel-adapter', frage mich, wo sollte ich die Fehler-Kanal-Informationen hinzufügen, so dass mein ErrorHandler kann "fehlgeschlagen zu produzieren zu kafka "Art des Fehlers. (einschließlich aller Arten von Fehler, Konfiguration, Netzwerk und usw.)

Auch inputToKafka ist in der Warteschlange Kanal, wo sollte ich Fehler-Kanal hinzufügen, um potenzielle Warteschlange voller Fehler behandeln?

<int:gateway id="myGateway" 
      service-interface="someGateway" 
      default-request-channel="transformChannel" 
      error-channel="errorChannel" 
      default-reply-channel="replyChannel" 
      async-executor="MyThreadPoolTaskExecutor"/> 

<int:transformer id="transformer" input-channel="transformChannel" method="transform" output-channel="inputToKafka"> 
    <bean class="Transformer"/> 
</int:transformer> 

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" 
            kafka-template="template" 
            auto-startup="false" 
            channel="inputToKafka" 
            topic="foo" 
            message-key-expression="'bar'" 
            partition-id-expression="2"> 
    <int:poller fixed-delay="200" time-unit="MILLISECONDS" receive-timeout="0" 
        task-executor="kafkaExecutor"/> 
</int-kafka:outbound-channel-adapter> 

<bean id="kafkaExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
    .... 
</bean> 

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate"> 
    <constructor-arg> 
     <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> 
      <constructor-arg> 
       <map> 
        <entry key="bootstrap.servers" value="localhost:9092" /> 
        ... 
       </map> 
      </constructor-arg> 
     </bean> 
    </constructor-arg> 
</bean> 

<int:service-activator input-channel='errorChannel' output-channel="replyChannel" method='process'> 
    <bean class="ErrorHandler"/> 
</int:service-activator> 

bearbeiten

<property name="producerListener"> 
    <bean id="producerListener" class="org.springframework.kafka.support.ProducerListenerAdapter"/> 
</property> 

Antwort

1

Alle Fehler auf dem nachgeordneten Fluss wird die error-channel auf Ihrem Gateway gesendet werden. Da kafka jedoch standardmäßig async ist, werden Sie auf diese Weise keine Fehler erhalten. Sie können sync=true auf dem Outbound-Adapter festlegen und dann wird eine Ausnahme ausgelöst, wenn ein Problem auftritt.

Denken Sie daran, aber es wird viel langsamer sein.

Sie können asynchrone Ausnahmen erhalten, indem Sie ProducerListener zu Ihrer KafkaTemplate hinzufügen.

+0

sollte ich einfach den 'bearbeiten' Teil in meine KafkaTemplate Bean hinzufügen? (siehe in Originalfrage bearbeiten.) – edi

+0

Sie müssen den Adapter ableiten und eine Aktion in 'onError()' (oder beiden) ausführen. Wenn Sie 'onSuccessI()' implementieren, müssen Sie 'isInteressedInSuccess()' auf true überschreiben. Der Standard-Listener ('LoggingProducerListener') protokolliert nur Fehler. –

+0

Ich sah die onError ist zurück, void, wie Sie sicherstellen, dass die Nachricht an meine ErrorHandler in meiner ursprünglichen Frage geht? Oder eine andere Möglichkeit, einen anderen errorHandler zu implementieren, der Informationen an replyChannel zurücksendet – edi

Verwandte Themen