Nach der documentation to implement a KillSwitch
, konnte ich dieses einfache Beispiel für das Stoppen einer Quelle, die unendliche Zahlen ausgibt, codieren.Akka Streams: KillSwitch für benutzerdefinierte SourceShape-Frames aus der Videodatei
Mein Anwendungsfall unterscheidet sich in dem Sinne, dass die Quelle Frames aus einer Videodatei mit OpenCV sendet. Warum wird der Upstream nicht abgebrochen? Was fehlt mir hier?
object KillSwitchMinimalMain extends App {
val libopencv_java = new File("lib").listFiles().map(_.getAbsolutePath).filter(_.contains("libopencv_java"))
System.load(libopencv_java(0))
implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()
val videoFile = Video("Video.MOV")
val sourceGraph: Graph[SourceShape[Frame], NotUsed] = new VideoSource(videoFile)
val videoSource: Source[Frame, NotUsed] = Source.fromGraph(sourceGraph)
val killSwitch = KillSwitches.shared("switch")
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val matConversion: FlowShape[Frame, Image] = builder.add(Flow[Frame].map { el => MediaConversion.convertMatToImage(el.frame) })
videoSource.via(killSwitch.flow) ~> matConversion ~> Sink.foreach(println)
ClosedShape
}).run()
Thread.sleep(200)
killSwitch.shutdown()
}
class VideoSource(videoFile: Video) extends GraphStage[SourceShape[Frame]] {
val out: Outlet[Frame] = Outlet("VideoSource")
override val shape: SourceShape[Frame] = SourceShape(out)
val log: Logger = LoggerFactory.getLogger(getClass)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private val capture = new VideoCapture()
private val frame = new Mat()
private var videoPos: Double = _
override def preStart(): Unit = {
capture.open(videoFile.filepath)
readFrame()
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, Frame(videoPos, frame))
readFrame()
}
})
private def readFrame(): Unit = {
if (capture.isOpened) {
videoPos = capture.get(1)
log.info(s"reading frame $videoPos")
capture.read(frame)
}
}
}
}
Die Konsolenausgabe wie @svezfaz gestellt:
13:17:00.046 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 0.0
13:17:00.160 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 1.0
[email protected]
13:17:00.698 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 2.0
[email protected]
13:17:00.826 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 3.0
[email protected]
13:17:00.969 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 4.0
[email protected]
13:17:01.137 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 5.0
[email protected]
// and so on ..
Könnten Sie detailliert angeben, welche Ausgabe Sie sehen, wenn Sie das OpenCV-Beispiel ausführen? Weißt du, wie lange dauert ein Frame Pull? –
Ich habe die Frage mit einem Logger aktualisiert, der zeigt, dass Frames ziemlich schnell gelesen werden (immer noch viel langsamer als das Aussenden von ganzen Zahlen). 'javafx.scene.image.WritableImage @ xxxxxxxx' ist die' println' des Sinks. – Toaditoad