2017-06-10 4 views
1

Was ich versuche zu lösen ist der folgende Fall: Angesichts einer unendlichen Lauf Akka Stream möchte ich in der Lage sein, bestimmte Punkte des Streams zu überwachen. Am besten könnte ich mir vorstellen, wohin man die Nachrichten an dieser Stelle an einen Actor schickt, der auch ein Source ist. Das macht es sehr flexibel für mich, entweder einzelne Quellen zu verbinden oder mehrere Quellen mit einem Websocket oder einem anderen Client, den ich verbinden möchte, zu verbinden. In diesem speziellen Fall versuche ich ScalaFX mit Akka Source zu verbinden, aber es funktioniert nicht wie erwartet. Wenn ich den Code unten starte, beginnen beide Zähler in Ordnung, aber nach einer Weile hört einer von ihnen auf und erholt sich nie wieder. Ich weiß, dass es bei der Verwendung von ScalaFX spezielle Überlegungen beim Threading gibt, aber ich habe nicht das Wissen, um zu verstehen, was hier vor sich geht oder es zu debuggen. Unten ist ein minimales Beispiel zum Ausführen, das Problem sollte nach etwa 5 Sekunden sichtbar sein.Überwachung von Akka Streams Quellen mit ScalaFX

Meine Frage ist:

Wie kann ich diesen Code ändern, wie erwartet zu funktionieren?

import akka.NotUsed 

import scalafx.Includes._ 
import akka.actor.{ActorRef, ActorSystem} 
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode} 
import akka.stream.scaladsl.{Flow, Sink, Source} 

import scalafx.application.JFXApp 
import scalafx.beans.property.{IntegerProperty, StringProperty} 
import scalafx.scene.Scene 
import scalafx.scene.layout.BorderPane 
import scalafx.scene.text.Text 
import scala.concurrent.duration._ 

/** 
    * Created by henke on 2017-06-10. 
    */ 
object MonitorApp extends JFXApp { 

    implicit val system = ActorSystem("monitor") 
    implicit val mat = ActorMaterializer() 

    val value1 = StringProperty("0") 
    val value2 = StringProperty("0") 

    stage = new JFXApp.PrimaryStage { 
    title = "Akka Stream Monitor" 
    scene = new Scene(600, 400) { 
     root = new BorderPane() { 
     left = new Text { 
      text <== value1 
     } 
     right = new Text { 
      text <== value2 
     } 
     } 
    } 
    } 

    override def stopApp() = system.terminate() 

    val monitor1 = createMonitor[Int] 
    val monitor2 = createMonitor[Int] 

    val marketChangeActor1 = monitor1 
    .to(Sink.foreach{ v => 
     value1() = v.toString 
    }).run() 

    val marketChangeActor2 = monitor2 
    .to(Sink.foreach{ v => 
     value2() = v.toString 
    }).run() 

    val monitorActor = Source[Int](1 to 100) 
    .throttle(1, 1.second, 1, ThrottleMode.shaping) 
    .via(logToMonitorAndContinue(marketChangeActor1)) 
    .map(_ * 10) 
    .via(logToMonitorAndContinue(marketChangeActor2)) 
    .to(Sink.ignore).run() 

    def createMonitor[T]: Source[T, ActorRef] = Source.actorRef[T](Int.MaxValue, OverflowStrategy.fail) 

    def logToMonitorAndContinue[T](monitor: ActorRef): Flow[T, T, NotUsed] = { 
    Flow[T].map{ e => 
     monitor ! e 
     e 
    } 
    } 
} 
+1

Ich bin mir nicht sicher, ob dies eine tatsächliche Problemquelle ist, aber es scheint, dass Sie den Eigenschaften (und damit der Benutzeroberfläche) in den Actor-Systemthreads Werte zuweisen. Die Interaktion mit der Benutzeroberfläche sollte jedoch im JavaFX-GUI-Thread erfolgen. Versuchen Sie, 'value1() = v.toString' und den zweiten Aufruf in' Platform.runLater' zu umbrechen. –

+0

Ja danke das hat das Problem behoben. Ich wusste nichts über runLater. – user3139545

+0

Bitte @VladimirMatveev, wenn Sie die richtige Antwort haben, machen Sie eine Antwort und markieren Sie dies als gelöst. –

Antwort

1

Es scheint, dass Sie die Werte für die Eigenschaften zuweisen (und beeinflussen daher die UI) in der Aktorik Threads. Die Interaktion mit der Benutzeroberfläche sollte jedoch im JavaFX-GUI-Thread erfolgen. Versuchen Sie, value1() = v.toString und die zweite in Platform.runLater Anrufe zu wickeln.

Ich konnte keine definitive Aussage über die Verwendung von runLater zur Interaktion mit JavaFX-Daten finden, außer im JavaFX-Swing-Integrationsdokument, aber das ist eine ziemlich häufige Sache in UI-Bibliotheken; Gleiches gilt beispielsweise für Swing mit der SwingUtilities.invokeLater Methode.

Verwandte Themen