2017-08-20 1 views
1

Ich brauche die Fähigkeit, alte Schlüssel aus dem Kartenzustand zu entfernen, die älter sind als ein fester Zeitraum. Ich behalte derzeit die Zeitstempel jedes Ereignisses in der Schlüsselstatuszuordnung, und ich möchte einen ansynchronen Prozess haben, der diese veralteten Schlüssel entfernt.Entfernen von abgelaufenen TTL-Schlüsseln in Flink Mapstate

Ich benutze RocksDB als State Backend, und ich glaube nicht, dass die Java API von RocksDB das Öffnen mit TTL unterstützt, wie notiert here.

Also meine Fragen sind:

  • Ist es überhaupt möglich, einen asynchronen Thread zu haben, die den Zugang zum Mapstate hat, da es in einer Operatorfunktion läuft?
  • Gibt es eine bessere Praxis in diesem Fall?

Vielen Dank im Voraus, für

Antwort

2

Ein einfacher Ansatz Zustand in Flink abläuft ist ein ProcessFunction Operator zu verwenden, um den Zustand zu halten. Sie können dann einen Timer verwenden (entweder einen Verarbeitungszeittimer oder einen Ereigniszeittimer, je nachdem, was für Ihre Anwendung sinnvoll ist) und den Status in der Methode onTimer löschen.

+0

Flink's Tabellen-API zeigt ein Beispiel, wie die Statusbereinigung aussehen kann: https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/ flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala – twalthr

+0

Hallo danke für deine schnellen Antworten. Ihre vorgeschlagene Lösung ist nicht schlecht, aber das Einzige, was mir fehlt, ist, dass die ProcessFunction den OnTimer-Callback nicht asynchron abfängt (habe ich recht?) – Eliran

+0

Für Event-Timer wird der OnTimer-Callback aufgerufen in demselben Thread aufgerufen, der Stream-Elemente verarbeitet, da er das Wasserzeichen verarbeitet, das den Timer auslöst Für Prozesszeit-Timer gibt es einen separaten Thread, der den Timer-Dienst implementiert. –