2017-04-06 3 views
3

Ich habe eine Einzelinstanz-Java-Anwendung, die KTable von Kafka Streams verwendet. Bis vor kurzem konnte ich alle Daten mit KTable abrufen, als plötzlich einige der Nachrichten zu verschwinden schienen. Dort sollten ~ 33k Nachrichten mit eindeutigen Schlüsseln sein.Warum Kafka KTable Einträge fehlen?

Wenn ich Nachrichten per Schlüssel abrufen möchte, bekomme ich einige der Nachrichten nicht. Ich benutze ReadOnlyKeyValueStore Nachrichten abzurufen:

final ReadOnlyKeyValueStore<GenericRecord, GenericRecord> store = ((KafkaStreams)streams).store(storeName, QueryableStoreTypes.keyValueStore()); 
store.get(key); 

Dies sind die Konfigurationseinstellungen I des KafkaStreams gesetzt.

final Properties config = new Properties(); 
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverId); 
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); 
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); 
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class); 
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class); 
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); 

Kafka: 0.10.2.0-CP1
Confluent: 3.2.0

Untersuchungen brachten mir einige sehr beunruhigende Einblicke. Mit REST Proxy habe ich Partitionen manuell gelesen und festgestellt, dass einige Offsets Fehler zurückgeben.

Anfrage: /topics/{topic}/partitions/{partition}/messages?offset={offset}

{ 
    "error_code": 50002, 
    "message": "Kafka error: Fetch response contains an error code: 1" 
} 

Kein Client, weder Java noch Kommandozeile zurückkehrt jedoch einen Fehler. Sie überspringen einfach die fehlerhaften fehlenden Nachrichten, die zu fehlenden Daten in KTables führen. Alles war gut und ohne Vorwarnung scheint es, dass einige der Nachrichten korrupt wurden.

Ich habe zwei Broker und alle Themen haben den Replikationsfaktor von 2 und sind vollständig repliziert. Beide Broker geben dasselbe zurück. Neustarten von Maklern macht keinen Unterschied.

  • Was könnte die Ursache sein?
  • Wie erkennt man diesen Fall in einem Client?
+0

Ich habe keine Ahnung, was 'StoreManager' ist - das ist nicht Teil von Kafka Streams. Verwenden Sie gefensterte oder nicht gefensterte KTable? Welche Version von Kafka Streams verwenden Sie? –

+2

@ MatthiasJ.Sax Entschuldigung, mein Fehler Ich habe die Frage präzisiert. – Maciej

+0

Danke für das Update. Das klingt wirklich seltsam. "Sie überspringen einfach die fehlerhaften Nachrichten, was zu fehlenden Daten führt" - dieser Klang ist auch sehr seltsam - AFAIK, die Verbraucher haben keinen eingebauten Mechanismus für das "Überschreiten" von Nachrichten. Vielleicht solltest du in der Kafka-Benutzerliste http://kafka.apache.org/contact fragen (das könnte sogar ein Bug sein ...) - Es scheint jedoch keine Kafka Streams Probleme zu geben, da Kafka Streams intern nur Kafka Consumer benutzt - das, wenn sich der Consumer merkwürdig verhält, gibt es Kafka Streams nichts damit umgehen können. –

Antwort

1

von default Kafka Broker Konfigurationsschlüssel cleanup.policy auf delete gesetzt. Setzen Sie es auf compact, um die letzte Nachricht für jeden Schlüssel zu behalten. See compaction.

Das Löschen alter Nachrichten ändert nicht den minimalen Offset. Wenn Sie also versuchen, eine Nachricht darunter zu lesen, wird ein Fehler verursacht. Der Fehler ist sehr vage. Der Kafka Streams-Client beginnt Nachrichten von einem minimalen Offset zu lesen, so dass kein Fehler auftritt. Der einzige sichtbare Effekt sind fehlende Daten in KTables.

Während die Anwendung läuft dank der caches alle Daten möglicherweise noch verfügbar sein, auch nachdem Nachrichten von Kafka selbst gelöscht werden. Sie werden nach der Säuberung verschwinden.

Verwandte Themen