Hier ist der Code-Schnipsel aus akka documentation istSink fach für Akka Strom Source.actorRef Puffer und OverflowStrategy
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
val (ref, future) = Source.actorRef(3, OverflowStrategy.fail)
.toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 3.seconds)
assert(result == "123")
Es ist ein Arbeits Code-Schnipsel, aber wenn ich ref verwenden wie ref ! 4
eine andere Nachricht zu sagen, ich habe eine Ausnahme wie akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 3)
Ich denke, dass Puffergröße 3 genug sein sollte. Der Grund dafür ist, dass der Faltungsbetrieb (acc, ele) => acc ist, so dass es einen Akkumulator und ein Element benötigt, um einen neuen Wert-Akkumulator zurückzugeben.
Also änderte ich den Code ließ einen anderen Schauspieler sagen, warten Sie 3 Sekunden. Und es funktioniert wieder.
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
private val (ref, future): (ActorRef, Future[String]) = Source.actorRef(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
Thread.sleep(3000)
ref ! 4
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 10.seconds)
println(result)
Allerdings ist meine Frage, ist es eine Möglichkeit, wir Akka Strom zu verlangsamen oder warten, bis das Waschbecken zur Verfügung stehen berichten. Ich benutze auch die OverflowStrategy.backpressure
, aber es sagte Backpressure overflowStrategy not supported
.
Weitere Optionen?
Danke für die Antwort. Können Sie auch das Code-Snippet bereitstellen? Ich bin etwas verwirrt über 'Enqueued' und' offer' Zeug von 'Source.queue' –
Hinzugefügt ein Beispiel –