2017-09-09 2 views
0

In einer akka-http Websocket-App habe ich eine Route, die die angegebenen Nachrichten zurückgibt, und ich muss auch den Status in der App beibehalten. So funktioniert das folgende gut:Zurückgegebene Quellen mit statusfulMapConcat

Allerdings muss ich auch die Echonachrichten drosseln, so dass nur einer pro Sekunde passiert. So würde Ich mag Lage sein, dies zu tun:

def echoMessageFlow: Flow[Message, Message, NotUsed] = Flow[Message].statefulMapConcat {() => 
    implicit var state = new SessionState() 
    msg: Message => 
    Source(List(msg, msg, msg)).throttle(1, 1 second, 1, ThrottleMode.shaping) 
    } 

jedoch die Funktion zurück von statefulMapConcat setzt voraus, dass es sich um ein Iterable sein. Gibt es eine Möglichkeit, stattdessen eine Source zurückgeben?

Antwort

1

Sie könnten flatMapConcat (oder flatMapMerge, wenn Sie eine Parallelisierung benötigen) und geben Sie eine Funktion, die eine Source für jedes eingehende Element erzeugt.

Jede produzierte Source kann gedrosselt werden, indem der throttle Kombinator angedockt wird, genau wie oben.

Wenn Sie möchten, dass Ihre Source Stateful sein soll, können Sie sie mit Source.unfold erstellen.

Beispiel unten (mit Anzahl der erzeugten Nachrichten als interner Zustand):

def echoMessageFlow: Flow[Message, Message, NotUsed] = Flow[Message].flatMapConcat { msg: Message => 
    Source.unfold(0){ count: Int ⇒ 
     if (count < 3) 
     Some(count + 1, msg) 
     else 
     None 
    }.throttle(1, 1.second, 1, ThrottleMode.shaping) 
    } 
+0

Das ist genau das, was ich suchte. Vielen Dank! –