2017-03-31 5 views
1

Hallo, ich bin derzeit in Frühling Kafka plant und erfolgreich eine einzige KafkaListenerContainerFactory zu meinem Hörer hinzugefügt. Jetzt möchte ich mehrere KafkaListenerContainerFactorys hinzufügen (Eins für ein Thema, das Nachrichten in JSON haben wird, ein anderes für Strings). Siehe Code unten:Probleme beim Hinzufügen mehrerer KafkaListenerContainerFactories

@EnableKafka 
@Configuration 
public class KafkaConsumersConfig { 

    private final KafkaConfiguration kafkaConfiguration; 

    @Autowired 
    public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) { 
     this.kafkaConfiguration = kafkaConfiguration; 
    } 

    @Bean 
    public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory(){ 
     ConcurrentKafkaListenerContainerFactory<String,Record> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(jsonConsumerFactory()); 
     factory.setConcurrency(3); 
     factory.setAutoStartup(true); 
     return factory; 
    } 

    @Bean 
    public ConsumerFactory<String,Record> jsonConsumerFactory(){ 
     JsonDeserializer<Record> jsonDeserializer = new JsonDeserializer<>(Record.class); 
     return new DefaultKafkaConsumerFactory<>(jsonConsumerConfigs(),new StringDeserializer(), jsonDeserializer); 
    } 

    @Bean 
    public Map<String,Object> jsonConsumerConfigs(){ 
     Map<String,Object> propsMap = new HashMap<>(); 
     propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerAddress()); 
     propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getJsonGroupId()); 
     propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit()); 
     propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval()); 
     propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout()); 
     return propsMap; 
    } 
    @Bean 
    public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory(){ 
     ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(fileConsumerFactory()); 
     factory.setConcurrency(3); 
     factory.setAutoStartup(true); 
     return factory; 
    } 

    @Bean 
    public ConsumerFactory<String,String> fileConsumerFactory(){ 
     return new DefaultKafkaConsumerFactory<>(fileConsumerConfigs()); 
    } 

    @Bean 
    public Map<String,Object> fileConsumerConfigs(){ 
     Map<String,Object> propsMap = new HashMap<>(); 
     propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerAddress()); 
     propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getFileGroupId()); 
     propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit()); 
     propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval()); 
     propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout()); 
     return propsMap; 
    } 
} 

das Lauf gibt mir die folgende Fehlermeldung:

Description: 

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found. 
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans 'jsonConsumerFactory', 'fileConsumerFactory' 


Action: 

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration. 

Was mache ich falsch?

Antwort

4

Sieht so aus, als würden Sie sich nicht auf die Spring Boot Kafka Auto Configuration verlassen.

Frühlings-Boot bietet im KafkaAutoConfiguration:

@Bean 
@ConditionalOnMissingBean(ConsumerFactory.class) 
public ConsumerFactory<?, ?> kafkaConsumerFactory() { 

Da Sie jsonConsumerFactory und fileConsumerFactory, sie, dass man durch die Auto-config bereitgestellt außer Kraft setzen.

Aber auf der anderen Seite, in den KafkaAnnotationDrivenConfiguration kann nicht von Ihren Fabriken angewandt werden:

@Bean 
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") 
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
     ConcurrentKafkaListenerContainerFactoryConfigurer configurer, 
     ConsumerFactory<Object, Object> kafkaConsumerFactory) { 

Weil Ihre ConsumerFactory Bohnen nicht von ConsumerFactory<Object, Object> Typ sind.

So:

  • Nur ausschließen KafkaAutoConfiguration aus der Frühjahr-Boot-Auto-Konfiguration durch die folgende auf die Anwendungseigenschaften Hinzufügen von Datei: spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
  • oder einer Ihrer KafkaListenerContainerFactory Bohnen umbenennen, um die kafkaListenerContainerFactory es außer Kraft zu setzen in der Boot
  • oder eine der ConsumerFactory Bohnen als ConsumerFactory<Object, Object> Typ.
Verwandte Themen