0

bei dem Versuch, Avro Daten mit Kafka Streams zu streamen, stieß ich auf diesen Fehler:Kafka Streams - Unbekannt Magie Byte auf GenericAvroSerde

Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Auch wenn ich einige ältere Threads darüber in der Mailing-Liste zu finden, Keine der dort genannten Lösungen behebt das Problem. Hoffentlich kann ich hier eine Lösung finden.

Mein Setup sieht wie folgt aus:
StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName
StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, localhost:8081)

Ich habe bereits versucht als VALUE_SERDE die KEY_SERDE auf die gleiche Einstellung, aber auch wenn dies wurde „markiert“ als eine Lösung in der Mailing Liste, es hat in meinem Fall nicht funktioniert.

Ich Erzeugung GenericData.Record mit meinem Schema wie folgt:
val record = new GenericData.Record(schema) ... record.put(field, value)

Wenn ich den Debug-Modus und prüfen Sie den Rekord starten, ist alles gut aussieht, gibt es Daten in dem Datensatz und die Zuordnung korrekt ist.

ich streamen die KStream wie folgt aus (I-Zweig verwendet vorher):
splitTopics.get(0).to(s"${destTopic}_Testing")

I GenericData.Record für die Datensätze mit bin. Könnte das in Verbindung mit der GenericAvroSerde ein Problem sein?

Vielen Dank
Alles Gute und ein gutes neues Jahr!

+0

Was ist über insgesamt Setup? Einzelnes Thema? Können Sie die Daten mit dem Konsolenverbraucher lesen? Vielleicht hilft dieses Beispiel: https://github.com/conflutinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java –

+0

Eingabethema hat Rohtext (wird in Kafka-Streams analysiert) und dann auf 'GenericData.Record'-Datensätze gemappt. Das Ausgabethema enthält Avro-Daten. Ich kann die Nachrichten aus dem Eingabethema mit dem Konsolenverbraucher lesen. Wenn ich es im Debug-Modus starte, kann ich auch die Datensätze sehen, bevor sie gesendet werden, wobei der 'streams.to'-Aufruf Avro-Datensätze ist. Es ist ein einzelnes Eingabethema, aber mehrere Ausgabethemen (im Moment 4). Die Ausgabezweige existieren noch nicht. –

+0

Das magische Byte ist ein Byte, das vom konfluenten Client als eine Art Markierung vor einer serialisierten Nachricht mit Avro hinzugefügt wird.Dieser Fehler bedeutet möglicherweise, dass Sie versuchen, eine Nachricht mit Konfluent-Client zu deserialisieren, aber die Nachricht wurde nicht mit Avro von einem Confluent-Client serialisiert. Mischen Sie konfluente und normale Kafka-Clients? –

Antwort

0

Die Lösung für mein Problem bestand darin, die VALUE_SERDE nach dem Deserialisieren der String-Wert, den ich aus meinem Thema eingeben.

Seit Serde ist ein kombiniertes „Element“ von Serialisieren und Deserialisieren, ich kann einfach nicht StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde] verwenden, sondern ein StringSerde für Deserialisieren der Eingabedatensätze und nur dann ein AvroSerde verwenden, um es zu schreiben, um den Ausgangs Thema verwenden.
sieht wie folgt nun:

// default streams configuration serdes are different from the actual output configurations 
val streamsConfiguration: Properties = { 
    val p = new Properties() 
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, kStreamsConf.getString("APPLICATION_ID")) 
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kStreamsConf.getString("BOOTSTRAP_SERVERS_CONFIG")) 
    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kStreamsConf.getString("AUTO_OFFSET_RESET_CONFIG")) 
    p.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) 
    p.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) 
    p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kStreamsConf.getString("SCHEMA_REGISTRY_URL_CONFIG")) 
    p 
} 

// adjusted output serdes for avro records 
val keySerde: Serde[String] = Serdes.String 
val valSerde: Serde[GenericData.Record] = new GenericAvroSerde() 
valSerde.configure(
    Collections.singletonMap(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, 
    streamsConfiguration.get(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG) 
), 
    /* isKeySerde = */ false 
) 

// Now using the adjusted serdes to write to output like this 
stream.to(keySerde, valSerde, "destTopic") 

Auf diese Weise funktioniert es wie Charme.
Vielen Dank

Verwandte Themen