2016-05-11 2 views
2

Ich lese von einem Kafka-Thema, das Avro-Nachrichten enthält, die unter Verwendung der KafkaAvroEncoder (die automatisch die Schemas mit den Themen registriert). Ich benutze das maven-avro-plugin, um einfache Java-Klassen zu erzeugen, die ich gerne beim Lesen benutzen würde.Deserialize Avro-Nachrichten in bestimmte Daten mit KafkaAvroDecoder

Die KafkaAvroDecoder unterstützt nur die Deserialisierung in GenericData.Record Typen, die (meiner Meinung nach) den ganzen Punkt einer statisch getippten Sprache verfehlt. Mein Deserialisierung Code sieht derzeit wie folgt aus:

SpecificDatumReader<event> reader = new SpecificDatumReader<>(
     event.getClassSchema() // event is my class generated from the schema 
    ); 
    byte[] in = ...; // my input bytes; 
    ByteBuffer stuff = ByteBuffer.wrap(in); 
    // the KafkaAvroEncoder puts a magic byte and the ID of the schema (as stored 
    // in the schema-registry) before the serialized message 
    if (stuff.get() != 0x0) { 
     return; 
    } 
    int id = stuff.getInt(); 

    // lets just ignore those special bytes 
    int length = stuff.limit() - 4 - 1; 
    int start = stuff.position() + stuff.arrayOffset(); 

    Decoder decoder = DecoderFactory.get().binaryDecoder(
     stuff.array(), start, length, null 
    ); 
    try { 
     event ev = reader.read(null, decoder); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 

ich meine Lösung umständlich gefunden, also würde ich gerne wissen, ob es eine einfachere Lösung ist, dies zu tun.

+1

Haben Sie https://github.com/confluentinc/examples gefunden, insbesondere Beispiele wie https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io /confluent/examples/streams/SpecificAvroIntegrationTest.java? –

+0

@miguno oh, nein, danke für den Hinweis – kosii

Antwort

4

Dank dem Kommentar konnte ich die Antwort finden. Das Geheimnis mit einem Properties zu instanziiert KafkaAvroDecoder war die Verwendung des spezifischen Avro Lesers spezifiziert, das heißt:

Properties props = new Properties(); 
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "..."); 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
      io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_C‌ONFIG, "..."); 
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 
    VerifiableProp vProps = new VerifiableProperties(props); 

    KafkaAvroDecoder decoder = new KafkaAvroDecoder(vProps); 
    MyLittleData data = (MyLittleData) decoder.fromBytes(input); 

Die gleiche Konfiguration für den Fall gilt direkt für die Verwendung der KafkaConsumer<K, V> Klasse (ich raubend von Kafka in Sturm mit dem KafkaSpout aus dem storm-kafka-projekt, das die SimpleConsumer verwendet, muss ich also die messages manuell deserialisieren.Für die mutigen gibt es das storm-kafka-client-projekt, das macht das automatisch mit dem neuen style consumer).

+0

Ich denke, Sie haben einen Tippfehler. 'props.put (KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG," ... ");' sollte 'props.put sein (AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG," ... ");' – zzztimbo

+1

@zzztimbo interessant, ich habe ein Projekt, wo 'KafkaAvroDeserializerConfig. SCHEMA_REGISTRY_URL_CO NFIG' funktioniert, aber nicht in anderen Projekten: o – kosii

+1

Ich habe meine Antwort aktualisiert, Ihr Weg scheint zuverlässiger zu sein – kosii