2017-09-22 2 views
0

Derzeit habe ich einen Anwendungsfall, wo ich Nachrichten von RabbitMQ Message Bus nehmen muss, hängen Sie die Nachrichtengröße (in Bytes) und Nachricht mit HDFS Sink.Spring Cloud Dataflow (Kaninchen | Prozessor | hdfs) Ausgabe binär

Um zu starten, habe ich meinen eigenen Prozessor erstellt, der die Größe an die Nachricht anfügt. Der Grund dafür ist, dass die Kodierung die eines Google Protocol Buffer sein muss.

Meine app sieht wie folgt aus:

stream create --name rabbit-to-hdfs --definition "rabbit | delim-protobuf | hdfs " 

Wenn HDFS Sink gibt die Nachricht I [B @ 12768762 sehen. Ich habe um Google'd und haben Empfehlungen gesehen folgendes hinzuzufügen:

spring.cloud.stream.bindings.input.consumer.headerMode=raw 

Doch dies mich nicht sehen überhaupt zu helfen! Das heißt, wenn ich die App zu einer Datei mit den folgenden wechseln:

[input | processor ] | file --binary=true 

Dann funktioniert alles gut. Ich mag jedoch die Rollover-Funktionen, die der HDFS Sink bietet.

Irgendwelche Ideen?

Antwort

0

Die Datei funktioniert, weil sie nur die empfangenen Bytes ausgibt, aber wenn man sich die HDFS-Senke ansieht, scheint sie ein java.io.Serializable Objekt als Eingabe zu benötigen. Aber in Ihrem Fall senden Sie eine Byte-Array von einem Protobuf-Objekt (ich gehe davon aus, dass das ist, was los ist)

+0

Ja, der Prozessor, den ich implementiert habe, gibt ein Byte-Array des GPB zurück. Sagen Sie, dass es eine java.io.Serializable zurückgeben sollte? –

+0

Ich habe versucht "stream deploy --name rabbit-to-log --properties" app.log.spring.cloud.stream.bindings.input.content-type = application/x-java-object; type = java.io. Serializable "", die nicht funktionierte. Wollen Sie damit sagen, dass der Prozessor ein java.io.Serializable im Gegensatz zu byte [] zurückgeben soll –

+0

Ich habe den Prozessor neu geschrieben, um ein Envelope-Objekt zurückzugeben, das von com.google.protobuf.GeneratedMessageV3 erbt. Diese Klasse wiederum erbt von Serializable. Wenn ich die neue App abmelden/registrieren und mit der Verarbeitung von Daten beginne, erhalte ich die Fehlermeldung "CUAVProtos $ Envelope kann nicht deserialisiert werden, indem ich contentType [application/x-java-object; type = CUAVProtos $ Envelope] CUAVProtos $ Envelope" verwende. –

0

Die Typen sind nicht kompatibel, das ist das Problem. Wenn Sie diesen contentType in SCS setzen, fragen Sie das Framework nur nach der Java-Serialisierung, um writeObject aufzurufen. Aber da Sie den Protobuf verwenden, der bereits ein Serialisierungsframework ist, würde es nicht funktionieren. Das Problem hier ist, dass die Senke wirklich scheint (ich bin nicht mit dem Sink-Code vertraut), um eine Serializable zu erwarten, aber Sie stellen keine zur Verfügung. Was Sie tun könnten, ist die Senken-App zu modifizieren oder einen benutzerdefinierten Konverter zur Verfügung zu stellen, der weiß, wie man von Protobuf zu Serializable konvertiert, weiß nicht einmal, ob das sinnvoll ist, um ehrlich zu sein.

Verwandte Themen