Wie in der documentation beschrieben, ist die Möglichkeit, ein Diagramm von außerhalb des Diagramms zu vervollständigen mit KillSwitch
. Das Beispiel, das Sie aus der Dokumentation kopiert haben, ist kein guter Kandidat, um diesen Ansatz zu veranschaulichen, da die Quelle nur ein einzelnes Element ist und der Stream beim Ausführen sehr schnell abgeschlossen wird. Lassen Sie uns die Grafik anpassen mehr auf einfache Weise die KillSwitch
in Aktion sehen:
val topSink = Sink.foreach(println)
val bottomSink = Sink.foreach(println)
val sharedDoubler = Flow[Int].map(_ * 2)
val killSwitch = KillSwitches.single[Int]
val g = RunnableGraph.fromGraph(GraphDSL.create(topSink, bottomSink, killSwitch)((_, _, _)) {
implicit builder => (topS, bottomS, switch) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
Source.fromIterator(() => (1 to 1000000).iterator) ~> switch ~> broadcast.in
broadcast.out(0) ~> sharedDoubler ~> topS.in
broadcast.out(1) ~> sharedDoubler ~> bottomS.in
ClosedShape
})
val res = g.run // res is of type (Future[Done], Future[Done], UniqueKillSwitch)
Thread.sleep(1000)
res._3.shutdown()
jetzt Die Quelle von einer Million Elementen besteht, und die Senken nun die gesendeten Elemente drucken. Der Stream läuft für eine Sekunde, was nicht genug Zeit ist, um alle eine Million Elemente zu durchlaufen, bevor wir shutdown
aufrufen, um den Stream zu vervollständigen.
Wenn Sie einen Stream innerhalb eines Aktors ausführen, hängt der Lebenszyklus des zugrunde liegenden Aktors (oder der Akteure), der zum Ausführen des Streams erstellt wird, vom Lebenszyklus des einschließenden Aktors ab, abhängig davon, wie der Materializer erstellt wird. Lesen Sie die documentation für weitere Informationen. Der folgende Blogpost von Colin Breck über die Verwendung eines Schauspielers und KillSwitch
zur Verwaltung des Lebenszyklus eines Streams ist ebenfalls hilfreich: http://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/