2017-12-08

org.apache.kafka.common.KafkaException: io.confluent.kafka.serializers.KafkaAvroSerializer

I am trying to construct a Kafka consumer with the following code:

import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
// set up consumer 
final Properties consumerProps = new Properties(); 
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); 
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-tutorial"); 
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
     io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
     io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
// transactional API 
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); 
// consumer --from-beginning 
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); 
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
consumerProps.put("zookeeper.connect", CLUSTER.zookeeperConnect()); 
consumerProps.put("schema.registry.url", CLUSTER.schemaRegistryUrl()); 
final KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<GenericRecord, GenericRecord>(consumerProps); 
consumer.subscribe(Collections.singletonList(inputTopic)); 

Constructing the consumer fails with the following error:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:765) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:633) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615) 
    at com.telefonica.app.test_consumer.KafkaETLConsumerTest.testRunConsumer(KafkaETLConsumerTest.java:192) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at junit.framework.TestCase.runTest(TestCase.java:176) 
    at junit.framework.TestCase.runBare(TestCase.java:141) 
    at junit.framework.TestResult$1.protect(TestResult.java:122) 
    at junit.framework.TestResult.runProtected(TestResult.java:142) 
    at junit.framework.TestResult.run(TestResult.java:125) 
    at junit.framework.TestCase.run(TestCase.java:129) 
    at junit.framework.TestSuite.runTest(TestSuite.java:252) 
    at junit.framework.TestSuite.run(TestSuite.java:247) 
    at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:86) 
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) 
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:539) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:761) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:461) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:207) 
Caused by: org.apache.kafka.common.KafkaException: io.confluent.kafka.serializers.KafkaAvroSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer 
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:673) 
    ... 22 more 

I am using Confluent version 3.0.0.

Answer

The problem is here:

consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
     io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
     io.confluent.kafka.serializers.KafkaAvroSerializer.class); 

Note that you are passing a Serializer class to a Deserializer config. That is exactly what the exception says:

io.confluent.kafka.serializers.KafkaAvroSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer 

A Serializer is not a Deserializer.
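To see why the consumer rejects the class at construction time, here is a minimal sketch of that kind of check. It is not Kafka's actual code: the `Serializer` and `Deserializer` interfaces, the `DemoSerializer` and `DemoDeserializer` classes, and the `configuredInstance` helper are all hypothetical stand-ins that only assume the configured class is instantiated reflectively and then tested against the expected interface.

```java
import java.nio.charset.StandardCharsets;

public class SerdeCheckSketch {
    // Hypothetical stand-ins for Kafka's Serializer and Deserializer interfaces.
    interface Serializer<T> { byte[] serialize(String topic, T data); }
    interface Deserializer<T> { T deserialize(String topic, byte[] data); }

    // Stand-in for a serializer-only class such as KafkaAvroSerializer.
    static class DemoSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Stand-in for a deserializer-only class such as KafkaAvroDeserializer.
    static class DemoDeserializer implements Deserializer<String> {
        @Override
        public String deserialize(String topic, byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    // Instantiate the configured class, then verify that the instance
    // implements the interface the config key expects.
    static <T> T configuredInstance(Class<?> configured, Class<T> expected) {
        Object instance;
        try {
            instance = configured.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + configured.getName(), e);
        }
        if (!expected.isInstance(instance)) {
            throw new IllegalArgumentException(
                configured.getName() + " is not an instance of " + expected.getName());
        }
        return expected.cast(instance);
    }

    public static void main(String[] args) {
        // A deserializer passed where a deserializer is expected is accepted.
        Deserializer<?> ok = configuredInstance(DemoDeserializer.class, Deserializer.class);
        System.out.println("accepted: " + ok.getClass().getSimpleName());

        // A serializer passed where a deserializer is expected is rejected.
        try {
            configuredInstance(DemoSerializer.class, Deserializer.class);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The two interfaces are unrelated types, so a class that implements only one of them can never satisfy a config key that expects the other, no matter how similar the class names look.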

Try this instead:

consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
     io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
     io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
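The same settings can also be written with plain string keys and class names, which the Kafka client accepts in place of the `ConsumerConfig` constants and `Class` objects. The sketch below builds the properties that way so it runs with only the JDK. The class name `AvroConsumerPropsSketch`, the helper `requireDeserializerNames`, and the `localhost` addresses are assumptions for illustration, and the name check is only a naming-convention heuristic for tests, not a substitute for Kafka's own type check.

```java
import java.util.Properties;

public class AvroConsumerPropsSketch {
    static final String AVRO_DESERIALIZER =
        "io.confluent.kafka.serializers.KafkaAvroDeserializer";

    // Build consumer properties using the string form of the config keys.
    // bootstrapServers and schemaRegistryUrl are supplied by the caller.
    static Properties consumerProps(String bootstrapServers, String schemaRegistryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "consumer-tutorial");
        props.put("key.deserializer", AVRO_DESERIALIZER);
        props.put("value.deserializer", AVRO_DESERIALIZER);
        props.put("schema.registry.url", schemaRegistryUrl);
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        return props;
    }

    // Hypothetical guard: fail early if a deserializer key names a class
    // that does not follow the "...Deserializer" naming convention.
    static void requireDeserializerNames(Properties props) {
        for (String key : new String[] {"key.deserializer", "value.deserializer"}) {
            String className = props.getProperty(key);
            if (className == null || !className.endsWith("Deserializer")) {
                throw new IllegalArgumentException(
                    key + " should name a Deserializer class but was: " + className);
            }
        }
    }

    public static void main(String[] args) {
        // Placeholder addresses; replace with the values for your cluster.
        Properties props = consumerProps("localhost:9092", "http://localhost:8081");
        requireDeserializerNames(props);
        System.out.println(props.getProperty("value.deserializer"));
    }
}
```

Calling a guard like this in a unit test catches the Serializer/Deserializer mix-up before a consumer is ever constructed.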