1

Ich versuche, die letzte Nachricht in einem Kafka-Thema mit ConsumerSeekAware.Message-Typ Liste der Avro-Objekte verfügbar zu lesen. Ich kann das erfolgreich machen. Aber während der Deserialisierung versagt es. Die Nachricht wurde mithilfe des Spring-Cloud-Stream-Kafka-Frameworks erstellt. Nachricht hat einen contentType .Consumer eine Avro-Nachricht mit Spring-Kafka von Feder-Cloud-Stream-Kafka-Binder

Ich weiß, dass die Avro-Nachricht wie unten deserialisiert werden kann.

DatumReader<GenericRecord> datumReader = 
      new SpecificDatumReader<>(targetType.newInstance().getSchema()); 
     Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); 

     result = (T) datumReader.read(null, decoder); 

Aber es funktioniert nicht. Es kann auf zwei Dinge zurückzuführen sein.

  1. Die Nachricht ist eine Liste von AVRO-Objekten. Aber ich versuche, den DatamReader mit Avro-Schema zu erstellen. Aber ich habe versucht, Schema wie Schema.createArray (UserDTO.class) zu erstellen. Aber es funktioniert nicht.

  2. Ich denke, der Inhaltstyp für Avro Nachricht zu erwarten ist application/Avro aber wenn die Nachricht von scs erzeugt wird, ist es contentType=application/x-java-object;

ich durch die Implementierung org.apache.kafka.common.serialization.Deserializer einen Deserializer zu schaffen versuchen und konstruieren die KafkaConsumerFactory . Kann jemand helfen?

Antwort

0

Siehe Eigenschaft ...producer.useNativeEncoding wie in der Producer Properties gezeigt.

useNativeEncoding

Wenn auf wahr gesetzt ist, wird die ausgehende Nachricht direkt von Client-Bibliothek serialisiert, die entsprechend konfiguriert werden müssen (beispielsweise einen geeigneten Kafka Erzeugerwert Serializer Einstellung). Wenn diese Konfiguration verwendet wird, basiert das Marshalling für ausgehende Nachrichten nicht auf dem contentType der Bindung. Wenn eine native Codierung verwendet wird, liegt es in der Verantwortung des Verbrauchers, einen geeigneten Decodierer (z. B. Kafka-Verbraucher-Wert-Deserialisierer) zu verwenden, um die eingehende Nachricht zu deserialisieren. Wenn die systemeigene Codierung/Decodierung verwendet wird, wird außerdem die Eigenschaft headerMode ignoriert, und die Header werden nicht in die Nachricht eingebettet.

Vorgabe: false.