2017-02-23 3 views
0

Ich versuche, Spring Boot-Anwendung mit Kafka-Client nach this reference guide zu starten, und ich bekomme den Fehler unten.
Könnten Sie bitte beraten, wie zu beheben?KafkaException: Konnte Klasse JsonDeserializer nicht instanziieren

@Bean 
public Map<String, Object> consumerConfig() { 
    final HashMap<String, Object> result = new HashMap<>(); 
    result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap.servers")); 
    result.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); 
    result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); 
    return result; 
} 

@Bean 
public ConsumerFactory<Long, String> consumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(consumerConfig()); 
} 

@Bean 
ConcurrentKafkaListenerContainerFactory<Long, String> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<Long, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>(); 
    containerFactory.setConsumerFactory(consumerFactory()); 
    containerFactory.setConcurrency(3); 
    containerFactory.getContainerProperties().setPollTimeout(3000); 
    return containerFactory; 
} 

-

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer 
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:176) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:51) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:545) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:737) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE] 
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:370) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE] 
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE] 
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1162) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE] 
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1151) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE] 
    at com.ubs.wma.bmss.BmssConsumerApp.main(BmssConsumerApp.java:12) [classes/:na] 
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:703) ~[kafka-clients-0.10.1.1.jar:na] 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:553) ~[kafka-clients-0.10.1.1.jar:na] 
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:73) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:69) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:305) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:230) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:180) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:124) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:287) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:236) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    ... 12 common frames omitted 
Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.springframework.kafka.support.serializer.JsonDeserializer 
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:316) ~[kafka-clients-0.10.1.1.jar:na] 
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:203) ~[kafka-clients-0.10.1.1.jar:na] 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632) ~[kafka-clients-0.10.1.1.jar:na] 
    ... 24 common frames omitted 
Caused by: java.lang.IllegalAccessException: Class org.apache.kafka.common.utils.Utils can not access a member of class org.springframework.kafka.support.serializer.JsonDeserializer with modifiers "protected" 
    at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) ~[na:1.8.0_121] 
    at java.lang.Class.newInstance(Class.java:436) ~[na:1.8.0_121] 
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:314) ~[kafka-clients-0.10.1.1.jar:na] 
    ... 26 common frames omitted 
+0

Dump nicht einfach einen Stacktrace ohne Kontext, Konfiguration und Code und erwarte Hilfe ... –

Antwort

1

Nach dieser Dokumentation haben wir:

für komplexere oder besonderen Fällen die KafkaConsumer und damit KafkaProducer stellt ladenen Konstruktoren (De)Serializer Instanzen akzeptieren für Schlüssel bzw. Werte.

diese API gerecht zu werden, die DefaultKafkaProducerFactory und DefaultKafkaConsumerFactory bieten auch Eigenschaften zu ermöglichen, eine Gewohnheit zu injizieren (De)SerializerProducer/Consumer Ziel.

Und weiter Apache Kafka JavaDocs:

/** 
* A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}. 
* Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. 
* Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept 
* either the string "42" or the integer 42). 
* @param configs The producer configs 
* @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be 
*      called in the producer when the serializer is passed in directly. 
* @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't 
*       be called in the producer when the serializer is passed in directly. 
*/ 
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) { 

Also, was Sie brauchen, ist wie folgt:

@Bean 
public ConsumerFactory<Long, String> consumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(consumerConfig(), null, new JsonDeserializer(Foo.class)); 
} 

Das Problem, dass JsonDeserializer kann nicht durch die Reflexion instanziiert werden, da es braucht bestimmter Typ zum Deserialisieren.

Verwandte Themen