Ich verwende Kafka 0.10.2 und Avro für die Serialisierung meiner Nachrichten, sowohl für den Schlüssel als auch für die Wertdaten. Jetzt möchte ich Kafka Streams verwenden, aber ich bin stecken geblieben versuchen, die Serde
Klasse für die GenericData.Record
Klasse zu schreiben.Wie schreibe ich einen KafkaAvro Serde für GenericData.Record
import org.apache.avro.generic.GenericData.Record;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
[...]
public final class KafkaAvroSerde implements Serde<Record> {
private final Serde<Record> inner;
public KafkaAvroSerde() {
// Here I get the error
inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
}
public KafkaAvroSerde(SchemaRegistryClient client) {
this(client, Collections.emptyMap());
}
public KafkaAvroSerde(SchemaRegistryClient client, Map<String, ?> props) {
// Here I get the error
inner = Serdes.serdeFrom(new KafkaAvroSerializer(client, props), new KafkaAvroDeserializer(client, props));
}
@Override
public Serializer<Record> serializer() {
return inner.serializer();
}
@Override
public Deserializer<Record> deserializer() {
return inner.deserializer();
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.serializer().configure(configs, isKey);
inner.deserializer().configure(configs, isKey);
}
@Override
public void close() {
inner.serializer().close();
inner.deserializer().close();
}
}
Dies ist der Fehler, den ich bin an den kommentierten Zeilen bekommen
Type mismatch: cannot convert from Serde<Object> to Serde<GenericData.Record>
Ich brauche die Serde Klasse für die GenericData.Record
(und nicht für eine bestimmte POJO von mir) zu definieren, weil ich kann verschiedene Objektstrukturen auf dem gleichen Kanal, so sollte der Deserializer mir die GenericData
zurückgeben (und ich werde die richtigen POJOs nach diesem Schritt bevölkern).
Wie würden Sie die Dinge erledigen? Danke