2

Ich benutze this als Beispiel für das Lesen von Datei mit Feder Integration, es funktioniert gut, aber wenn ich versuche, Datei an Kafka-Produzent zu senden, funktioniert es nicht. Ich habe versucht, dieses Problem im Internet nachzuschlagen, konnte aber keine Hilfe finden. Hier ist mein Code:Fehler beim Starten von Bean kafkaListenerContainer: java.lang.IllegalArgumentException

Datei: MessageProcessingIntegrationFlow.java:

@Bean 
public IntegrationFlow writeToFile() { 
return IntegrationFlows.from(ApplicationConfiguration.INBOUND_CHANNEL) 
     .transform(m -> new StringBuilder((String)m).toString().toUpperCase()) 
//    .handle(fileWritingMessageHandler) 
    .handle(loggingHandler()) 
    .handle(kafkaProducerMessageHandler()) 
    .get(); 
} 



//producing channel 
@Bean(name="kafkaChannel") 
public DirectChannel kafkaChannel() { 
    return new DirectChannel(); 
} 

@Bean 
public DirectChannel consumingChannel() { 
    return new DirectChannel(); 
} 


    @Bean 
@ServiceActivator(inputChannel = "kafkaChannel") 
public MessageHandler kafkaProducerMessageHandler() { 
    KafkaProducerMessageHandler<String, String> handler = 
      new KafkaProducerMessageHandler<>(kafkaTemplate()); 
    handler.setTopicExpression(new LiteralExpression(kafkaTopic)); 
    handler.setMessageKeyExpression(new LiteralExpression("kafka-integration")); 
    return handler; 
} 

@Bean 
public KafkaTemplate<String, String> kafkaTemplate() { 
    return new KafkaTemplate<>(producerFactory()); 
} 

@Bean 
public ProducerFactory<String, String> producerFactory() { 
    return new DefaultKafkaProducerFactory<>(producerConfigs()); 
} 

@Bean 
public Map<String, Object> producerConfigs() { 
    Map<String, Object> properties = new HashMap<>(); 
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    // introduce a delay on the send to allow more messages to accumulate 
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); 

    return properties; 
} 

//consumer configuration.... 
@Bean 
public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() { 
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = 
     new KafkaMessageDrivenChannelAdapter<>(kafkaListenerContainer()); 
    kafkaMessageDrivenChannelAdapter.setOutputChannel(consumingChannel()); 
    return kafkaMessageDrivenChannelAdapter; 
} 

@SuppressWarnings("unchecked") 
@Bean 
public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer() { 
    ContainerProperties containerProps = new ContainerProperties(kafkaTopic); //set topic name 
    return (ConcurrentMessageListenerContainer<String, String>) new ConcurrentMessageListenerContainer<>(
     consumerFactory(), containerProps); 
} 

@Bean 
public ConsumerFactory<?, ?> consumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
} 

@Bean 
public Map<String, Object> consumerConfigs() { 
    Map<String, Object> properties = new HashMap<>(); 
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld"); 
    // automatically reset the offset to the earliest offset 
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

    return properties; 
} 

Hier ist Stacktrace:

org.springframework.context.ApplicationContextException: Failed to start bean 'kafkaListenerContainer'; nested exception is java.lang.IllegalArgumentException: A org.springframework.kafka.listener.KafkaDataListener implementation must be provided 
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:50) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:348) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:151) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:114) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:880) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE] 
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE] 
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE] 
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE] 
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE] 
at com.porterhead.Application.main(Application.java:25) [classes/:na] 
Caused by: java.lang.IllegalArgumentException: A org.springframework.kafka.listener.KafkaDataListener implementation must be provided 
at org.springframework.util.Assert.isTrue(Assert.java:92) ~[spring-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:199) ~[spring-kafka-1.2.2.RELEASE.jar:na] 
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:175) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE] 
... 12 common frames omitted 

Ich weiß nicht, was ich falsch mache. Bitte lassen Sie mich wissen, wenn Sie weitere Details zu diesem benötigen. Danke.

Antwort

2

Die KafkaProducerMessageHandler ist Einwegkomponente, sie produziert keine Antwort. Es veröffentlicht nur zum Kafka-Thema und tut nichts mehr. Daher können Sie nicht weiter fließen, wie Sie es mit Ihrer handle(loggingHandler()) tun. Der KafkaProducerMessageHandler muss der letzte Endpunkt im Fluss sein. Anders als bei der FileWritingMessageHandler ist das eine AbstractReplyProducingMessageHandler und weiter der Fluss.

Dennoch überlegen Sie in der Zukunft, das Problem richtig zu beschreiben: was erwartet wird und was falsch ist. Die Antwort war meine beste Schätzung, weil ich den Code all dieser Komponenten kenne.

+0

Ich wollte die Datei lesen und an Kafka Producer schicken, gibt es eine andere Möglichkeit, Daten an Kafka zu senden? – mansoor67890

+1

Wir haben es, aber was ist das Problem. Wir können hier nicht spekulieren, wenn es kein besonderes Problem gibt. Und außerdem: Hier ist niemand, der Code für dich schreiben kann. Das liegt nicht in unserer Verantwortung. –

+0

Ich habe die Frage bearbeitet. – mansoor67890

Verwandte Themen