2017-02-10 1 views
1

haben wir das folgende Problem mit Kafka Streams zu lösen:Kafka Streams DSL: aggregieren, anreichern und senden durch

1- erhalten Sie eine Nachricht. Jede Nachricht wird mit eventId (Nachrichtenaktualisierungsereignis) und correlationId (eindeutig für jede Nachricht) versehen.

2- Aggregat einig Zustände von dieser Nachricht (basierend auf eventId) und Anhänge an bereits bestehenden Zustand in lokalen Speichern

3- bereichert diese Nachricht für vollen aggregierten Zustand für dieses Ereignis und sendet es an Ausgang Thema durch

Punkt ist, wir können nicht wirklich eine einzelne Nachricht verlieren, und sie muss immer die eingehende Nachricht mit dem neuesten aggregierten Zustand anreichern (den wir tatsächlich während der Nachrichtenverarbeitung auswerten).

Von dem, was ich bisher gesehen habe wir kippen nur einfache Aggregation verwenden (so etwas wie die :)

stateMessageStream 
    .map((k, v) => new KeyValue[String, StateMessage](k, v)) 
    .mapValues[StateMessageWithMarkets](sm => {StateMessageWithMarkets(Some(sm), extract(sm))}) 
    .groupBy((k, _) => k, stringSerde, marketAggregatorSerde) 
    .aggregate[StateMessageWithMarkets](() => StateMessageWithMarkets(), (_, v, aggregatedState) => aggregatedState.updateModelMarketsWith(v), marketAggregatorSerde, kafkaStoreName) 
    .to(stringSerde, marketAggregatorSerde, kafkaOutTopic) 

weil Aggregation in Abständen nur neue Datensätze erzeugt, und dass für zwei eingehende Nachrichten bedeuten würde, könnten wir erzeuge nur einzelne aggregierte Ausgabenachricht (so verloren wir eine Nachricht)

Mein zweiter Versuch, wie dies implementiert wurde, war im Grunde zwei Ströme, einer für die Aggregation, der zweite für einfache Nachrichten. Am Ende können wir zwei Ströme kommen wieder zusammen Operation unter Verwendung kommen, basierend auf correlationId als Schlüssel - dass wir den richtigen Zustand mit der richtigen Botschaft bieten können:

val aggregatedStream : KStream[String, MarketAggregator] = stateMessageStream 
    .map((k, v) => new KeyValue[String, StateMessage](k, v)) 
    .mapValues[StateMessage](v => { 
    log.debug("Received State Message, gameId: " + v.metadata().gtpId() + ", correlationId: " + v.correlationId) 
    v}) 
    .mapValues[MarketAggregator](sm => {MarketAggregator(sm.correlationId, extract(sm))}) 
    .groupBy((k, v) => k, stringSerde, marketAggregatorSerde) 
    .aggregate[MarketAggregator](() => MarketAggregator(), (_, v, aggregatedState) => aggregatedState.updateModelMarketsWith(v), marketAggregatorSerde, kafkaStoreName) 
    .toStream((k, v) => v.correlationId) 

stateMessageStream 
    .selectKey[String]((k, v) => v.correlationId) 
    .leftJoin[MarketAggregator, StateMessageWithMarkets](aggregatedStream, (stateMessage : StateMessage, aggregatedState : MarketAggregator) => StateMessageWithMarkets(Some(stateMessage), aggregatedState.modelMarkets, stateMessage.correlationId), 
     JoinWindows.of(10000), 
     stringSerde, stateMessageSerde, marketAggregatorSerde) 
    .mapValues[StateMessageWithMarkets](v => { 
     log.debug("Producing aggregated State Message, gameId: " + v.stateMessage.map(_.metadata().gtpId()).getOrElse("unknown") + 
      ", correlationId: " + v.stateMessage.map(_.correlationId).getOrElse("unknown")) 
      v 
     }) 
    .to(stringSerde, stateMessageWithMarketsSerde, kafkaOutTopic) 

jedoch, dass scheint nicht zu funktionieren - Für zwei eingehende Nachrichten erhalte ich immer noch eine einzelne Nachricht mit dem neuesten aggregierten Status für das Ausgabethema.

Kann jemand bitte erklären, warum und was die richtige Lösung wäre?

Antwort

Verwandte Themen