0

Ich versuche, ein Objekt an Kafka mit Avro Serializer und Schema-Registrierung zu senden.
Dies ist ein vereinfachter Code:So senden Sie eine Nachricht an Kafka mit Avro Serializer und Schema-Registrierung

Properties props = new Properties(); 
    ... 
    props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); 
    props.put(SCHEMA_REGISTRY_URL_CONFIG, "http://" + schemaRegistryHostname + ":8081"); 

    Producer<String, User> producer = new KafkaProducer(properties); 

    User user = new User("name", "address", 123); 
    ProducerRecord record = new ProducerRecord<>(topic, key, user); 
    producer.send(record); 

ich, dass das Schema „hinter den Kulissen“ liest aus der Registrierung wird davon ausgegangen, und das Objekt (Benutzer) serialisiert, aber ich bekomme die folgenden Fehler.
Was fehlt mir?
Muss ich das Schema explizit lesen und ein GenericRecord senden?

org.apache.kafka.common.errors.SerializationException: Fehler Serialisierung Avro Nachricht
Verursacht durch: java.lang.IllegalArgumentException: Nicht unterstützte Avro-Typ. Unterstützte Typen sind NULL, Boolean, Integer, Long, Float, Double, String, byte [] und IndexedRecord
unter io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.getSchema (AbstractKafkaAvroSerDe.java:123) ~ [kafka-avro-serializer -3.3.0.jar!/:?]
bei io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl (AbstractKafkaAvroSerializer.java:73) ~ [kafka-avro-serializer-3.3.0.jar!/:?]
bei io.confluent.kafka.serializers.KafkaAvroSerializer.serialize (KafkaAvroSerializer.java:53) ~ [kafka-Avro-Serializer-3.3.0.jar!/:?]
bei org.apache.kafka.clients. producer.KafkaProducer.send (KafkaProducer.java:424) ~ [kafka-clients-0.9.0.1.jar!/:?]

Antwort

1

Ihr Code scheint korrekt zu sein. Das einzige, was möglicherweise fehlt, ist, dass Ihr AVRO-Objekt mit einem AVRO-Plugin nicht richtig generiert wurde. Das bedeutet, dass Ihre Klasse SpecificRecords implementieren muss, die IndexedRecord implementiert.

+0

Richtig. Alles scheint zu funktionieren, nachdem ich eine richtige avsc-Datei erstellt und eine Java-Datei daraus erstellt habe, indem ich graply-avro-plugin verwendet habe. Ich nahm an, dass das Schema automatisch von Avro erstellt wird, und ich habe mich irrtümlich reflexiv verhalten. – msayag

Verwandte Themen