Ich versuche einen Spring Cloud Dataflow-Stream zu implementieren, der Datensätze aus einer Datenbank liest, diese an einen Prozessor weiterleitet, der in ein Avro-Schema konvertiert und dann an eine Sink-Anwendung weitergibt.Implementierung von Avro in Spring Cloud Dataflow
Ich habe die Daten fließen von der SQL-DB zu meiner Quelle App und über die Daten über die Kafka-Binder ohne Probleme, indem ich Probleme beim Senden der Daten vom Prozessor auf die Sink-Anwendung Serialisierung/Deserialisierung läuft mit Avro.
Ich habe ein avro-Schema namens ech.avsc erstellt und eine Klasse namens EchRecord dafür erzeugt, die das avro-maven-plugin im Prozessor benutzt.
I haben die folgenden Abhängigkeiten zum POM sowohl Prozessor hinzugefügt und Waschbecken
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
I die Eigenschaften des Prozessors zu
spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990
auf der Senkenseite eingestellt haben die Eigenschaften aussehen spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990
Der Code der Prozessoranwendung sieht wie folgt aus:
@EnableBinding(Processor.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class EchProcessorApplication {
private static Logger logger = LoggerFactory.getLogger(EchProcessorApplication.class);
public static void main(String[] args) {
SpringApplication.run(EchProcessorApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public EchRecord transform(List<Map<String, Object>> record) {
return EchRecord.newBuilder()
.setCallId(11111).build();;
}
Auf der Seite Sink der Code wie es aussieht steht wie folgt:
@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class AvroLoggerApplication {
private static Logger LOGGER = LoggerFactory.getLogger(AvroLoggerApplication.class);
public static void main(String[] args) {
SpringApplication.run(AvroLoggerApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void logHandler(Object data) {
LOGGER.info("data='{}'", data.toString());
LOGGER.info("class='{}'", data.getClass());
}
}
ich ein Feder Schema Registry Server laufen haben und erreichbar durch beide Anwendungen, und ich kann auf das Abfragen der Registrierung sehen, dass die Schema wurde an den Server übermittelt.
Ich kann sehen, ob ich die Debug-Protokollierung auf der Spüle Anwendung zu aktivieren, dass der content gesetzt wird korrekt auf den empfangenen Nachrichten: content = application/vnd.echrecord.v1 + avro
In der Sink-Anwendung Ich habe Setup eine Methode mit der @StreamListener-Annotation zum Abrufen der Nachrichten, die ein Objekt aufnehmen, und zum Ausdrucken der Daten und des Klassentyps, und es scheint, dass ein Byte-Array abgerufen wird.
Wie gehe ich vor, den Code der Sink-Anwendung zu ändern, um die Avro-Nachricht in etwas zu deserialisieren, von dem ich die festgelegten Daten abrufen kann?
Könnten Sie eine kleine Beispielanwendung zur Verfügung stellen (Quelle und Senke), wo wir das Problem reproduzieren können? Es ist nicht notwendig, irgendeine db-Quelle zu verwenden, sondern nur eine grundlegende Quelle, die mit Avro serialisiert wird, und ein Kunde, der es deserialisiert. Auf diese Weise ist es einfacher, Fehler zu beheben. – sobychacko