Ich bin neu in der Streaming-Datenverarbeitung und habe, was ich fühle, muss ein sehr einfaches Anwendungsfall sein.Wie füge ich einen Cooldown/Rate-Limit zu einem Stream in Kafka Streams hinzu?
Nehmen wir an, ich habe einen Strom von (User, Alert)
Tupel. Ich möchte diesen Stream pro Benutzer begrenzen. I.e. Ich möchte einen Stream, der nur einmal für einen Benutzer eine Warnung ausgibt. Im folgenden sagen wir 60 Minuten, sollte jeder eingehende Alarm für den Benutzer einfach verschluckt werden. Nach diesen 60 Minuten sollte ein eingehender Alarm erneut ausgelöst werden.
Was ich versucht:
aggregate
als Stateful Transformation verwenden aber der Aggregatzustand zeitabhängig sein müssen. Doch obwohl die resultierende KTable
keine Veränderung der Gesamtwert hat, der K-Tabelle (als Changelog) weiterhin Elemente herabs, also nicht den gewünschten Effekt der „geschwindigkeitsbestimmend“ den Strom zu erreichen
val fooStream: KStream[String, String] = builder.stream("foobar2")
fooStream
.groupBy((key, string) => string)
.aggregate(() => "constant",
(aggKey: String, value: String, aggregate: String) => aggregate,
stringSerde,
"name")
.print
das liefert die folgende Ausgabe:
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
Es ist im allgemeinen mir unklar, wie/wann aggregate
entscheidet nachgeschaltete Elemente zu veröffentlichen. Mein ursprüngliches Verständnis war, dass es sofort war, aber das scheint nicht der Fall zu sein. Windowing sollte hier nicht helfen, soweit ich sehen kann.
Ist es möglicherweise der Fall, dass der Kafka Streams DSL derzeit nicht für diesen Anwendungsfall der Stateful Transformation, ähnlich Sparks updateStateByKey oder Akkas statefulMapConcat? Muss ich die untergeordnete Prozessor-/Transformer-API verwenden?
EDIT:
Die Possible duplicate gehen nicht in die Frage, wie Rekord-Caching einige Verwirrung in Bezug auf die Ursache, wenn Aggregationen entscheiden nachgeschaltete Elemente zu veröffentlichen. Die primäre Frage war jedoch, wie man "Rate-Limiting" im DSL erreicht. Wie @miguno hervorhebt, müsste man zur Prozessor-API der unteren Ebene zurückkehren. Im Folgenden werde ich den Ansatz eingefügt, die sehr ausführlich ist:
val logConfig = new util.HashMap[String, String]();
// override min.insync.replicas
logConfig.put("min.insyc.replicas", "1")
case class StateRecord(alert: Alert, time: Long)
val countStore = Stores.create("Limiter")
.withKeys(integerSerde)
.withValues(new JsonSerde[StateRecord])
.persistent()
.enableLogging(logConfig)
.build();
builder.addStateStore(countStore)
class RateLimiter extends Transformer[Integer, Alert, KeyValue[Integer, Alert]] {
var context: ProcessorContext = null;
var store: KeyValueStore[Integer, StateRecord] = null;
override def init(context: ProcessorContext) = {
this.context = context
this.store = context.getStateStore("Limiter").asInstanceOf[KeyValueStore[Integer, StateRecord]]
}
override def transform(key: Integer, value: Alert) = {
val current = System.currentTimeMillis()
val newRecord = StateRecord(value._1, value._2, current)
store.get(key) match {
case StateRecord(_, time) if time + 15.seconds.toMillis < current => {
store.put(key, newRecord)
(key, value)
}
case StateRecord(_, _) => null
case null => {
store.put(key, newRecord)
(key, value)
}
}
}
}
Mögliche Duplikate von [Warum sehe ich keine Ausgabe von Kafka Streams Methode?] (Http://stackoverflow.com/questions/40537084/why-dont-i-see-any-output-from- the-kafka-streams-reduce-Methode) –
Nicht möglich in DSL im Moment. Die Verwendung von PAPI würde funktionieren. –
PAPI = Die Prozessor-API in Kafka Streams. –