Ich möchte einen einzigen Produzenten für das Schreiben von JSON-Objekten zu mehreren Themen verwenden.Spring Kafka single producer für mehrere Themen
Der folgende Code tut, was ich will, aber es fühlt sich falsch an, die setDefaultTopic()
Methode zu verwenden, um dem KafkaTemplate
mitzuteilen, zu welchem Thema es die Nachricht senden sollte.
Wenn ich die send(String topic, ? payload)
Methode als die StringJsonMessageConverter
wird nicht funktionieren.
Mein Produzent:
public class MyProducer {
@Autowired
private KafkaTemplate<String, ?> kafka;
public void send(String topic, Message<?> message) {
kafka.setDefaultTopic(topic);
kafka.send(message);
}
}
Und meine Konfiguration:
@Configuration
public class MyProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
...
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setMessageConverter(new StringJsonMessageConverter());
return kafkaTemplate;
}
}
Vorschläge, wie diese richtig zu machen?
UPDATE
änderte ich den Code zu diesem ...
Hersteller:
public void send(Message<?> message) {
kafka.send(message);
}
In meinem Controller (wo ich die Nachricht erstellen Objekte);
MessageHeaders headers = new MessageHeaders(Collections.singletonMap(KafkaHeaders.TOPIC, "topicName"));
GenericMessage<NewsRequest> genericMessage = new GenericMessage<>(payload, headers);
producer.send(genericMessage);
Das MessageHeaders-Objekt enthält weiterhin die ID und den Zeitstempel.
Es funktioniert. Vielen Dank! Ich wusste nichts über die 'KafkaHeaders'-Klasse und die 'MessageHeaders'-Klasse bietet kein statisches Feld für das Thema. –