2016-03-25 2 views
5

Ich habe einen Akka-Stream und ich möchte, dass der Stream ungefähr jede Sekunde Nachrichten downstream sendet.Wie kann man einen Akka Stream limitieren, um eine Nachricht nur einmal pro Sekunde auszuführen und zu senden?

Ich habe versucht, zwei Möglichkeiten, um dieses Problem zu lösen, war die erste Möglichkeit, den Erzeuger am Anfang des Streams nur Nachrichten senden jede Sekunde, wenn eine Continue-Nachrichten in diesem Akteur kommt.

// When receive a Continue message in a ActorPublisher // do work then... if (totalDemand > 0) { import scala.concurrent.duration._ context.system.scheduler.scheduleOnce(1 second, self, Continue) }

Diese für kurze arbeitet, während dann eine Flut Nachrichten weiter im ActorPublisher Schauspieler erscheinen, Ich gehe davon aus (Vermutung aber nicht sicher) von stromabwärts über Gegendruck Nachrichten als Downstream-anfordernden schnell verbrauchen kann aber der Upstream produziert nicht mit einer hohen Geschwindigkeit. Also diese Methode ist fehlgeschlagen.

Der andere Weg, den ich versuchte, war über Rückdruckkontrolle, ich benutzte MaxInFlightRequestStrategy auf dem ActorSubscriber am Ende des Stroms, um die Anzahl der Nachrichten auf 1 pro Sekunde zu begrenzen. Das funktioniert, aber Nachrichten, die hereinkommen, kommen zu ungefähr drei oder mehr gleichzeitig, nicht nur einzeln. Es scheint, dass die Rückdrucksteuerung nicht sofort die Rate der Nachrichten ändert, die in den OR kommen, Nachrichten waren bereits in dem Strom in der Warteschlange und warteten darauf, verarbeitet zu werden.

Das Problem ist also, wie kann ich einen Akka Stream haben, der nur eine Nachricht pro Sekunde verarbeiten kann?


Ich entdeckte, dass MaxInFlightRequestStrategy eine gültige Art und Weise, es zu tun, aber ich sollte die Losgröße 1 ist, seine Chargengröße wird als Standard eingestellt 5, die das Problem verursacht wurde ich gefunden. Es ist auch ein überkomplizierter Weg, das Problem zu lösen, jetzt da ich die eingereichte Antwort hier anschaue.

+2

Haben Sie darüber nachgedacht mit 'Source.tick'? – cmbaxter

+0

Nein, lassen Sie mich einen Blick darauf werfen, danke. – Phil

+0

können Sie auch 'Drosselung' versuchen. –

Antwort

10

Sie können entweder Ihre Elemente durch den Drosselungsfluss setzen, wodurch eine schnelle Druckquelle abgesichert wird, oder Sie können eine Kombination aus tick und zip verwenden.

Die Faust Lösung würde so aussehen:

val veryFastSource = 
    Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000)) 

val throttlingFlow = Flow[Long].throttle(
    // how many elements do you allow 
    elements = 1, 
    // in what unit of time 
    per = 1.second, 
    maximumBurst = 0, 
    // you can also set this to Enforcing, but then your 
    // stream will collapse if exceeding the number of elements/s 
    mode = ThrottleMode.Shaping 
) 

veryFastSource.via(throttlingFlow).runWith(Sink.foreach(println)) 

Die zweite Lösung so sein würde:

val veryFastSource = 
    Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000)) 

val tickingSource = Source.tick(1.second, 1.second, 0) 

veryFastSource.zip(tickingSource).map(_._1).runWith(Sink.foreach(println)) 
Verwandte Themen