Mein Ziel ist es, ein Flink Streaming-Programm zu haben, das die letzten NIDs behält, wo die ID aus einem Event extrahiert wird. Die Senke ist eine Cassandra-Filiale, so dass die Liste der IDs jederzeit abgerufen werden kann. Es ist wichtig, dass Cassandra sofort bei jedem Event aktualisiert wird.Wie bereinigt man Flink Stream-Status für inaktive Schlüssel?
Dies kann einfach mit mapWithState
(siehe Code unten) implementiert werden. Es gibt jedoch ein wichtiges Problem mit diesem Code. Der Zustand wird durch userid
kodiert. Einige Benutzer sind möglicherweise für einige Zeit aktiv und dann nie wieder. Worüber ich mir Sorgen mache, ist, dass der staatliche Speicher für immer wachsen wird.
Wie wird ein Zustand für inaktive Schlüssel bereinigt?
case class MyEvent(userId: Int, id: String)
env
.addSource(new FlinkKafkaConsumer010[MyEvent]("vips", new MyJsonDeserializationSchema(), kafkaConsumerProperties))
.keyBy(_.userId)
.mapWithState[(Int, Seq[String]), Seq[String]] { (in: MyEvent, currentIds: Option[Seq[String]]) =>
val keepNIds = currentIds match {
case None => Seq(in.id)
case Some(cids) => (cids :+ in.id).takeRight(100)
}
((in.userId, keepNIds), Some(keepNIds))
}
.addSink { in: (Int, Seq[String]) =>
CassandraSink.appDatabase.idsTable.store(...)
}
Danke, jetzt habe ich 'RichProcessFunction' gefunden und kann Callbacks mit Code wie' ctx.timerService() registrieren. RegisterProcessingTimeTimer (expiryMillis) '. Wir möchten diese Daten für 30 Tage aufbewahren. Inzwischen wird es Milliarden zusätzlicher Rückrufe geben. Kann Flink diese massive Anzahl von Rückrufen bewältigen? –
Timer sind im Status gehalten und Checkpointed. Ich denke, es sollte möglich sein, aber ich empfehle, dies unter etwas Belastung zu testen. –
Ich habe einen Weg gefunden, die Anzahl der Timer auf 1 pro Taste zu begrenzen: 'processElement' startet nur dann einen Timer für neue Schlüssel und 'onTimer'-Zeitpläne, wenn der letzte und der aktuelle Status noch nicht abgelaufen sein müssen. Jetzt werden wir nur ein paar Dutzend Millionen aktive Timer bekommen. Ich hoffe, das wird funktionieren :) –