Ich versuche, die letzte Nachricht in einem Kafka-Thema mit ConsumerSeekAware.Message-Typ Liste der Avro-Objekte verfügbar zu lesen. Ich kann das erfolgreich machen. Aber während der Deserialisierung versagt es. Die Nachricht wurde mithilfe des Spring-Cloud-Stream-Kafka-Frameworks erstellt. Nachricht hat einen contentType .Consumer eine Avro-Nachricht mit Spring-Kafka von Feder-Cloud-Stream-Kafka-Binder
Ich weiß, dass die Avro-Nachricht wie unten deserialisiert werden kann.
DatumReader<GenericRecord> datumReader =
new SpecificDatumReader<>(targetType.newInstance().getSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
result = (T) datumReader.read(null, decoder);
Aber es funktioniert nicht. Es kann auf zwei Dinge zurückzuführen sein.
Die Nachricht ist eine Liste von AVRO-Objekten. Aber ich versuche, den DatamReader mit Avro-Schema zu erstellen. Aber ich habe versucht, Schema wie Schema.createArray (UserDTO.class) zu erstellen. Aber es funktioniert nicht.
Ich denke, der Inhaltstyp für Avro Nachricht zu erwarten ist application/Avro aber wenn die Nachricht von scs erzeugt wird, ist es
contentType=application/x-java-object;
ich durch die Implementierung org.apache.kafka.common.serialization.Deserializer
einen Deserializer zu schaffen versuchen und konstruieren die KafkaConsumerFactory
. Kann jemand helfen?