2017-03-21 2 views
2

Mein Ziel ist es, ein Flink Streaming-Programm zu haben, das die letzten NIDs behält, wo die ID aus einem Event extrahiert wird. Die Senke ist eine Cassandra-Filiale, so dass die Liste der IDs jederzeit abgerufen werden kann. Es ist wichtig, dass Cassandra sofort bei jedem Event aktualisiert wird.Wie bereinigt man Flink Stream-Status für inaktive Schlüssel?

Dies kann einfach mit mapWithState (siehe Code unten) implementiert werden. Es gibt jedoch ein wichtiges Problem mit diesem Code. Der Zustand wird durch userid kodiert. Einige Benutzer sind möglicherweise für einige Zeit aktiv und dann nie wieder. Worüber ich mir Sorgen mache, ist, dass der staatliche Speicher für immer wachsen wird.

Wie wird ein Zustand für inaktive Schlüssel bereinigt?

case class MyEvent(userId: Int, id: String) 

env 
    .addSource(new FlinkKafkaConsumer010[MyEvent]("vips", new MyJsonDeserializationSchema(), kafkaConsumerProperties)) 
    .keyBy(_.userId) 
    .mapWithState[(Int, Seq[String]), Seq[String]] { (in: MyEvent, currentIds: Option[Seq[String]]) => 
    val keepNIds = currentIds match { 
     case None => Seq(in.id) 
     case Some(cids) => (cids :+ in.id).takeRight(100) 
    } 
    ((in.userId, keepNIds), Some(keepNIds)) 
    } 
    .addSink { in: (Int, Seq[String]) => 
    CassandraSink.appDatabase.idsTable.store(...) 
    } 

Antwort

3

Der wachsende Zustand ist eine wichtige und richtige Beobachtung. Dies wird definitiv passieren, wenn sich der Schlüsselraum bewegt.

Flink 1.2.0 hat die ProcessFunction hinzugefügt, die dieses Problem anspricht. A ProcessFunction ähnelt einem FlatMapFunction, hat aber Zugriff auf Zeitgeberdienste. Sie können Timer registrieren, die die Callback-Funktion onTimer() aufrufen, wenn sie ablaufen. Der Rückruf kann dazu verwendet werden, den Zustand zu bereinigen.

+1

Danke, jetzt habe ich 'RichProcessFunction' gefunden und kann Callbacks mit Code wie' ctx.timerService() registrieren. RegisterProcessingTimeTimer (expiryMillis) '. Wir möchten diese Daten für 30 Tage aufbewahren. Inzwischen wird es Milliarden zusätzlicher Rückrufe geben. Kann Flink diese massive Anzahl von Rückrufen bewältigen? –

+0

Timer sind im Status gehalten und Checkpointed. Ich denke, es sollte möglich sein, aber ich empfehle, dies unter etwas Belastung zu testen. –

+1

Ich habe einen Weg gefunden, die Anzahl der Timer auf 1 pro Taste zu begrenzen: 'processElement' startet nur dann einen Timer für neue Schlüssel und 'onTimer'-Zeitpläne, wenn der letzte und der aktuelle Status noch nicht abgelaufen sein müssen. Jetzt werden wir nur ein paar Dutzend Millionen aktive Timer bekommen. Ich hoffe, das wird funktionieren :) –

Verwandte Themen