2016-12-30 1 views
0

Ich versuche, einen Strom zu beenden, der einige Zahlen transformiert und einen anderen Strom des gleichen Diagramms erstellt. Die zweite Stream-Instanz wird jedoch nicht ausgeführt oder druckt nichts auf der Konsole.Akka Streams: Erstellen eines anderen RunnableGraph nach dem Herunterfahren mit KillSwitch

Was fehlt mir?

object KillSwitchSample extends App { 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    val killSwitch = KillSwitches.shared("switch") 

    val stream1 = createStream("stream 1") 
    stream1.run() 
    Thread.sleep(200) 
    killSwitch.shutdown() 

    val stream2 = createStream("stream 2") 
    stream2.run() 
    Thread.sleep(200) 
    killSwitch.shutdown() 

    def createStream(streamName: String): RunnableGraph[NotUsed] = { 
    Source.fromGraph(new NumbersSource) 
     .via(killSwitch.flow) 
     .map(el => s"$streamName: $el") 
     .to(Sink.foreach(println)) 
    } 
} 

class NumbersSource extends GraphStage[SourceShape[Int]] { 
    val out: Outlet[Int] = Outlet("NumbersSource") 
    override val shape: SourceShape[Int] = SourceShape(out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     private var counter = 1 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      push(out, counter) 
      counter += 1 
     } 
     }) 
    } 
} 

Antwort

4

Sie verwenden eine gemeinsame KillSwitch. Ein freigegebener KillSwitch kann nur einmal umgeschaltet werden, nachdem er sich daran erinnert, dass er bereits umgeschaltet wurde und daher sofort auch spätere Streams beenden wird.

Das passiert mit Ihrem Code. Sie haben den Kill-Schalter ausgelöst, bevor Sie das Diagramm zum zweiten Mal ausführen.

können Sie einen KillSwitches.single verwenden, anstatt eine neue KillSwitch jedes Mal zu erhalten:

def createStream(streamName: String): RunnableGraph[UniqueKillSwitch] = 
    Source.fromGraph(new NumbersSource) 
    .map(el => s"$streamName: $el") 
    .viaMat(KillSwitches.single)(Keep.right) 
    .to(Sink.foreach(println)) 

val switch1 = createStream("a").run() 
// ... 
switch1.shutdown() 

val switch2 = createStream("b").run() 
// ... 
switch2.shutdown() 
+0

Danke, ich war nicht wirklich bewusst, dass und nicht genug Aufmerksamkeit auf die [Dokumentation] (http zahlen: //doc.akka.io/docs/akka/2.4/scala/stream/stream-dynamic.html#Controlling_graph_completion_with_KillSwitch), dass beide Arten von 'KillSwitch' (nicht nur' SharedKillSwitch') nachfolgende Aufrufe von 'shutdown()' ignorieren oder 'abort()'. – Toaditoad

Verwandte Themen