Derzeit habe ich einen Anwendungsfall, wo ich Nachrichten von RabbitMQ Message Bus nehmen muss, hängen Sie die Nachrichtengröße (in Bytes) und Nachricht mit HDFS Sink.Spring Cloud Dataflow (Kaninchen | Prozessor | hdfs) Ausgabe binär
Um zu starten, habe ich meinen eigenen Prozessor erstellt, der die Größe an die Nachricht anfügt. Der Grund dafür ist, dass die Kodierung die eines Google Protocol Buffer sein muss.
Meine app sieht wie folgt aus:
stream create --name rabbit-to-hdfs --definition "rabbit | delim-protobuf | hdfs "
Wenn HDFS Sink gibt die Nachricht I [B @ 12768762 sehen. Ich habe um Google'd und haben Empfehlungen gesehen folgendes hinzuzufügen:
spring.cloud.stream.bindings.input.consumer.headerMode=raw
Doch dies mich nicht sehen überhaupt zu helfen! Das heißt, wenn ich die App zu einer Datei mit den folgenden wechseln:
[input | processor ] | file --binary=true
Dann funktioniert alles gut. Ich mag jedoch die Rollover-Funktionen, die der HDFS Sink bietet.
Irgendwelche Ideen?
Ja, der Prozessor, den ich implementiert habe, gibt ein Byte-Array des GPB zurück. Sagen Sie, dass es eine java.io.Serializable zurückgeben sollte? –
Ich habe versucht "stream deploy --name rabbit-to-log --properties" app.log.spring.cloud.stream.bindings.input.content-type = application/x-java-object; type = java.io. Serializable "", die nicht funktionierte. Wollen Sie damit sagen, dass der Prozessor ein java.io.Serializable im Gegensatz zu byte [] zurückgeben soll –
Ich habe den Prozessor neu geschrieben, um ein Envelope-Objekt zurückzugeben, das von com.google.protobuf.GeneratedMessageV3 erbt. Diese Klasse wiederum erbt von Serializable. Wenn ich die neue App abmelden/registrieren und mit der Verarbeitung von Daten beginne, erhalte ich die Fehlermeldung "CUAVProtos $ Envelope kann nicht deserialisiert werden, indem ich contentType [application/x-java-object; type = CUAVProtos $ Envelope] CUAVProtos $ Envelope" verwende. –