2016-08-20 3 views
2

Ich habe mehrere Produzenten, die mehrere Arten von Ereignissen an ein Kafka-Thema senden können.Spring kafka Mehrere Nachrichtentypen in einem Consumer konsumieren

Und ich habe einen Verbraucher, der alle Art von Nachricht verbrauchen muss. mit unterschiedlicher Logik für jede Art von Nachricht.

@KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory") 
public void handleEvent(Message<EventOne> event) { 
    logger.info("event={}", event); 
} 

Aber in diesem Fall alle Nachrichten an dieser Methode kommen EventOne nicht nur

Wenn ich zwei Verfahren implementieren (für jede Art von Nachricht) dann alle Nachrichten kommen, um nur eine Methode.

Wenn ich Zuhörer wie dies umzusetzen:

@KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory") 
public void handleEvent(Message<?> event) { 
    logger.info("event={}", event); 
} 

dann bekomme ich Ausnahme: org.springframework.kafka.KafkaListenerEndpointContainer # 0-0-kafka-Hörer-1] ERROR org.springframework.kafka.listener .LoggingErrorHandler - Fehler beim Verarbeiten: ConsumerRecord java.lang.IllegalArgumentException: Unbekannter Typ: [null]

Bitte sagen Sie mir, wie ich mehrere Typ Verbraucher implementieren kann?

Antwort

2

Ich fand die Auflösung und es ist sehr einfach. So ist es nicht von der Frage klar, aber ich benutze jsonMessageConverter wie folgt aus:

@Bean 
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.setMessageConverter(new StringJsonMessageConverter()); 
    return factory; 
} 

Die Jackson-Bibliothek mit Standard Sie fügt keine Klasseninformationen für JSON daher kann es nicht erraten, welche Art von mir will deserialisieren. Lösung ist die Anmerkung

@JsonTypeInfo(use= JsonTypeInfo.Id.CLASS, include= JsonTypeInfo.As.PROPERTY, property="class") 

die classinformation zu json Zeichenfolge hinzufügt. Auf der Verbraucherseite Ich schreibe nur die Basisklasse von meiner Veranstaltung

@KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory") 
public void handleEvent(BasicEvent event) { 

    if (event instanceof EventOne) { 
     logger.info("type={EventOne}, event={}", event); 
    } else { 
     logger.info("type={EventTwo}, event={}", event); 
    } 
} 

Hope this info jemand hilft.

Verwandte Themen