2

Ich habe versucht, Kafka-Verbraucher aus dem Code es immer Ausnahme ausführen, aber ich lief kafka-console-consumer.sh-Datei zu überprüfen, Hersteller es funktioniert gut und zeigt alle Nachrichten an, die der Broker erhalten hat. Im Folgenden finden Sie pom.xml Code- und Ausnahmeprotokolle. Bitte sag mir, wo ich falsch liege.Spring Integration kafka: org.apache.kafka.common.config.ConfigException während der Ausführung von Consumer

public Map<String, Object> consumerConfigs() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:2181"); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_coonfig"); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.IntegerDeserializer"); 
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringDeserializer"); 
    return props; 
} 

Hier ist mein Test-Klassencode.

@Test 
public void testSpringKafkaConsumer() throws InterruptedException { 

    try{ 
    String topics[] = { "programTopic3" }; 
    ConsumerFactory<Integer, String> factory = new DefaultKafkaConsumerFactory<>(configs); 
    factory.createConsumer(); 
    AbstractMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(factory, 
      topics); 
    container.setBeanName("container"); 

    final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>(); 
    container.setMessageListener(new MessageListener<Integer, String>() { 

     @Override 
     public void onMessage(ConsumerRecord<Integer, String> message) { 
      // logger.info("received: " + message); 
      System.out.println("received: --------+++++++++++++++------------" + message); 
      records.add(message); 
     } 
    }); 
    KafkaMessageDrivenChannelAdapter<Integer, String> adaptor = new KafkaMessageDrivenChannelAdapter<>(container); 

    adaptor.start(); 
    ConsumerRecord<Integer, String> poll = null; 
    while((poll =records.take()) != null){ 
     System.out.println(poll.topic() + " topic"); 
     System.out.println(poll.key() + " key"); 
     System.out.println(poll.value()+ " value"); 
    } 

    }catch(Exception exception) 
    { 
     exception.printStackTrace(); 
     Assert.fail(); 
    } 
} 

pom.xml

<?xml version="1.0" encoding="UTF-8"?> 

http://maven.apache.org/xsd/maven-4.0.0.xsd "> 4.0.0

<groupId>com.learn.kafka.integrate.spring</groupId> 
<artifactId>SpringIntegrationKafka</artifactId> 
<version>0.0.1-SNAPSHOT</version> 
<packaging>jar</packaging> 

<name>SpringIntegrationKafka</name> 
<description>Demo project for Spring Integration kafka</description> 

<properties> 
    <springVersion>4.2.5.RELEASE</springVersion> 
    <springIntegrationVersion>4.2.5.RELEASE</springIntegrationVersion> 
    <mockitoVersion>1.10.19</mockitoVersion> 
</properties> 
<repositories> 
    <repository> 
     <id>repository.spring.milestone</id> 
     <name>Spring Milestone Repository</name> 
     <url>http://repo.spring.io/milestone</url> 
    </repository> 
</repositories> 
<dependencies> 
    <dependency> 
     <groupId>org.slf4j</groupId> 
     <artifactId>slf4j-api</artifactId> 
     <version>1.7.21</version> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.integration</groupId> 
     <artifactId>spring-integration-core</artifactId> 
     <version>${springIntegrationVersion}</version> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.integration</groupId> 
     <artifactId>spring-integration-kafka</artifactId> 
     <version>2.0.0.M1</version> 
    </dependency> 
    <dependency> 
<groupId>org.apache.kafka</groupId> 
<artifactId>kafka_2.10</artifactId> 
<version>0.9.0.1</version> 
</dependency> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>4.12</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-test</artifactId> 
     <version>${springVersion}</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.integration</groupId> 
     <artifactId>spring-integration-test</artifactId> 
     <version>${springVersion}</version> 
    </dependency> 
</dependencies> 
<build> 
    <plugins> 
     <plugin> 
      <artifactId>maven-compiler-plugin</artifactId> 
      <version>3.3</version> 
      <configuration> 
       <source>1.8</source> 
       <target>1.8</target> 
      </configuration> 
     </plugin> 
    </plugins> 
</build> 

Ausnahmeprotokoll:

org.apache.kafka.common.config.ConfigException: Missing required configuration "value.deserializer" which has no default value. 
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:148) 
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49) 
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56) 
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:336) 
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512) 
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494) 
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:46) 
at com.learn.kafka.integrate.spring.TestConsumer.testSpringKafkaConsumer(TestConsumer.java:83) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
at java.lang.reflect.Method.invoke(Unknown Source) 
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) 
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) 
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) 
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:254) 
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:89) 
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) 
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) 
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:193) 
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) 
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) 
+0

Du nicht die ganze Geschichte zeigt 'ConsumerFactory factory = new DefaultKafkaConsumerFactory <> (configs); '- Es scheint, dass die' configs'-Variable nicht auf die Eigenschaften verweist, die von 'consumerConfigs()' erstellt wurden. –

Antwort

2
org.apache.kafka.common.config.ConfigException: Missing required configuration "value.deserializer" which has no default value. 

Sieht aus wie Ihre new DefaultKafkaConsumerFactory<>(configs); nicht verwendet consumerConfigs().

Von anderer Seite die KafkaMessageDrivenChannelAdapter tut genau dies in seiner Ctor:

this.messageListenerContainer = messageListenerContainer; 
this.messageListenerContainer.setAutoStartup(false); 
this.messageListenerContainer.setMessageListener(this.listener); 

Also, Ihr container.setMessageListener(new MessageListener<Integer, String>() { nicht erreichbar ist. Daher wird nichts in der records erscheinen.

Ich empfehle Spring Integration für diesen speziellen Test zu vermeiden, wenn Sie es noch nicht verstehen.

Für die KafkaMessageDrivenChannelAdapter Variante müssen Sie die outputChannel als QueueChannel angeben, um Nachricht mit der poll Art und Weise abzurufen.

Aber auch Sie müssen mehr BeanFactory Zeug um KafkaMessageDrivenChannelAdapter tun.

Siehe unsere Testfall für weitere Informationen: https://github.com/spring-projects/spring-integration-kafka/blob/master/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

auch die Aufmerksamkeit auf die Beispielanwendung zahlen basiert auf Kafka-0.9, auch: https://github.com/spring-projects/spring-integration-samples/tree/master/basic/kafka

+0

Danke für das wertvolle Feedback. Ich habe den gleichen Code versucht es funktioniert gut, wenn ich gab 9092 Port-Nummer mit Adresse, die kafka Server-Port ist, aber wenn von Kafka Consumer-Konsole laufen, stelle ich zookeeper Port 2181. So bin ich damit verwirrt. – rahul

+0

Ja, das ist richtig: http://stackoverflow.com/questions/34935596/zookeeper-usage-on-kafka-0-9-0 –

Verwandte Themen