2017-06-28 1 views
0

Ich habe ein Problem, dass Quelle sendet eine GenericMessage [Nutzlast = xxxxx, ...], während Senke die Nachricht als 10,120,120,120,120,120 empfängt.Spring Cloud Stream mit Avro kann String-Nachricht nicht korrekt konvertieren

Dieses Problem trat auf, nachdem ich Avro-Nachrichtenkonverter eingerichtet hatte. Wenn ich Avro-Nachrichtenkonverter entferne und StreamListener für die Nachrichtenkonvertierung verwende, funktioniert das problemlos.

Quelle application.properties

spring.cloud.stream.bindings.toGreeting.destination=greeting 
spring.cloud.stream.bindings.toGreeting.contentType=application/*+avro 
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true 

Sink Anwendung

server.port=8990 
spring.cloud.stream.bindings.greeting.destination=greeting 

Nachricht Converter

@Configuration 
@EnableSchemaRegistryClient 
public class MessageConverterConfig { 
    @Bean 
    public MessageConverter topic1MessageConverter() throws IOException { 
     return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); 
    } 
} 

Anwendungsklasse

@SpringBootApplication 
@EnableSchemaRegistryClient 
public class SourceApplication { 
    public static void main(String[] args) { 
     SpringApplication.run(SourceApplication.class, args); 
    } 
} 

@EnableSchemaRegistryServer 
@EnableSchemaRegistryClient 
@SpringBootApplication 
public class SinkApplication { 
    public static void main(String[] args) { 
     SpringApplication.run(SinkApplication.class, args); 
    } 
} 

Fehle ich Konfigurationen? Danke.

Antwort

2

Hier ist eine einfache Regel:

Wenn Sie nur einen Nachrichtenwandler haben wollen, die aus Avro können serialisiert/deserialisiert, und Sie entweder die Schemaposition während der Konfiguration für GenericRecords bereitstellen oder Ihre StreamListener Methode hat eine Signatur eines Art von SpecificRecord. Wählen Sie dann AvroSchemaMessageConverter, richten Sie es wie Sie, aber verwenden Sie stattdessen avro/bytes. Wir reservieren die application/*+avro für Schema-Evolution-Unterstützung.

Also, wenn Sie @EnableSchemaRegistryClient setzen, dann delegieren Sie in eine externe Registrierung, um Ihre Schemas zu haben. In diesem Fall benötigen Sie nicht nur die Registrierung, sondern auch die dort registrierten Schemas.

Standardmäßig werden Hersteller automatisch alle Nutzdaten vom Typ SpecificRecord/GenericRecord oder Pojos registrieren, wenn spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled aktiviert ist.

In diesem Fall wird der Produzent tatsächlich die Überschrift auf etwas wie application/vnd.user.v1+avro setzen. Angenommen, Ihr Thema ist Benutzer und es ist die erste Version.

Downstream, wenn Ihre Kunden auch mit application/*+avro contentType konfiguriert sind, können sie diesen contentType lesen und den Betreff/die Version für die Abfrage des Schema-Servers und das Abrufen des entsprechenden Schemas ableiten.

1

Wenn Sie spring.cloud.stream.bindings.toGreeting.contentType=application/*+avro Einstellung werden dann die AvroSchemaRegistryClientMessageConverter (die von SCST konfiguriert wird) verwendet werden muss, und Sie müssen nicht setzen Sie den explicit Konverter topic1MessageConverter für die Mimetype avro/bytes. Wenn Sie diesen Konverter verwenden möchten, müssten Sie spring.cloud.stream.bindings.toGreeting.contentType=avro/bytes setzen.

Verwandte Themen