2016-12-21 3 views
0

Bei der Überprüfung Beispielen, die ich eine Menge sehen:Apache Flink lesen Avro byte [] von Kafka

FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties); 

ich sehe, dass sie hier das Schema bereits kennen.

Ich kenne das Schema nicht, bis ich das Byte [] in einen generischen Datensatz gelesen habe, dann das Schema bekommen. (Wie es von Rekord zu Rekord ändern kann)

Kann mir jemand Punkt in eine FlinkKafkaConsumer08, die von byte[] in einer Karte Filter liest, so dass ich einige führende Bits entfernen können, laden dann die byte[] in eine generische Bilanz?

Antwort

0

ich etwas ähnliches tun bin (ich bin mit dem 09 Verbraucher)

In Ihrem Haupt-Code Pass in Ihrem benutzerdefinierten Deserializer:

FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
       parameterTool.getRequired("topic"), new MyDeserializationSchema<>(), 
       parameterTool.getProperties()); 

Das benutzerdefinierte Deserialisierung Schema liest den Bytes, Figuren aus Das Schema und/oder ruft es aus einer Schemaregistrierung ab, deserialisiert in ein GenericRecord und gibt das GenericRecord-Objekt zurück.

public class MyDeserializationSchema<T> implements DeserializationSchema<T> { 


    private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class; 

    @Override 
    public T deserialize(byte[] arg0) throws IOException { 
     //do your stuff here, strip off your bytes 
     //deserialize and create your GenericRecord 
     return (T) (myavroevent); 
    } 

    @Override 
    public boolean isEndOfStream(T nextElement) { 
     return false; 
    } 

    @Override 
    public TypeInformation<T> getProducedType() { 
     return TypeExtractor.getForClass(avrotype); 
    } 

} 
+0

Wow direkt aus der Box funktioniert. Danke, es ist jetzt offensichtlich, dass ich mir das anschaue. – Don

0

Wenn Sie Confluent Schema Registrierung verwenden, glaube ich, eine bevorzugte Lösung, um den Avro serde von Confluent zu nutzen wäre. Auf diese Weise rufen wir einfach deserialize() an und die Auflösung der neuesten Version des Avro-Schemas zur Verwendung wird automatisch hinter der Szene durchgeführt und es ist keine Byte-Manipulation erforderlich.

Es läuft darauf hinaus, so etwas wie diese (Beispiel-Code in scala, wäre eine Java-Lösung sehr ähnlich sein):

import io.confluent.kafka.serializers.KafkaAvroDeserializer 

... 

val valueDeserializer = new KafkaAvroDeserializer() 
valueDeserializer.configure(
    Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava, 
    false) 

... 

override def deserialize(messageKey: Array[Byte], message: Array[Byte], 
         topic: String, partition: Int, offset: Long): KafkaKV = { 

    val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord] 
    val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord] 

    KafkaKV(key, value) 
    } 

... 

Dieses Verfahren erfordert, dass die Nachricht Produzent auch mit dem Schema Register integriert und veröffentlicht die Schema dort. Dies kann, wie oben in sehr ähnlicher Weise durchgeführt werden, unter Verwendung von Confluent des KafkaAvroSerializer

ich eine detaillierte Erklärung hier gepostet: How to integrate Flink with Confluent's schema registry