2017-11-20 2 views
0

Ich versuche einen Spring Cloud Dataflow-Stream zu implementieren, der Datensätze aus einer Datenbank liest, diese an einen Prozessor weiterleitet, der in ein Avro-Schema konvertiert und dann an eine Sink-Anwendung weitergibt.Implementierung von Avro in Spring Cloud Dataflow

Ich habe die Daten fließen von der SQL-DB zu meiner Quelle App und über die Daten über die Kafka-Binder ohne Probleme, indem ich Probleme beim Senden der Daten vom Prozessor auf die Sink-Anwendung Serialisierung/Deserialisierung läuft mit Avro.

Ich habe ein avro-Schema namens ech.avsc erstellt und eine Klasse namens EchRecord dafür erzeugt, die das avro-maven-plugin im Prozessor benutzt.

I haben die folgenden Abhängigkeiten zum POM sowohl Prozessor hinzugefügt und Waschbecken

<dependency> 
    <groupId>org.springframework.cloud</groupId> 
    <artifactId>spring-cloud-stream-schema</artifactId> 
    <version>1.2.2.RELEASE</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.avro</groupId> 
    <artifactId>avro</artifactId> 
    <version>1.8.2</version> 
</dependency> 

I die Eigenschaften des Prozessors zu

spring.cloud.stream.bindings.output.contentType=application/*+avro 
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true 
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990 

auf der Senkenseite eingestellt haben die Eigenschaften aussehen spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990

Der Code der Prozessoranwendung sieht wie folgt aus:

@EnableBinding(Processor.class) 
@SpringBootApplication 
@EnableSchemaRegistryClient 
public class EchProcessorApplication { 

private static Logger logger = LoggerFactory.getLogger(EchProcessorApplication.class); 

public static void main(String[] args) { 
    SpringApplication.run(EchProcessorApplication.class, args); 
} 


@StreamListener(Processor.INPUT) 
@SendTo(Processor.OUTPUT) 
public EchRecord transform(List<Map<String, Object>> record) { 
    return EchRecord.newBuilder() 
      .setCallId(11111).build();; 
} 

Auf der Seite Sink der Code wie es aussieht steht wie folgt:

@EnableBinding(Sink.class) 
@SpringBootApplication 
@EnableSchemaRegistryClient 
public class AvroLoggerApplication { 



    private static Logger LOGGER = LoggerFactory.getLogger(AvroLoggerApplication.class); 

    public static void main(String[] args) { 
     SpringApplication.run(AvroLoggerApplication.class, args); 
    } 


    @StreamListener(Sink.INPUT) 
    public void logHandler(Object data) { 

     LOGGER.info("data='{}'", data.toString()); 
     LOGGER.info("class='{}'", data.getClass()); 


    } 
} 

ich ein Feder Schema Registry Server laufen haben und erreichbar durch beide Anwendungen, und ich kann auf das Abfragen der Registrierung sehen, dass die Schema wurde an den Server übermittelt.

Ich kann sehen, ob ich die Debug-Protokollierung auf der Spüle Anwendung zu aktivieren, dass der content gesetzt wird korrekt auf den empfangenen Nachrichten: content = application/vnd.echrecord.v1 + avro

In der Sink-Anwendung Ich habe Setup eine Methode mit der @StreamListener-Annotation zum Abrufen der Nachrichten, die ein Objekt aufnehmen, und zum Ausdrucken der Daten und des Klassentyps, und es scheint, dass ein Byte-Array abgerufen wird.

Wie gehe ich vor, den Code der Sink-Anwendung zu ändern, um die Avro-Nachricht in etwas zu deserialisieren, von dem ich die festgelegten Daten abrufen kann?

+1

Könnten Sie eine kleine Beispielanwendung zur Verfügung stellen (Quelle und Senke), wo wir das Problem reproduzieren können? Es ist nicht notwendig, irgendeine db-Quelle zu verwenden, sondern nur eine grundlegende Quelle, die mit Avro serialisiert wird, und ein Kunde, der es deserialisiert. Auf diese Weise ist es einfacher, Fehler zu beheben. – sobychacko

Antwort

1

Ein paar Dinge hier zu versuchen. Da Ihr Typ bereits ein Avro-Typ ist (SpecificRecord oder GenericRecord), benötigen Sie auf Herstellerseite nicht das Flag dynamicSchemaGeneration, das für reflektionsbasierte Writer gedacht ist, meist zum Testen, da es sich auf die Leistung auswirkt.

Da Ihre Spüle den richtigen Typ sehen kann, wie Sie es gepostet haben, ist es wichtig, dass Sie Ihren Typ auf der Spüle haben. Fügen Sie zum Beispiel den Typ für die Senke hinzu und kommentieren Sie die Methode mit dem richtigen Typ: EchRecord, die Ihnen den richtigen Typ geben wird.

Sie können einstellen, auch GenericRecord, um in der Lage zu sein, darauf zuzugreifen wie ein Objekt Container mit record.get(<propertyname>)

Verwandte Themen