2017-11-27 4 views
0

Ich habe die folgende Grafik:Akka Streams außer Kraft setzen Killswitch

case class FlowFactory() { 

    val reactiveConnection = ??? 
    val serviceRabbitConnection = ??? 

    val switch = KillSwitches.single[Routed] 

    val stream: RunnableGraph[UniqueKillSwitch] = RunnableGraph.fromGraph(GraphDSL.create(switch) { implicit builder: GraphDSL.Builder[UniqueKillSwitch] => sw => 
    import GraphDSL.Implicits._ 

    val in = builder.add(Source.fromPublisher(reactiveConnection.consume(???))) 
    val context = builder.add(contextFlow(serviceRabbitConnection)) 
    val inflate = builder.add(inflateFlow()) 
    val compute = builder.add(computeFlow()) 
    val out = builder.add(Sink.fromSubscriber(reactiveConnection.publish())) 

    in ~> context ~> inflate ~> compute ~> sw ~> out 

    ClosedShape 
    }) 

    val killSwitch = stream.run() 

    killSwitch.shutdown() 

} 

Wenn ich Abschaltung der Strom, ich brauche auch folgende Verbindungen zu töten: reactiveConnection und serviceRabbitConnection. Wie erreiche ich das, gibt es eine einfache Methode KillSwitch 's shutdown() Methode zu überschreiben? Gibt es eine Methode, die aufgerufen wird, wenn der Stream geschlossen ist, wie onComplete() oder onClose()?

Antwort

2

Sie können Ihren Rückruf innerhalb des Streams ausführen, indem Sie eine zusätzliche Spüle (Sink.onComplete) anschließen.

val sink1 = Sink.fromSubscriber(reactiveConnection.publish()) 
    val sink2 = Sink.onComplete{ 
    case Success(_) ⇒ println("success!") 
    case Failure(e) ⇒ println(s"failure - $e") 
    } 

    val out = builder.add(Sink.combine(sink1, sink2)(Broadcast(_))) 
Verwandte Themen