ich Kafka Consumer haben, derzeit konfiguriert mit:Deserialisieren Java-Objekte von Kafka Verbraucher
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Aber was ich wirklich will, eine Kryo Deserializer Lage sein, stattdessen zu verwenden:
public class KryoPOJODeserializer<T> implements Deserializer<T> {
private Kryo kryo = new Kryo();
@Override
public void configure(Map props, boolean isKey) {
kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
}
@Override
public T deserialize(String topic, byte[] data) {
// Deserialize the serialized object.
return kryo.readObject(new Input(data), T.class);
}
@Override
public void close() {
}
}
Was Ich kann nicht herausfinden, ist es möglich, den gleichen Verbraucher für verschiedene Themen zu verwenden (jedes Thema hat eine andere Art von POJO drauf)? Wenn meine Consumer-Konfiguration lautet:
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoPOJODeserializer.class.getName());
Oder muss ich einen separaten Consumer für jedes Thema haben?
Oder muss ich den generischen Teil meines Deserializers entfernen, immer ein Objekt zurückgeben und das Objekt in den entsprechenden POJO im Client-Code umwandeln? Etwas wie:
public class KryoPOJODeserializer implements Deserializer {
private Kryo kryo = new Kryo();
@Override
public void configure(Map props, boolean isKey) {
kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
}
@Override
public Object deserialize(String topic, byte[] data) {
// Deserialize the serialized object.
return kryo.readClassAndObject(new Input(new ByteArrayInputStream(data)));
}
@Override
public void close() {
}
}
Letzteres würde funktionieren, aber es fühlt sich ein bisschen schmutzig an.
Irgendwelche Vorschläge geschätzt!
Also ich denke, es ist akzeptabel (im schlimmsten Fall) so viele "KafkaConsumer" Verbraucher wie ich Kafka Themen haben? Ich hatte irgendwie im Kopf, dass ein Kafka Consumer ziemlich schwergewichtig war, etwas wie eine Hibernate SessionFactory. Wenn das nicht der Fall ist, macht das Leben viel einfacher! – Matt
Ja, das ist akzeptabel - Verbraucher (unabhängig von der Messaging-Technologie) werden normalerweise für die Handhabung eines bestimmten Payload-Typs geschrieben, obwohl es immer Ausnahmen gibt. Ob ein Kafka Consumer "schwer" ist, hängt davon ab, was Sie meinen. Ein Konsument wird TCP-Verbindungen zu allen Brokern aufrechterhalten, von denen er Nachrichten konsumiert. Ansonsten gibt es keine Threads im Hintergrund. –