2017-12-09 5 views
0

Ich benutze Apache Kafka Streaming, um Aggregation von Daten zu machen, die von einem Kafka-Thema konsumiert werden. Die Aggregation wird dann zu einem anderen Thema serialisiert, selbst verbraucht und Ergebnisse in einer Datenbank gespeichert. Ziemlich klassischer Anwendungsfall, nehme ich an.Apache Kafka Streaming KTable changelog

Das Ergebnis des Aggregataufrufs ist das Erstellen einer KTable, die von einem Kafka Changelog "topic" unterstützt wird.

Dies ist komplexer als das in der Praxis, aber lassen Sie uns sagen, dass es für einen bestimmten Schlüssel die Zählung und die Summe der Ereignisse speichert (Mittelwert berechnen):

KTable<String, Record> countAndSum = groupedByKeyStream.aggregate(...) 

Das Changelog „Thema“ scheint nicht eine Aufbewahrungsfrist festlegen (ich sehe nicht, dass sie "abläuft", im Gegensatz zu den anderen Themen gemäß meiner globalen Aufbewahrungseinstellung).

Das ist eigentlich gut/notwendig, weil dadurch vermieden wird, meinen Aggregationsstatus zu verlieren, wenn ein zukünftiges Ereignis mit demselben Schlüssel kommt.

Aber auf lange Sicht bedeutet dies, dass diese Änderung für immer wachsen wird (wie mehr Schlüssel werden)? Und ich habe möglicherweise viele Schlüssel (und meine Aggregation ist nicht so klein wie count/sum).

Da ich ein Mittel habe zu wissen, dass ich keine Ereignisse eines bestimmten Schlüssels mehr bekommen werde (einige Ereignisse werden als "endgültig" markiert), gibt es eine Möglichkeit für mich, die Aggregationszustände für diese speziellen Schlüssel zu entfernen das changelog, um zu vermeiden, dass es für immer wächst, weil ich sie nicht mehr brauche, möglicherweise mit einer leichten Verzögerung "nur" für den Fall?

Oder gibt es eine Möglichkeit, dies völlig anders mit Kafka Streaming zu tun, um dieses "Problem" zu vermeiden?

+0

Ich habe gerade über Tombstone-Nachrichten gelesen, Schlüssel wird eine Null-Nachricht, die mir erlauben, diese fallen lassen. Ich muss noch testen. Und immer noch daran interessiert, was das richtige Muster wäre. – Christophe

+0

Ja: Changelog-Zweige werden mit der Protokollkomprimierung und nicht mit der Aufbewahrungszeit konfiguriert. Wenn Sie den "letzten" Datensatz erhalten, kann Ihre Aggregation nur "null" als Aggregationsergebnis zurückgeben. Dadurch wird es aus dem lokalen RocksDB-Speicher sowie dem zugrunde liegenden Changelog-Thema gelöscht. –

+1

Danke Matthias, ich habe getestet und bestätige alles geht wie erwartet mit Null zurück, wenn ich den "letzten" Rekord erreiche. – Christophe

Antwort

1

Ja: Changelog-Zweige werden mit Protokollkomprimierung und nicht mit Beibehaltungszeit konfiguriert. Wenn Sie den "letzten" Datensatz erhalten, kann Ihre Aggregation einfach null als Aggregationsergebnis zurückgeben. Dadurch wird es aus dem lokalen RocksDB-Speicher sowie dem zugrunde liegenden Changelog-Thema gelöscht.

Verwandte Themen