2017-08-30 1 views
0

Hier ist der Code-Schnipsel aus akka documentation istSink fach für Akka Strom Source.actorRef Puffer und OverflowStrategy

val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right) 

val (ref, future) = Source.actorRef(3, OverflowStrategy.fail) 
    .toMat(sinkUnderTest)(Keep.both).run() 

ref ! 1 
ref ! 2 
ref ! 3 
ref ! akka.actor.Status.Success("done") 

val result = Await.result(future, 3.seconds) 
assert(result == "123") 

Es ist ein Arbeits Code-Schnipsel, aber wenn ich ref verwenden wie ref ! 4 eine andere Nachricht zu sagen, ich habe eine Ausnahme wie akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 3)

Ich denke, dass Puffergröße 3 genug sein sollte. Der Grund dafür ist, dass der Faltungsbetrieb (acc, ele) => acc ist, so dass es einen Akkumulator und ein Element benötigt, um einen neuen Wert-Akkumulator zurückzugeben.

Also änderte ich den Code ließ einen anderen Schauspieler sagen, warten Sie 3 Sekunden. Und es funktioniert wieder.

val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right) 

    private val (ref, future): (ActorRef, Future[String]) = Source.actorRef(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run() 

    ref ! 1 
    ref ! 2 
    ref ! 3 
    Thread.sleep(3000) 
    ref ! 4 
    ref ! akka.actor.Status.Success("done") 

    val result = Await.result(future, 10.seconds) 

    println(result) 

Allerdings ist meine Frage, ist es eine Möglichkeit, wir Akka Strom zu verlangsamen oder warten, bis das Waschbecken zur Verfügung stehen berichten. Ich benutze auch die OverflowStrategy.backpressure, aber es sagte Backpressure overflowStrategy not supported.

Weitere Optionen?

Antwort

4

Sie sollten in Source.queue als eine Möglichkeit suchen, Elemente in den Strom von außen in einer Gegendruck-bewusst Weise zu injizieren.

Source.queue wird in ein Warteschlangenobjekt umgewandelt, dem Sie Elemente anbieten können. Wenn Sie diese jedoch anbieten, erhalten Sie eine Future, die abgeschlossen wird, wenn der Stream bereit ist, die Nachricht zu akzeptieren.

Beispiel unten:

val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right) 

    val (queue, future): (SourceQueueWithComplete[Int], Future[String]) = 
    Source.queue(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run() 

    Future.sequence(Seq(
    queue.offer(1), 
    queue.offer(2), 
    queue.offer(3), 
    queue.offer(4) 
)) 

    queue.complete() 

    val result = Await.result(future, 10.seconds) 

    println(result) 

Mehr Informationen im docs.

+0

Danke für die Antwort. Können Sie auch das Code-Snippet bereitstellen? Ich bin etwas verwirrt über 'Enqueued' und' offer' Zeug von 'Source.queue' –

+0

Hinzugefügt ein Beispiel –

Verwandte Themen