2016-07-28 5 views
0

Ich verwende einen benutzerdefinierten Kafka Deserializer, der ein Objekt aus JSON marshallt.Gibt es eine Möglichkeit, unbegrenzte Wiederholungen für benutzerdefinierte Kafka Deserializer zu stoppen?

val props = Map(
    "bootstrap.servers" -> kafkaQueue.kafkaHost, 
    "group.id" -> adapterServer.adapterConfig.kafkaGroup, 
    "enable.auto.commit" -> "false", 
    "max.poll.records" -> "1", 
    "auto.offset.reset" -> "earliest", 
    "auto.commit.interval.ms" -> "1000", 
    "key.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer", 
    "value.deserializer" -> "com.mystuff.CustomJSONDeserializer" 
) 
new KafkaConsumer[Array[Byte], MyMessage](props) 

Eine Sache, die ich gesehen habe, ist, dass, wenn jemand schlecht JSON zum Thema postet, Kafka versucht es mit meinen benutzerdefinierten Deserializer deserialisieren - und kann es nicht. Der CustomJSONDeserializer löst eine Ausnahme aus, aber Kafka versucht es einfach weiter.

So dreht es sich einfach unendlich versuchen, die schlechte JSON-Version deserialisieren, im Wesentlichen stecken bleiben. Da dies alles innerhalb von Kafka geschieht, bin ich mir nicht sicher, wie ich es stoppen und ihm sagen kann, dass es mit der nächsten Nachricht weitergehen soll.

Wie kann ich dies vermeiden?

Antwort

0

Wenn Sie die Ausnahme abfangen, kann Ihre Anwendung nach der betroffenen Nachricht suchen, um dies zu vermeiden. Es gibt tatsächlich einen Fehler mit dem 0.10.0.0-Consumer, der masking deserialization errors war, also bin ich mir nicht sicher, ob frühere Versionen die Ausnahme richtig verbreiten. Wir werden mindestens Informationen über das Thema, die Partition und den Offset der Nachricht protokollieren, die den Fehler ab dem kommenden Release 0.10.0.1 verursacht hat.

Verwandte Themen