0

Ich möchte einen Spring-Cloud-Stream-Kafka-Produzent mit Spring Boot einrichten.Spring Cloud Stream Kafka Produzent Nachrichten

Der Produzent arbeitet, und ich kann die Nachrichten aus dem kafka Broker verbrauchen aber die Nachrichten auch einige Header-Informationen wie die folgenden Elemente enthalten:

contentType "text/plain"originalContentType "application/json;charset=UTF-8"{"message":"hello"} 

Mein POJO enthält ein Feld (String message) also bin ich erwartet, dass nur der JSON-String an kafka gesendet wird.

Die Methode test() in meinem RestController löst den Hersteller:

@EnableBinding(ProducerChannels.class) 
@SpringBootApplication 
@RestController 
public class KafkaStreamProducerApplication { 

private MessageChannel consumer; 

public KafkaStreamProducerApplication(ProducerChannels channels) { 
    this.consumer = channels.consumer(); 
} 

@PostMapping("/test/{message}") 
public void test(@PathVariable String message) { 
    Message<MyMessage> msg = MessageBuilder.withPayload(new MyMessage(message)).build(); 
    this.consumer.send(msg); 
} 

interface ProducerChannels { 

    @Output 
    MessageChannel consumer(); 
} 

Mein application.properties

spring.cloud.stream.bindings.consumer.destination=consumer 
spring.cloud.stream.bindings.consumer.content-type=application/json 

Ich würde auch zu schätzen wissen, wenn Sie irgendwelche Dokumente oder Beispiele zu diesem Thema empfehlen. Die Beispiele auf github sind normalerweise sehr dünn, sie verwenden viele Autokonfigurationen und keine Erklärungen. Das Beispiel, das ich verwendet habe, war für RabbitMQ.

Antwort

1

Wenn Sie die Header nicht einbetten möchten (damit Sie Nachrichten in einer Cloud-App empfangen können, die nicht von Spring Cloud stammt), setzen Sie headerMode des Herstellers auf raw.

Siehe Producer Properties.

headerMode

Wenn auf RAW gesetzt, deaktiviert die Kopfzeile auf Ausgabe einzubetten. Wirksam nur für Messaging-Middleware, die Nachrichtenkopfzeilen nicht nativ unterstützt und Header-Einbettung erfordert. Nützlich bei der Erstellung von Daten für Nicht-Spring Cloud Stream-Anwendungen.

Vorgabe: eingebettete Header.

+0

War ziemlich sicher, dass ich das ohne Erfolg versuchte. Vielleicht habe ich den Server nicht neu gestartet, weil ich Devtools vergessen habe :(. Danke für die richtige Antwort und den Link zu den Dokumenten. –

0

Die Header contentType und originalContentType werden von Spring Cloud Stream beim Deserialisieren der Nachricht durch die Consumer-Anwendung verwendet und führen eine Nachrichtenkonvertierung basierend auf dem Inhaltstypsatz durch.

Der Header contentType wird nur dann explizit festgelegt, wenn Sie den Inhaltstyp der Bindung so konfigurieren, wie Sie es hier getan haben spring.cloud.stream.bindings.consumer.content-type=application/json. Wenn der Header contentType festgelegt ist, behält Spring Cloud Stream diese Kopfzeile bei, indem das Flag originalContentType während des Serialisierungs-/Deserialisierungsprozesses zum Erstellen/Verwenden von Nachrichten zum/vom Broker (über den Binder) verwendet wird.

In Ihrem Fall, ich denke, Sie müssen möglicherweise die contentType überhaupt nicht einstellen.

Für die Beispiele abgesehen von den Proben in diesem Frühling-Wolke-Strom-Proben github Repo, können Sie auch out of the box app starters beziehen, die eine breite Palette von Anwendungen abdecken, die gegen alle unterstützten Bindemittel (einschließlich Kafka) laufen könnten.

Verwandte Themen