Was ich versuche zu lösen ist der folgende Fall: Angesichts einer unendlichen Lauf Akka Stream möchte ich in der Lage sein, bestimmte Punkte des Streams zu überwachen. Am besten könnte ich mir vorstellen, wohin man die Nachrichten an dieser Stelle an einen Actor schickt, der auch ein Source
ist. Das macht es sehr flexibel für mich, entweder einzelne Quellen zu verbinden oder mehrere Quellen mit einem Websocket oder einem anderen Client, den ich verbinden möchte, zu verbinden. In diesem speziellen Fall versuche ich ScalaFX mit Akka Source zu verbinden, aber es funktioniert nicht wie erwartet. Wenn ich den Code unten starte, beginnen beide Zähler in Ordnung, aber nach einer Weile hört einer von ihnen auf und erholt sich nie wieder. Ich weiß, dass es bei der Verwendung von ScalaFX spezielle Überlegungen beim Threading gibt, aber ich habe nicht das Wissen, um zu verstehen, was hier vor sich geht oder es zu debuggen. Unten ist ein minimales Beispiel zum Ausführen, das Problem sollte nach etwa 5 Sekunden sichtbar sein.Überwachung von Akka Streams Quellen mit ScalaFX
Meine Frage ist:
Wie kann ich diesen Code ändern, wie erwartet zu funktionieren?
import akka.NotUsed
import scalafx.Includes._
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scalafx.application.JFXApp
import scalafx.beans.property.{IntegerProperty, StringProperty}
import scalafx.scene.Scene
import scalafx.scene.layout.BorderPane
import scalafx.scene.text.Text
import scala.concurrent.duration._
/**
* Created by henke on 2017-06-10.
*/
object MonitorApp extends JFXApp {
implicit val system = ActorSystem("monitor")
implicit val mat = ActorMaterializer()
val value1 = StringProperty("0")
val value2 = StringProperty("0")
stage = new JFXApp.PrimaryStage {
title = "Akka Stream Monitor"
scene = new Scene(600, 400) {
root = new BorderPane() {
left = new Text {
text <== value1
}
right = new Text {
text <== value2
}
}
}
}
override def stopApp() = system.terminate()
val monitor1 = createMonitor[Int]
val monitor2 = createMonitor[Int]
val marketChangeActor1 = monitor1
.to(Sink.foreach{ v =>
value1() = v.toString
}).run()
val marketChangeActor2 = monitor2
.to(Sink.foreach{ v =>
value2() = v.toString
}).run()
val monitorActor = Source[Int](1 to 100)
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.via(logToMonitorAndContinue(marketChangeActor1))
.map(_ * 10)
.via(logToMonitorAndContinue(marketChangeActor2))
.to(Sink.ignore).run()
def createMonitor[T]: Source[T, ActorRef] = Source.actorRef[T](Int.MaxValue, OverflowStrategy.fail)
def logToMonitorAndContinue[T](monitor: ActorRef): Flow[T, T, NotUsed] = {
Flow[T].map{ e =>
monitor ! e
e
}
}
}
Ich bin mir nicht sicher, ob dies eine tatsächliche Problemquelle ist, aber es scheint, dass Sie den Eigenschaften (und damit der Benutzeroberfläche) in den Actor-Systemthreads Werte zuweisen. Die Interaktion mit der Benutzeroberfläche sollte jedoch im JavaFX-GUI-Thread erfolgen. Versuchen Sie, 'value1() = v.toString' und den zweiten Aufruf in' Platform.runLater' zu umbrechen. –
Ja danke das hat das Problem behoben. Ich wusste nichts über runLater. – user3139545
Bitte @VladimirMatveev, wenn Sie die richtige Antwort haben, machen Sie eine Antwort und markieren Sie dies als gelöst. –