2017-02-01 2 views
1

Ich bin neu in der Streaming-Datenverarbeitung und habe, was ich fühle, muss ein sehr einfaches Anwendungsfall sein.Wie füge ich einen Cooldown/Rate-Limit zu einem Stream in Kafka Streams hinzu?

Nehmen wir an, ich habe einen Strom von (User, Alert) Tupel. Ich möchte diesen Stream pro Benutzer begrenzen. I.e. Ich möchte einen Stream, der nur einmal für einen Benutzer eine Warnung ausgibt. Im folgenden sagen wir 60 Minuten, sollte jeder eingehende Alarm für den Benutzer einfach verschluckt werden. Nach diesen 60 Minuten sollte ein eingehender Alarm erneut ausgelöst werden.

Was ich versucht:

aggregate als Stateful Transformation verwenden aber der Aggregatzustand zeitabhängig sein müssen. Doch obwohl die resultierende KTable keine Veränderung der Gesamtwert hat, der K-Tabelle (als Changelog) weiterhin Elemente herabs, also nicht den gewünschten Effekt der „geschwindigkeitsbestimmend“ den Strom zu erreichen

val fooStream: KStream[String, String] = builder.stream("foobar2") 
fooStream 
    .groupBy((key, string) => string) 
    .aggregate(() => "constant", 
    (aggKey: String, value: String, aggregate: String) => aggregate, 
    stringSerde, 
    "name") 
    .print 

das liefert die folgende Ausgabe:

[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null) 
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null) 

Es ist im allgemeinen mir unklar, wie/wann aggregate entscheidet nachgeschaltete Elemente zu veröffentlichen. Mein ursprüngliches Verständnis war, dass es sofort war, aber das scheint nicht der Fall zu sein. Windowing sollte hier nicht helfen, soweit ich sehen kann.

Ist es möglicherweise der Fall, dass der Kafka Streams DSL derzeit nicht für diesen Anwendungsfall der Stateful Transformation, ähnlich Sparks updateStateByKey oder Akkas statefulMapConcat? Muss ich die untergeordnete Prozessor-/Transformer-API verwenden?

EDIT:

Die Possible duplicate gehen nicht in die Frage, wie Rekord-Caching einige Verwirrung in Bezug auf die Ursache, wenn Aggregationen entscheiden nachgeschaltete Elemente zu veröffentlichen. Die primäre Frage war jedoch, wie man "Rate-Limiting" im DSL erreicht. Wie @miguno hervorhebt, müsste man zur Prozessor-API der unteren Ebene zurückkehren. Im Folgenden werde ich den Ansatz eingefügt, die sehr ausführlich ist:

val logConfig = new util.HashMap[String, String](); 
    // override min.insync.replicas 
    logConfig.put("min.insyc.replicas", "1") 

    case class StateRecord(alert: Alert, time: Long) 

    val countStore = Stores.create("Limiter") 
    .withKeys(integerSerde) 
    .withValues(new JsonSerde[StateRecord]) 
    .persistent() 
    .enableLogging(logConfig) 
    .build(); 
    builder.addStateStore(countStore) 

    class RateLimiter extends Transformer[Integer, Alert, KeyValue[Integer, Alert]] { 
    var context: ProcessorContext = null; 
    var store: KeyValueStore[Integer, StateRecord] = null; 

    override def init(context: ProcessorContext) = { 
     this.context = context 
     this.store = context.getStateStore("Limiter").asInstanceOf[KeyValueStore[Integer, StateRecord]] 
    } 

    override def transform(key: Integer, value: Alert) = { 
     val current = System.currentTimeMillis() 
     val newRecord = StateRecord(value._1, value._2, current) 
     store.get(key) match { 
     case StateRecord(_, time) if time + 15.seconds.toMillis < current => { 
      store.put(key, newRecord) 
      (key, value) 
     } 
     case StateRecord(_, _) => null 
     case null => { 
      store.put(key, newRecord) 
      (key, value) 
     } 
     } 
    } 
    } 
+0

Mögliche Duplikate von [Warum sehe ich keine Ausgabe von Kafka Streams Methode?] (Http://stackoverflow.com/questions/40537084/why-dont-i-see-any-output-from- the-kafka-streams-reduce-Methode) –

+0

Nicht möglich in DSL im Moment. Die Verwendung von PAPI würde funktionieren. –

+0

PAPI = Die Prozessor-API in Kafka Streams. –

Antwort

2

Lassen Sie uns sagen, dass ich einen Strom von (User, Alert) Tupel haben. Ich möchte diesen Stream pro Benutzer begrenzen. I.e. Ich möchte einen Stream, der nur einmal für einen Benutzer eine Warnung ausgibt. Im folgenden sagen wir 60 Minuten, sollte jeder eingehende Alarm für den Benutzer einfach verschluckt werden. Nach diesen 60 Minuten sollte ein eingehender Alarm erneut ausgelöst werden.

Dies ist derzeit nicht möglich, wenn die DSL von Kafka Streams verwendet wird. Stattdessen können Sie (und müssen) dieses Verhalten manuell mit der untergeordneten Prozessor-API implementieren.

FYI: Wir haben Diskussionen in der Kafka-Gemeinschaft darüber geführt, ob man solche Funktionen (oft als "Trigger" bezeichnet) zum DSL hinzufügt oder nicht. Bis jetzt war die Entscheidung, diese Funktionalität vorerst nicht zu haben.

Es ist mir im Allgemeinen unklar, wie/wann aggregate beschließt Elemente stromabwärts zu veröffentlichen. Mein ursprüngliches Verständnis war, dass es sofort war, aber das scheint nicht der Fall zu sein.

Ja, das war das ursprüngliche Verhalten für Kafka 0.10.0.0. Seitdem haben wir (nicht sicher, welche Version Sie verwenden) das Record-Caching eingeführt; Wenn Sie das Record-Caching deaktivieren, erhalten Sie das ursprüngliche Verhalten zurück, obwohl das Record-Caching Ihnen, wie ich weiß, einen (indirekten) Knopf für die Ratenbegrenzung gibt. Sie müssen also wahrscheinlich das Caching aktiviert lassen.

Leider decken die Apache Kafka-Dokumente noch nicht das Zwischenspeichern von Datensätzen ab, stattdessen sollten Sie stattdessen http://docs.confluent.io/current/streams/developer-guide.html#memory-management lesen.

+0

Vielen Dank für Ihre hilfreiche und zeitnahe Kommentar. Ich vermutete, dass dies derzeit im DSL nicht möglich ist. Hast du einen Link zu den Diskussionen, die die Community zu diesem Thema geführt hat? Während die Prozessor-API nett genug ist, scheint sie sicherlich ein häufiger Anwendungsfall zu sein, der abstrahiert werden kann (wie andere Frameworks). Ich aktualisierte die Frage mit meiner Problemumgehung – nambrot

+0

Diskussionen: Ich erinnere mich nicht von oben auf meinen Kopf. Höchstwahrscheinlich in einer Diskussion in der kafka-dev-Mailingliste. –

Verwandte Themen