2017-11-01 9 views
0

Erhalten meiner ersten Schritte mit akka Strömen. Ich habe ein Diagramm ähnlich wie diese von here kopiert:Wie man runnable Diagramm stoppt

val topHeadSink = Sink.head[Int] 
val bottomHeadSink = Sink.head[Int] 
val sharedDoubler = Flow[Int].map(_ * 2)  
val g = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder => 
     (topHS, bottomHS) => 
     import GraphDSL.Implicits._ 
     val broadcast = builder.add(Broadcast[Int](2)) 
     Source.single(1) ~> broadcast.in  

    broadcast.out(0) ~> sharedDoubler ~> topHS.in 
    broadcast.out(1) ~> sharedDoubler ~> bottomHS.in 
    ClosedShape 
}) 

ich die Grafik laufen kann mit g.run() aber wie kann ich es stoppen? unter welchen Umständen sollte ich es tun (anders als die keine Verwendung - geschäftlich)? Dieses Diagramm ist in einem Akteur enthalten. wenn der Schauspieler abstürzt, was mit den Graphen passiert, die dem Schauspieler zugrunde liegen? Wird es auch enden?

Antwort

2

Wie in der documentation beschrieben, ist die Möglichkeit, ein Diagramm von außerhalb des Diagramms zu vervollständigen mit KillSwitch. Das Beispiel, das Sie aus der Dokumentation kopiert haben, ist kein guter Kandidat, um diesen Ansatz zu veranschaulichen, da die Quelle nur ein einzelnes Element ist und der Stream beim Ausführen sehr schnell abgeschlossen wird. Lassen Sie uns die Grafik anpassen mehr auf einfache Weise die KillSwitch in Aktion sehen:

val topSink = Sink.foreach(println) 
val bottomSink = Sink.foreach(println) 
val sharedDoubler = Flow[Int].map(_ * 2) 
val killSwitch = KillSwitches.single[Int] 

val g = RunnableGraph.fromGraph(GraphDSL.create(topSink, bottomSink, killSwitch)((_, _, _)) { 
    implicit builder => (topS, bottomS, switch) => 

    import GraphDSL.Implicits._ 

    val broadcast = builder.add(Broadcast[Int](2)) 
    Source.fromIterator(() => (1 to 1000000).iterator) ~> switch ~> broadcast.in 

    broadcast.out(0) ~> sharedDoubler ~> topS.in 
    broadcast.out(1) ~> sharedDoubler ~> bottomS.in 
    ClosedShape 
}) 

val res = g.run // res is of type (Future[Done], Future[Done], UniqueKillSwitch) 
Thread.sleep(1000) 
res._3.shutdown() 

jetzt Die Quelle von einer Million Elementen besteht, und die Senken nun die gesendeten Elemente drucken. Der Stream läuft für eine Sekunde, was nicht genug Zeit ist, um alle eine Million Elemente zu durchlaufen, bevor wir shutdown aufrufen, um den Stream zu vervollständigen.

Wenn Sie einen Stream innerhalb eines Aktors ausführen, hängt der Lebenszyklus des zugrunde liegenden Aktors (oder der Akteure), der zum Ausführen des Streams erstellt wird, vom Lebenszyklus des einschließenden Aktors ab, abhängig davon, wie der Materializer erstellt wird. Lesen Sie die documentation für weitere Informationen. Der folgende Blogpost von Colin Breck über die Verwendung eines Schauspielers und KillSwitch zur Verwaltung des Lebenszyklus eines Streams ist ebenfalls hilfreich: http://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/