Ein Schauspieler initialisiert einen Akka-Stream, der sich mit einem Websocket verbindet. Dies geschieht durch Verwendung einer Source.actorRef
, an die Nachrichten gesendet werden können, die dann von der webSocketClientFlow
verarbeitet und von einer Sink.foreach
konsumiert werden. Dies kann in den folgenden Code zu sehen ist (abgeleitet von akka docs):Warum spielt Play Framework den Akka Stream nicht?
class TestActor @Inject()(implicit ec: ExecutionContext) extends Actor with ActorLogging {
final implicit val system: ActorSystem = ActorSystem()
final implicit val materializer: ActorMaterializer = ActorMaterializer()
def receive = {
case _ =>
}
// Consume the incoming messages from the websocket.
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case misc => println(misc)
}
// Source through which we can send messages to the websocket.
val outgoing: Source[TextMessage, ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws-feed.gdax.com"))
// Materialized the stream
val ((ws,upgradeResponse), closed) =
outgoing
.viaMat(webSocketFlow)(Keep.both)
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// Check whether the server has accepted the websocket request.
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Failed: ${upgrade.response.status}")
}
}
// When the connection has been established.
connected.onComplete(println)
// When the stream has closed
closed.onComplete {
case Success(_) => println("Test Websocket closed gracefully")
case Failure(e) => log.error("Test Websocket closed with an error\n", e)
}
}
Wenn das Spiel Rahmen neu kompiliert es schließt die TestActor aber schließt nicht den Akka-Stream. Nur wenn der Websocket Timeouts ist, ist der Stream geschlossen.
Bedeutet dies, dass ich brauche den Strom manuell durch beispielsweise zu schließen, die mit Source.actorRef
ein PoisonPill
im TestActorPostStop
Funktion erstellt Schauspieler zu senden?
Hinweis: Ich habe auch versucht die Materializer
und die zu injizieren Actorsystem
dh:
@Inject()(implicit ec: ExecutionContext, implicit val mat: Materializer, implicit val system: ActorSystem)
Wenn Wiedergabe neu kompiliert wird der Strom geschlossen, sondern erzeugt auch einen Fehler:
[error] a.a.ActorSystemImpl - Websocket handler failed with
Processor actor [Actor[akka://application/user/StreamSupervisor-62/flow-0-0-ignoreSink#989719582]]
terminated abruptly