1

Das Problem

Ich bin die Umsetzung eine Micro als Event-Sourcing-Aggregat mit, die wiederum als Flink FlatMapFunction umgesetzt wird. In der Grundkonfiguration liest das Aggregat Ereignisse und Befehle aus zwei kafka-Themen. Dann schreibt es neue Ereignisse in das erste Thema und verarbeitet Ergebnisse in einem dritten Thema. Daher fungiert Kafka als Event Store. Hoffen, dass diese Zeichnung hilft:Wiederherstellen Zustand Konsistenz in Flink, wenn Kafka als EventStor

RPC Request        RPC Result 
    |             | 
    ~~~~> Commands-|    |---> Results ~~~~~~| 
       |-->Aggregate--| 
    ~> Input evs. -|    |---> output evs. ~~~ 
    |             | 
    ~~~~~<~~~~~~~~~~~<~~~feedbak loop~~~~~<~~~~~~~~<~~~ 

Aufgrund der Tatsache, dass Kafka nicht checkpoined ist, Befehle möglicherweise wiederholt zweimal werden könnten, und es scheint, dass die Ausgabe Ereignisse auch zweimal das Thema geschrieben werden können.

Wie konnte der Staat in diesen Fällen mit wiederholten Nachrichten wiederhergestellt werden? Ist es möglich, dass das Aggregat weiß, wann seine Eingabeströme aktuell sind, um mit der Verarbeitung von Befehlen zu beginnen?

Meine Gedanken

ich mehrere Lösungen gedacht haben:

  1. Wenn Flink ein Rollback nicht bestätigte Ereignisse implementiert, könnte ein Sink umgesetzt werden, die den Strom von der Ereignisquelle versetzt bekommen. Beim Neustart würde diese Senke neuere als Offset-Ereignisse im kafka-Thema entfernen. Auf diese Weise würden KafkaSource und KafkaSink vom selben Builder generiert und dann der Topologie ausgesetzt. Diese Lösung hat ein großes Problem, da andere Dienste die neueren Ereignisse im Thema lesen und Inkonsistenzen verursachen können.

  2. Wenn das Entfernen von Ereignissen aus Flink in 2 nicht möglich ist, kann die zustandsbehaftete Quelle möglicherweise Ereignisse aus dem Offset lesen und versuchen, die wiederholten Ereignisse im Aggregat abzugleichen und sie zu löschen. Diese Optionen scheinen nicht robust zu sein, da es Situationen geben kann, in denen Patches nicht deterministisch sind und Fehler aufweisen, da sie für jedes Aggregat und jede Topologie neu überdacht werden sollten und keine Wiederherstellung garantieren würden (z. B. im Falle von aufeinanderfolgenden Neustarts). Daher ist dies eine schlechte Lösung.

  3. Ein anderer Ansatz ist dieser. Es wird eine spezielle KafkaSource mit zwei speziellen Wasserzeichen erstellt: Das erste, KafkaSourceStartedWatermark, wird immer beim Start der Quelle gesendet, um abhängige Betreiber zu benachrichtigen. Wenn dieses Wasserzeichen gesendet wird, zeichnet die Quelle intern den aktuellen Kafka-Offset auf. Die zweite, KafkaSourceUpToDateWatermark, wird von der Quelle gesendet, wenn der Offset erreicht ist. Diese Wasserzeichen würden die Topologie transparent durchlaufen. Der Operator sollte in der Lage sein, diese Wasserzeichen zu handhaben und eine spezielle WatermarkNotifyable-Schnittstelle zu implementieren. Dann kann das Aggregat RPC-Befehle puffern oder ablegen, bis es in jeder Eingabequelle auf dem neuesten Stand ist.

    interface WatermarkNotifiable { 
        void started(String watermarkId);//KafkaSourceStartedWatermark watermark 
        void upToDate(String watermarkId);//KafkaSOurceUpToDateWatermark watermark 
    } 
    
  4. Wenn in 3 die Infrastruktur Umsetzung nicht möglich ist, die KafkaSource einen Konstruktor implementieren könnte ein spezielles Wasserzeichen Ereignis angeben, die an die Betreiber reisen konnte, aber dies würde erfordern, dass alle Operatoren hängen von diesen Wasserzeichen eine Wieder -Emitiert dann.

  5. Andere unterschiedliche Vorgehensweise besteht darin, keine Befehle älter als ein Kriterium zu verarbeiten. Zum Beispiel haben Befehle einen Eintrittszeitstempel. Wenn Zeit verwendet wird, ist die Zeitsynchronisation kritisch.

Verwandte Stackoverflow in Frage

  1. Using Kafka as a (CQRS) Eventstore. Good idea?
  2. Kafka - Know if Consumer is up to date
  3. Kafka & Flink duplicate messages on restart

Antwort

0

Erstellen Sie einen neuen Conmuter Operator-Typ. Das ist wie eine Quelle. Es besteht aus mehreren Quellen, die Ereignis- und Befehlsthemen repräsentieren. Es beginnt im "Wiederherstellungs" -Zustand. In diesem Zustand liest es aus den Ereignissen Themen bis zu ihren neuesten. Für die Befehle speichert oder löscht sie diese. Einmal auf den neuesten Stand gebracht, gilt es als wiederhergestellt und "öffnet" den Weg zu Befehlen. Es könnte separat als Quelle plus Operator implementiert werden.

FlinkKafkaProducerXX ist nicht genug, um dies zu tun, aber es wäre die Basis, um es zu implementieren.

Verwandte Themen