2017-03-28 2 views
0

Ich möchte einen potenziell sehr großen Zustand aus einem Strom von Ereignissen projizieren. Dies ist, wie ich dies in einer imperativen Art und Weise implementieren könnte:Apache Flink - Implementieren eines Stream-Prozessors mit möglicherweise sehr großen Zustand

class ImperativeFooProcessor { 

    val state: mutable.Map[UUID, BarState] = mutable.HashMap.empty[UUID, BarState] 

    def handle(event: InputEvent) = { 
    event match { 
     case FooAdded(fooId, barId) => { 
     // retrieve relevant state and do some work on it 
     val barState = state(barId) 

     // let the world know about what may have happened 
     publish(BarOccured(fooId, barId)) 
     // or maybe rather 
     publish(BazOccured(fooId, barId)) 
     } 
     case FooRemoved(fooId, barId) => { 
     // retrieve relevant state and do some work on it 
     val barState = state(barId) 

     // let the world know about what may have happened 
     publish(BarOccured(fooId, barId)) 
     // or maybe rather 
     publish(BazOccured(fooId, barId)) 
     } 
    } 
    } 

    private def publish(event: OutputEvent): Unit = { 
    // push event to downstream sink 
    } 
} 

Im schlimmsten Fall wird die Größe der BarState wird mit der Anzahl der Zeiten wachsen seine von FooAdded

Die Anzahl der eindeutigen BarID ist erwähnt worden ist sehr klein relativ zur Gesamtzahl der Ereignisse für jede barId.

Wie würde ich beginnen, diese Verarbeitungsstruktur in Flink darzustellen?

Wie arbeite ich mit der Tatsache, dass jeder BarState potenziell sehr groß werden kann?

Antwort

1

Flink behält den Zustand in sogenannten Zustands-Backends bei. Es gibt Zustands-Back-Ends (MemoryStateBackend und FsStateBackend), die auf dem JVM-Heap der Worker-Prozesse betrieben wurden. Diese Backends sind nicht geeignet, um große Zustände zu verarbeiten.

Flink verfügt auch über ein RocksDBStateBackend, das auf RocksDB basiert. RocksDB wird als lokale Datenbank (keine Notwendigkeit, es als externen Dienst einzurichten) auf jedem Worker-Knoten verwendet und schreibt Zustandsdaten auf den Datenträger. Daher kann es einen sehr großen Zustand bewältigen, der den Speicher übersteigt.

Flink bietet eine KeyedStream das ist ein Stream, der auf ein bestimmtes Attribut partitioniert ist. In Ihrem Fall möchten Sie wahrscheinlich, dass alle Zugriffe auf dieselbe ID auf die gleiche Statusinstanz übertragen werden. Daher würden Sie barId als Schlüssel verwenden. Anschließend wird der Status basierend auf barId auf alle parallelen Worker-Threads partitioniert. Dies ist im Grunde ein verteilter Schlüsselwertspeicher oder eine Karte. Sie müssten also den Zustand nicht als Karte darstellen, da er von Flink automatisch verteilt wird.