2017-01-21 5 views
1

Ich habe einen Prozessor, der byte[] Nutzlasten in MyClass Nutzlasten verwandelt:Spring Cloud Dataflow Typumwandlung funktioniert nicht in der Prozessorkomponente?

@Slf4j 
@EnableBinding(Processor.class) 
public class MyDecoder { 

    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) 
    public MyClass decode(final byte[] payload) { 
     MyClass decoded = doStuff(payload); 
     if (decoded != null) { 
      log.info("Successfully decoded!"); 
     } 

     return decoded; 
    } 
} 

ich folgende DSL versucht zu erstellen: some-source | my-decoder | some-sink und some-sink meldet Fehler, weil es nicht MyClass Klasse hat in Classloader. Dies ist das erwartete Verhalten.

Ich versuchte Typumwandlung auf my-decoder wie die Anwendung: some-source | my-decoder --spring.cloud.stream.bindings.output.contentType=application/json | some-sink und ich erhalte die folgenden Fehler in my-decoder Protokoll:

2017-01-20 21:45:17.278 INFO 9408 --- [afka-listener-2] com.example.MyDecoder : Successfully decoded! 
2017-01-20 21:45:18.441 INFO 9408 --- [afka-listener-2] com.example.MyDecoder : Successfully decoded! 
2017-01-20 21:45:20.512 INFO 9408 --- [afka-listener-2] com.example.MyDecoder : Successfully decoded! 
2017-01-20 21:45:20.515 ERROR 9408 --- [afka-listener-2] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = example.some-source, partition = 0, offset = 1, key = null, value = [[email protected]) 

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalArgumentException: payload must not be null 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:47) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na] 
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:197) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na] 
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:76) ~[spring-kafka-1.0.4.RELEASE.jar!/:na] 
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.4.RELEASE.jar!/:na] 
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:276) ~[spring-retry-1.1.5.RELEASE.jar!/:na] 
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:172) ~[spring-retry-1.1.5.RELEASE.jar!/:na] 
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.4.RELEASE.jar!/:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:597) [spring-kafka-1.0.4.RELEASE.jar!/:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1800(KafkaMessageListenerContainer.java:222) [spring-kafka-1.0.4.RELEASE.jar!/:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:778) [spring-kafka-1.0.4.RELEASE.jar!/:na] 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111] 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111] 
Caused by: java.lang.IllegalArgumentException: payload must not be null 
    at org.springframework.util.Assert.notNull(Assert.java:115) ~[spring-core-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:57) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:53) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.support.MutableMessageBuilder.withPayload(MutableMessageBuilder.java:86) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:35) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:26) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:194) ~[spring-cloud-stream-1.1.0.RELEASE.jar!/:1.1.0.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    ... 42 common frames omitted 

ich die Nachricht von byte[] zu MyClass umgewandelt wurde sehen und nicht leer ist. Ich verstehe nicht, warum ich die Meldung 3 mal sehen, bevor es da nicht kafka Eigenschaft ‚Wiederholungen‘ 0 wie im my-decoder Protokoll beim Start gesehen:

2017-01-20 21:44:32.080 INFO 9408 --- [   main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: 
    compression.type = none 
    metric.reporters = [] 
    metadata.max.age.ms = 300000 
    metadata.fetch.timeout.ms = 60000 
    reconnect.backoff.ms = 50 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    bootstrap.servers = [localhost:9092] 
    retry.backoff.ms = 100 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    buffer.memory = 33554432 
    timeout.ms = 30000 
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    ssl.keystore.type = JKS 
    ssl.trustmanager.algorithm = PKIX 
    block.on.buffer.full = false 
    ssl.key.password = null 
    max.block.ms = 60000 
    sasl.kerberos.min.time.before.relogin = 60000 
    connections.max.idle.ms = 540000 
    ssl.truststore.password = null 
    max.in.flight.requests.per.connection = 5 
    metrics.num.samples = 2 
    client.id = 
    ssl.endpoint.identification.algorithm = null 
    ssl.protocol = TLS 
    request.timeout.ms = 30000 
    ssl.provider = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    acks = 1 
    batch.size = 16384 
    ssl.keystore.location = null 
    receive.buffer.bytes = 32768 
    ssl.cipher.suites = null 
    ssl.truststore.type = JKS 
    security.protocol = PLAINTEXT 
    retries = 0 
    max.request.size = 1048576 
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 
    ssl.truststore.location = null 
    ssl.keystore.password = null 
    ssl.keymanager.algorithm = SunX509 
    metrics.sample.window.ms = 30000 
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 
    send.buffer.bytes = 131072 
    linger.ms = 0 

Ich habe versucht zu schreiben Integrationstests:

@RunWith(SpringJUnit4ClassRunner.class) 
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) 
@DirtiesContext 
public abstract class MyDecoderTests { 

    @Autowired 
    protected Processor channels; 

    @Autowired 
    protected MessageCollector collector; 

    public static class UsingNothingIntegrationTests extends MyDecoderTest { 

     @Test 
     public void test() throws Exception { 
      channels.input().send(new GenericMessage<Object>(Hex.decodeHex("ff".toCharArray()))); 
      assertThat(collector.forChannel(channels.output()), receivesPayloadThat(instanceOf(MyClass.class))); 
     } 
    } 

    @SpringBootTest("spring.cloud.stream.bindings.output.contentType=application/json") 
    public static class UsingOutputConverterIntegrationTests extends MyDecoderTest { 

     @Test 
     public void test() throws Exception { 
      channels.input().send(new GenericMessage<Object>(Hex.decodeHex("ff".toCharArray()))); 
      assertThat(collector.forChannel(channels.output()), receivesPayloadThat(is("{\"example\": true\"}"))); 
     } 
    } 

    @Configuration 
    @EnableAutoConfiguration 
    @Import(MyDecoderConfiguration.class) 
    public static class MyDecoderTestApplication { 

    } 
} 

Die Tests laufen erfolgreich, die Konvertierung findet statt.

Dann dachte ich, mein DSL ist nicht richtig, so dass ich eine neue Quelle schrieb mit testen:

@Bean 
@InboundChannelAdapter(Source.OUTPUT) 
public MessageSource<MyClass> exampleSource() { 
    return() -> new GenericMessage<>(getMyClassObject()); 
} 

Und der folgende DSL wandelt MyClass in JSON wie erwartet: my-source --spring.cloud.stream.bindings.output.contentType=application/json | some-sink

Warum bin ich die Nachricht über die Dekodierung dreimal geloggt bekommen und warum ist die Nachricht "Nutzlast muss nicht Null sein" fehlgeschlagen? Ist es etwas mit meinem Prozessor?

Antwort

1

Sie sehen 3 Versuche, da dies die Standardkonfiguration für die Wiederholung des Eingangskanals in der Sammelmappe ist.

Es gibt einen Fehler in der Sammelmappe, wenn der Konverter die Nachricht nicht konvertieren kann, versucht es eine Nachricht mit einer null Nutzlast zu erstellen.

Der Grund, warum die Payload nicht konvertiert werden kann, liegt darin, dass sie den eingehenden Content-Typ (vermutlich application/octet-stream) sieht und nicht von diesem in JSON konvertiert werden kann.

Die Arbeit ist um eine Datei zu dem Classpath hinzuzufügen:

META-INF/spring.integration.properties 

und

spring.integration.readOnly.headers=contentType 

, um es hinzuzufügen.

Das verhindert die Weitergabe des Headers des eingehenden Inhaltstyps an die ausgehende Nachricht.

Dies erfordert eine Federintegration von 4.3.2 oder höher.

In einer zukünftigen Version von SCSt wird dies standardmäßig eingestellt.

+0

Wo kann ich die Binderkonfiguration sehen und ändern? Seit der Kafka-Produzent beim Start "Wiederholungen = 0" gemeldet hat? – aturkovic

+0

Eine andere Sache, könntest du bitte darauf hinweisen, was ich in meinem Test dann falsch gemacht habe? Wie wurde UsingOutputConverterIntegrationTests erfolgreich ausgeführt und in JSON konvertiert? – aturkovic

+1

Es hat nichts mit Kafka zu tun, das ist eingebaute Funktionalität nach der Nachrichtenübermittlung, unabhängig von der Binder-Implementierung; siehe [Consumer-Eigenschaften im Spring-Cloud-Stream] (http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR1/reference/htmlsingle/#_consumer_properties); speziell 'maxAttempts'. Der Grund für den Test liegt darin, dass Sie in der Nachricht, die von Ihrem Test gesendet wurde, keinen Header "contentType" festgelegt haben. Wie gesagt, es ist ein Fehler; Der Outbound-Converter verwendet den eingehenden Header "contentType". Wenn Sie diesen Header in Ihrem Test setzen, würde ich erwarten, dass Sie den gleichen Fehler erhalten. –

Verwandte Themen