2017-10-24 1 views
0

Ich brauche verschiedene JSON Nutzlast auf dem gleichen Kafka Thema zu senden (zB Foo, Bar, Auto ...) ohne Eltern mit KlasseFeder kafka: verschiedene json Nutzlast auf dem gleichen Thema

Basierend auf Frühjahr kafka Dokumentation, kann ich @KafkaListener auf Klassenebene verwenden und @KafkaHandler auf der Methode Ebene (doc)

@KafkaListener(topics = "myTopic") 
static class MultiListenerBean { 

    @KafkaHandler 
    public void listen(Foo foo) { 
     ... 
    } 

    @KafkaHandler 
    public void listen(Bar bar) { 
     ... 
    } 

    @KafkaHandler 
    public void listen(Car car) { 
     ... 
    } 
} 

angeben, aber ich bekomme diese Ausnahme:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: ConsumerRecord(topic = myTopic, partition = 1, offset = 0, CreateTime = 1508859519287, checksum = 3297149058, serialized key size = -1, serialized value size = 13, key = null, value = {"foo":"foo"}) 
org.springframework.kafka.KafkaException: No method found for class java.util.LinkedHashMap 
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:92) 
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:147) 
    at org.springframework.kafka.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:60) 
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) 
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) 
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:570) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.lang.Thread.run(Thread.java:745) 

Ich habe auch diesen Ansatz versucht, aber es funktioniert nicht:

@KafkaListener(topics = "myTopic") 
    public void receive(ConsumerRecord<String, ?> payload) 
    { 
     if (payload.value() instanceof Foo) 
     { 
     // 
     } 
     else if (payload.value() instanceof Car) 
     { 
     // 
     } 
    } 

Wie ich meinen Produzenten und Konsumenten konfigurieren können verschiedene JSON Payload auf dem gleichen Thema senden Frühjahr mit kafka?

+0

Zeigen Sie bitte, wie Sie aus JSON deserialisieren. –

Antwort

0

Es gibt eine catch-22, wenn JSON und Multi-Method Listeners verwendet werden; wir müssen den Typ kennen, um zur richtigen Methode zu gelangen; Wir können den Typ für die Konvertierung aus der Methodensignatur nicht ableiten, da wir nicht wissen, welche Methode wir noch aufrufen möchten.

Sie benötigen einen Nachrichtenkonverter, der den Typ entweder aus den Daten oder unter Verwendung einer Header (mit Kafka 11) herausfinden kann.

Dann, nachdem der Konverter in den richtigen Typ konvertiert wurde, können wir dann herausfinden, welche Methode aufgerufen werden soll.

Mit Ihrem zweiten Versuch wird ConsumerRecord das rohe JSON enthalten; Sie müssen einen ObjectMapper verwenden, um die Konvertierung durchzuführen, aber Sie werden wiederum einen Hinweis darauf benötigen, in welchen Typ Sie konvertieren möchten (oder mit roher Gewalt, bis zum Erfolg).

Verwandte Themen