Ich lese von einem Kafka-Thema, das Avro-Nachrichten enthält, die unter Verwendung der KafkaAvroEncoder
(die automatisch die Schemas mit den Themen registriert). Ich benutze das maven-avro-plugin, um einfache Java-Klassen zu erzeugen, die ich gerne beim Lesen benutzen würde.Deserialize Avro-Nachrichten in bestimmte Daten mit KafkaAvroDecoder
Die KafkaAvroDecoder
unterstützt nur die Deserialisierung in GenericData.Record
Typen, die (meiner Meinung nach) den ganzen Punkt einer statisch getippten Sprache verfehlt. Mein Deserialisierung Code sieht derzeit wie folgt aus:
SpecificDatumReader<event> reader = new SpecificDatumReader<>(
event.getClassSchema() // event is my class generated from the schema
);
byte[] in = ...; // my input bytes;
ByteBuffer stuff = ByteBuffer.wrap(in);
// the KafkaAvroEncoder puts a magic byte and the ID of the schema (as stored
// in the schema-registry) before the serialized message
if (stuff.get() != 0x0) {
return;
}
int id = stuff.getInt();
// lets just ignore those special bytes
int length = stuff.limit() - 4 - 1;
int start = stuff.position() + stuff.arrayOffset();
Decoder decoder = DecoderFactory.get().binaryDecoder(
stuff.array(), start, length, null
);
try {
event ev = reader.read(null, decoder);
} catch (IOException e) {
e.printStackTrace();
}
ich meine Lösung umständlich gefunden, also würde ich gerne wissen, ob es eine einfachere Lösung ist, dies zu tun.
Haben Sie https://github.com/confluentinc/examples gefunden, insbesondere Beispiele wie https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io /confluent/examples/streams/SpecificAvroIntegrationTest.java? –
@miguno oh, nein, danke für den Hinweis – kosii