2016-12-16 2 views
0

Nach der documentation to implement a KillSwitch, konnte ich dieses einfache Beispiel für das Stoppen einer Quelle, die unendliche Zahlen ausgibt, codieren.Akka Streams: KillSwitch für benutzerdefinierte SourceShape-Frames aus der Videodatei

Mein Anwendungsfall unterscheidet sich in dem Sinne, dass die Quelle Frames aus einer Videodatei mit OpenCV sendet. Warum wird der Upstream nicht abgebrochen? Was fehlt mir hier?

object KillSwitchMinimalMain extends App { 
    val libopencv_java = new File("lib").listFiles().map(_.getAbsolutePath).filter(_.contains("libopencv_java")) 
    System.load(libopencv_java(0)) 

    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    val videoFile = Video("Video.MOV") 

    val sourceGraph: Graph[SourceShape[Frame], NotUsed] = new VideoSource(videoFile) 
    val videoSource: Source[Frame, NotUsed] = Source.fromGraph(sourceGraph) 

    val killSwitch = KillSwitches.shared("switch") 

    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 

    val matConversion: FlowShape[Frame, Image] = builder.add(Flow[Frame].map { el => MediaConversion.convertMatToImage(el.frame) }) 

    videoSource.via(killSwitch.flow) ~> matConversion ~> Sink.foreach(println) 

    ClosedShape 
    }).run() 

    Thread.sleep(200) 

    killSwitch.shutdown() 
} 

class VideoSource(videoFile: Video) extends GraphStage[SourceShape[Frame]] { 
    val out: Outlet[Frame] = Outlet("VideoSource") 
    override val shape: SourceShape[Frame] = SourceShape(out) 
    val log: Logger = LoggerFactory.getLogger(getClass) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     private val capture = new VideoCapture() 
     private val frame = new Mat() 
     private var videoPos: Double = _ 

     override def preStart(): Unit = { 
     capture.open(videoFile.filepath) 
     readFrame() 
     } 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      push(out, Frame(videoPos, frame)) 
      readFrame() 
     } 
     }) 

     private def readFrame(): Unit = { 
     if (capture.isOpened) { 
      videoPos = capture.get(1) 
      log.info(s"reading frame $videoPos") 
      capture.read(frame) 
     } 
     } 
    } 
} 

Die Konsolenausgabe wie @svezfaz gestellt:

13:17:00.046 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 0.0 
13:17:00.160 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 1.0 
[email protected] 
13:17:00.698 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 2.0 
[email protected] 
13:17:00.826 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 3.0 
[email protected] 
13:17:00.969 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 4.0 
[email protected] 
13:17:01.137 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 5.0 
[email protected] 
// and so on .. 
+0

Könnten Sie detailliert angeben, welche Ausgabe Sie sehen, wenn Sie das OpenCV-Beispiel ausführen? Weißt du, wie lange dauert ein Frame Pull? –

+0

Ich habe die Frage mit einem Logger aktualisiert, der zeigt, dass Frames ziemlich schnell gelesen werden (immer noch viel langsamer als das Aussenden von ganzen Zahlen). 'javafx.scene.image.WritableImage @ xxxxxxxx' ist die' println' des Sinks. – Toaditoad

Antwort

1

Das Problem ist, dass Sie in Ihrer benutzerdefinierten Bühne introduce blockieren. Ich kenne die OpenCV-API nicht, aber ich vermute, es passiert, wenn Sie capture.read(frame) aufrufen. Nun, wenn nicht anders angegeben, wird Ihr Diagramm in einem einzigen Actor ausgeführt, daher blockiert Blockieren in Ihrem Stadium den gesamten Akteur.

Erzwingen eine async Grenze nach Ihrer Quelle sollte den Trick tun.

Beachten Sie auch, dass Sie hier kein GraphDSL benötigen, alles kann über die Via/DSL kompakt ausgeführt werden.

Lösung Versuch unter

object KillSwitchMinimalMain extends App { 
    val libopencv_java = new File("lib").listFiles().map(_.getAbsolutePath).filter(_.contains("libopencv_java")) 

    System.load(libopencv_java(0)) 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    val videoFile = Video("Video.MOV") 

    val killSwitch = KillSwitches.shared("switch") 
    val matConversion = Flow[ByteString].map { _.utf8String } 

    Source.fromGraph(new VideoSource()) 
    .async 
    .via(killSwitch.flow) 
    .via(matConversion) 
    .runForeach(println) 

    Thread.sleep(200) 

    killSwitch.shutdown() 
} 

Für weitere Informationen über die Gleichzeitigkeit Modell Akka Streams zugrunde liegen Sie dieses blogpost lesen kann.

+0

Vielen Dank für diese Erklärung. Ich erinnere mich, darüber in der [Dokumentation] gelesen zu haben (http://doc.akka.io/docs/akka/2.4/scala/stream/stream-flows-and-basics.html#Operator_Fusion), aber offensichtlich konnte das nicht wirklich zutreffen es. In Bezug auf die GraphDSL, ist mir bewusst, dass es hier nicht notwendig ist, aber der veröffentlichte Code ist eine gekochte Version, die meine Broadcast und Merge-Phasen auslässt. Aber seit Sie es erwähnt haben: Gibt es noch andere Nachteile, die GraphDSL verwenden, abgesehen davon, dass es ausführlicher ist? – Toaditoad

+0

Ein weiterer erwähnenswerter Nachteil ist, dass der GraphDSL etwas zerbrechlicher und anfällig für Laufzeitfehler ist, wo immer die Ein- und Ausgänge nicht richtig verbunden sind. Keine großen Unterschiede in der Leistung, die mir bewusst sind. –

+0

Danke. Das macht Sinn. – Toaditoad

Verwandte Themen