2016-08-15 5 views
1

ich Kafka Consumer haben, derzeit konfiguriert mit:Deserialisieren Java-Objekte von Kafka Verbraucher

kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 

Aber was ich wirklich will, eine Kryo Deserializer Lage sein, stattdessen zu verwenden:

public class KryoPOJODeserializer<T> implements Deserializer<T> { 

    private Kryo kryo = new Kryo(); 

    @Override 
    public void configure(Map props, boolean isKey) { 
     kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); 
     kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer()); 
    } 

    @Override 
    public T deserialize(String topic, byte[] data) { 
     // Deserialize the serialized object. 
     return kryo.readObject(new Input(data), T.class); 
    } 

    @Override 
    public void close() { 

    } 

} 

Was Ich kann nicht herausfinden, ist es möglich, den gleichen Verbraucher für verschiedene Themen zu verwenden (jedes Thema hat eine andere Art von POJO drauf)? Wenn meine Consumer-Konfiguration lautet:

kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoPOJODeserializer.class.getName()); 

Oder muss ich einen separaten Consumer für jedes Thema haben?

Oder muss ich den generischen Teil meines Deserializers entfernen, immer ein Objekt zurückgeben und das Objekt in den entsprechenden POJO im Client-Code umwandeln? Etwas wie:

public class KryoPOJODeserializer implements Deserializer { 

    private Kryo kryo = new Kryo(); 

    @Override 
    public void configure(Map props, boolean isKey) { 
     kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); 
     kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer()); 
    } 

    @Override 
    public Object deserialize(String topic, byte[] data) { 
     // Deserialize the serialized object. 
     return kryo.readClassAndObject(new Input(new ByteArrayInputStream(data))); 
    } 

    @Override 
    public void close() { 

    } 

} 

Letzteres würde funktionieren, aber es fühlt sich ein bisschen schmutzig an.

Irgendwelche Vorschläge geschätzt!

Antwort

3

Sie können Sie ursprünglichen Ansatz verwenden, indem Deserializer Instanzen direkt in den Consumer vorbei:

KafkaConsumer<String, Foo> consumer = new KafkaConsumer<>(properties, 
    new StringDeserializer(), new KryoPOJODeserializer(Foo.class)); 

Wenn Sie die gleiche eingehende Datentyp für eine Reihe von Themen wieder verwenden möchten, dann können Sie ein Abonnement für diejenigen einzurichten Themen mit einem einzelnen Verbraucher. Wenn Sie unterschiedliche Objekttypen für die Werte verwenden möchten, müssen Sie mehrere Konsumenten verwenden.

Andernfalls ist auch Ihr zweiter Ansatz gültig.

+0

Also ich denke, es ist akzeptabel (im schlimmsten Fall) so viele "KafkaConsumer" Verbraucher wie ich Kafka Themen haben? Ich hatte irgendwie im Kopf, dass ein Kafka Consumer ziemlich schwergewichtig war, etwas wie eine Hibernate SessionFactory. Wenn das nicht der Fall ist, macht das Leben viel einfacher! – Matt

+1

Ja, das ist akzeptabel - Verbraucher (unabhängig von der Messaging-Technologie) werden normalerweise für die Handhabung eines bestimmten Payload-Typs geschrieben, obwohl es immer Ausnahmen gibt. Ob ein Kafka Consumer "schwer" ist, hängt davon ab, was Sie meinen. Ein Konsument wird TCP-Verbindungen zu allen Brokern aufrechterhalten, von denen er Nachrichten konsumiert. Ansonsten gibt es keine Threads im Hintergrund. –

Verwandte Themen