2017-09-06 1 views
2

Ich möchte einen einzigen Produzenten für das Schreiben von JSON-Objekten zu mehreren Themen verwenden.Spring Kafka single producer für mehrere Themen

Der folgende Code tut, was ich will, aber es fühlt sich falsch an, die setDefaultTopic() Methode zu verwenden, um dem KafkaTemplate mitzuteilen, zu welchem ​​Thema es die Nachricht senden sollte.

Wenn ich die send(String topic, ? payload) Methode als die StringJsonMessageConverter wird nicht funktionieren.

Mein Produzent:

public class MyProducer { 

    @Autowired 
    private KafkaTemplate<String, ?> kafka; 

    public void send(String topic, Message<?> message) { 
     kafka.setDefaultTopic(topic); 
     kafka.send(message); 
    } 
} 

Und meine Konfiguration:

@Configuration 
public class MyProducerConfig { 

    @Bean 
    public ProducerFactory<String, String> producerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     ... 

     return new DefaultKafkaProducerFactory<>(props); 
    } 

    @Bean 
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) { 
     KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory); 
     kafkaTemplate.setMessageConverter(new StringJsonMessageConverter()); 

     return kafkaTemplate; 
    } 
} 

Vorschläge, wie diese richtig zu machen?

UPDATE

änderte ich den Code zu diesem ...

Hersteller:

public void send(Message<?> message) { 
    kafka.send(message); 
} 

In meinem Controller (wo ich die Nachricht erstellen Objekte);

MessageHeaders headers = new MessageHeaders(Collections.singletonMap(KafkaHeaders.TOPIC, "topicName")); 
GenericMessage<NewsRequest> genericMessage = new GenericMessage<>(payload, headers); 
producer.send(genericMessage); 

Das MessageHeaders-Objekt enthält weiterhin die ID und den Zeitstempel.

Antwort

2

Bei Verwendung der send(Message<?>) Variante des Nachrichtenwandler erwartet das Thema in einem Nachrichtenkopf zu sein ...

String topic = headers.get(KafkaHeaders.TOPIC, String.class); 

Wenn Sie können das Thema auf andere Weise bestimmen, Sie müssen einen benutzerdefinierten Konverter erstellen.

Das Ändern des Standardthemas ist nicht Thread-sicher.

+0

Es funktioniert. Vielen Dank! Ich wusste nichts über die 'KafkaHeaders'-Klasse und die 'MessageHeaders'-Klasse bietet kein statisches Feld für das Thema. –

2

Sie haben StringJsonMessageConverter manuell zu verwenden, bevor Vorlage Aufruf:

ProducerRecord<?, ?> producerRecord = kafka.getMessageConverter().fromMessage(message, topic); 
kafka.send(producerRecord); 
+0

Dies wird von der Vorlage intern durchgeführt; Sieh meine Antwort. –

Verwandte Themen