0

Ich verwende Kafka 0.10.2 und Avro für die Serialisierung meiner Nachrichten, sowohl für den Schlüssel als auch für die Wertdaten. Jetzt möchte ich Kafka Streams verwenden, aber ich bin stecken geblieben versuchen, die Serde Klasse für die GenericData.Record Klasse zu schreiben.Wie schreibe ich einen KafkaAvro Serde für GenericData.Record

import org.apache.avro.generic.GenericData.Record; 
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; 
import io.confluent.kafka.serializers.KafkaAvroDeserializer; 
import io.confluent.kafka.serializers.KafkaAvroSerializer; 
[...] 

public final class KafkaAvroSerde implements Serde<Record> { 

    private final Serde<Record> inner; 

    public KafkaAvroSerde() { 
     // Here I get the error 
     inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer()); 
    } 

    public KafkaAvroSerde(SchemaRegistryClient client) { 
     this(client, Collections.emptyMap()); 
    } 

    public KafkaAvroSerde(SchemaRegistryClient client, Map<String, ?> props) { 
     // Here I get the error 
     inner = Serdes.serdeFrom(new KafkaAvroSerializer(client, props), new KafkaAvroDeserializer(client, props)); 
    } 

    @Override 
    public Serializer<Record> serializer() { 
     return inner.serializer(); 
    } 

    @Override 
    public Deserializer<Record> deserializer() { 
     return inner.deserializer(); 
    } 

    @Override 
    public void configure(Map<String, ?> configs, boolean isKey) { 
     inner.serializer().configure(configs, isKey); 
     inner.deserializer().configure(configs, isKey); 
    } 

    @Override 
    public void close() { 
     inner.serializer().close(); 
     inner.deserializer().close(); 
    } 

} 

Dies ist der Fehler, den ich bin an den kommentierten Zeilen bekommen

Type mismatch: cannot convert from Serde<Object> to Serde<GenericData.Record> 

Ich brauche die Serde Klasse für die GenericData.Record (und nicht für eine bestimmte POJO von mir) zu definieren, weil ich kann verschiedene Objektstrukturen auf dem gleichen Kanal, so sollte der Deserializer mir die GenericData zurückgeben (und ich werde die richtigen POJOs nach diesem Schritt bevölkern).

Wie würden Sie die Dinge erledigen? Danke

Antwort

2

Sie haben bereits gefragt, question in the Confluent mailing list. Hier ist die Zusammenfassung meiner Antwort, die ich dort gepostet habe.

Wir haben gerade die Arbeit an einer offiziellen Confluent Avro Serde (spezifische Avro + generische Avro) für Kafka Streams abgeschlossen. Siehe https://github.com/confluentinc/schema-registry/tree/master/avro-serde.

Der neue Avro Serde, der Confluent Schema Registry Aware/kompatibel ist, wird mit der bevorstehenden Confluent 3.3 veröffentlicht, die ein paar Wochen ist.

Bis 3.3 freigegeben wird, können Sie entweder Ihre eigenen Artefakte aus dem master Zweig bauen (Sie müssen zuerst die master Zweige confluentinc/common und confluentinc/rest-utils mit mvn install bauen, dann das Schema-Registrierungs Projekt mit mvn install) oder z.B. Kopieren Sie die Klassen in Ihr eigenes Codeprojekt.

Hinweis: Der Zweig master in den obigen und nachfolgenden Projekten sind Entwicklungszweige, d. H. Vorabzweige. Zukünftige Leser dieser Antwort sollten dies im Hinterkopf behalten.

Wir haben auch Beispiele, wie man den neuen, bevorstehenden Confluent Avro serde verwendet. Sie finden die Demos im master-Zweig von https://github.com/confluentinc/examples.

Einige direkte Links zu Avro bezogenen Beispielen und End-to-End durch Confluent vorgesehen Integrationstests (es gibt mehr solche Beispiele als die unten):

Verwandte Themen