Ich möchte einen Spring-Cloud-Stream-Kafka-Produzent mit Spring Boot einrichten.Spring Cloud Stream Kafka Produzent Nachrichten
Der Produzent arbeitet, und ich kann die Nachrichten aus dem kafka Broker verbrauchen aber die Nachrichten auch einige Header-Informationen wie die folgenden Elemente enthalten:
contentType "text/plain"originalContentType "application/json;charset=UTF-8"{"message":"hello"}
Mein POJO enthält ein Feld (String message) also bin ich erwartet, dass nur der JSON-String an kafka gesendet wird.
Die Methode test() in meinem RestController löst den Hersteller:
@EnableBinding(ProducerChannels.class)
@SpringBootApplication
@RestController
public class KafkaStreamProducerApplication {
private MessageChannel consumer;
public KafkaStreamProducerApplication(ProducerChannels channels) {
this.consumer = channels.consumer();
}
@PostMapping("/test/{message}")
public void test(@PathVariable String message) {
Message<MyMessage> msg = MessageBuilder.withPayload(new MyMessage(message)).build();
this.consumer.send(msg);
}
interface ProducerChannels {
@Output
MessageChannel consumer();
}
Mein application.properties
spring.cloud.stream.bindings.consumer.destination=consumer
spring.cloud.stream.bindings.consumer.content-type=application/json
Ich würde auch zu schätzen wissen, wenn Sie irgendwelche Dokumente oder Beispiele zu diesem Thema empfehlen. Die Beispiele auf github sind normalerweise sehr dünn, sie verwenden viele Autokonfigurationen und keine Erklärungen. Das Beispiel, das ich verwendet habe, war für RabbitMQ.
War ziemlich sicher, dass ich das ohne Erfolg versuchte. Vielleicht habe ich den Server nicht neu gestartet, weil ich Devtools vergessen habe :(. Danke für die richtige Antwort und den Link zu den Dokumenten. –