2017-09-30 1 views
1

Ein Schauspieler initialisiert einen Akka-Stream, der sich mit einem Websocket verbindet. Dies geschieht durch Verwendung einer Source.actorRef, an die Nachrichten gesendet werden können, die dann von der webSocketClientFlow verarbeitet und von einer Sink.foreach konsumiert werden. Dies kann in den folgenden Code zu sehen ist (abgeleitet von akka docs):Warum spielt Play Framework den Akka Stream nicht?

class TestActor @Inject()(implicit ec: ExecutionContext) extends Actor with ActorLogging { 

    final implicit val system: ActorSystem = ActorSystem() 
    final implicit val materializer: ActorMaterializer = ActorMaterializer() 

    def receive = { 
    case _ => 
    } 

    // Consume the incoming messages from the websocket. 
    val incoming: Sink[Message, Future[Done]] = 
    Sink.foreach[Message] { 
    case message: TextMessage.Strict => 
     println(message.text) 
    case misc => println(misc) 
    } 

    // Source through which we can send messages to the websocket. 
    val outgoing: Source[TextMessage, ActorRef] = 
    Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail) 

    // flow to use (note: not re-usable!) 
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws-feed.gdax.com")) 

    // Materialized the stream 
    val ((ws,upgradeResponse), closed) = 
    outgoing 
    .viaMat(webSocketFlow)(Keep.both) 
    .toMat(incoming)(Keep.both) // also keep the Future[Done] 
    .run() 

    // Check whether the server has accepted the websocket request. 
    val connected = upgradeResponse.flatMap { upgrade => 
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) { 
     Future.successful(Done) 
    } else { 
     throw new RuntimeException(s"Failed: ${upgrade.response.status}") 
    } 
    } 

    // When the connection has been established. 
    connected.onComplete(println) 

    // When the stream has closed 
    closed.onComplete { 
    case Success(_) => println("Test Websocket closed gracefully") 
    case Failure(e) => log.error("Test Websocket closed with an error\n", e) 
    } 

} 

Wenn das Spiel Rahmen neu kompiliert es schließt die TestActor aber schließt nicht den Akka-Stream. Nur wenn der Websocket Timeouts ist, ist der Stream geschlossen.

Bedeutet dies, dass ich brauche den Strom manuell durch beispielsweise zu schließen, die mit Source.actorRef ein PoisonPill im TestActorPostStop Funktion erstellt Schauspieler zu senden?

Hinweis: Ich habe auch versucht die Materializer und die zu injizieren Actorsystem dh:

@Inject()(implicit ec: ExecutionContext, implicit val mat: Materializer, implicit val system: ActorSystem) 

Wenn Wiedergabe neu kompiliert wird der Strom geschlossen, sondern erzeugt auch einen Fehler:

[error] a.a.ActorSystemImpl - Websocket handler failed with 
Processor actor [Actor[akka://application/user/StreamSupervisor-62/flow-0-0-ignoreSink#989719582]] 
terminated abruptly 

Antwort

1

In In Ihrem ersten Beispiel erstellen Sie ein Akteursystem in Ihrem Akteur. Das sollten Sie nicht tun - Akteurs-Systeme sind teuer, eine zu erstellen heißt, Thread-Pools zu starten, Scheduler zu starten usw. Außerdem schalten Sie es nie herunter, was bedeutet, dass Sie ein viel größeres Problem haben, als dass der Stream nicht herunterfährt - Sie haben ein Ressourcenleck, die vom Actor-System erstellten Thread-Pools werden nie heruntergefahren.

Im Wesentlichen erstellen Sie also jedes Mal, wenn Sie eine WebSocket-Verbindung erhalten, ein neues Actor-System mit einer neuen Gruppe von Thread-Pools, und Sie schalten sie nie herunter. In der Produktion, mit einer kleinen Last (einige Anfragen pro Sekunde), wird Ihrer Anwendung innerhalb weniger Minuten der Speicher ausgehen.

Im Allgemeinen sollten Sie in Play nie Ihr eigenes Actor-System erstellen, sondern nur eines injizieren. Innerhalb eines Schauspielers müssen Sie es nicht einmal injizieren, weil es automatisch - context.system gibt Ihnen Zugriff auf das Akteurs-System, das den Schauspieler erstellt. Genauso wie bei Materialisierern sind diese nicht so schwer wiegen, aber wenn Sie eine pro Verbindung erstellen, können Sie auch nicht genügend Arbeitsspeicher haben, wenn Sie es nicht herunterfahren, also sollten Sie es injizieren lassen.

Also, wenn Sie es injiziert haben, erhalten Sie einen Fehler - das ist schwer zu vermeiden, wenn auch nicht unmöglich. Die Schwierigkeit besteht darin, dass Akka selbst nicht wirklich automatisch wissen kann, in welcher Reihenfolge Dinge abgeschaltet werden müssen, um die Dinge anmutig zu schließen, sollte es den Schauspieler zuerst herunterfahren, damit es die Ströme anmutig schließen kann oder die Ströme schließen sollte nach unten, damit sie Ihrem Akteur mitteilen können, dass sie heruntergefahren sind und entsprechend reagieren?

Akka 2.5 hat eine Lösung für dieses, eine verwaltete Abschaltsequenz, wo man Dinge registrieren kann abgeschaltet werden, bevor der Schauspieler System startet Dinge in etwas zufälliger Reihenfolge zu töten:

https://doc.akka.io/docs/akka/2.5/scala/actors.html#coordinated-shutdown

Sie verwenden Dies in Kombination mit Akka-Streams kill switches, um Ihre Streams ordnungsgemäß herunterzufahren, bevor der Rest der Anwendung heruntergefahren wird.

Aber im Allgemeinen sind die Abschaltfehler ziemlich gutartig, also wenn ich ich wäre, würde ich mich nicht um sie kümmern.