Ich versuche, einen einfachen Proxy für Websocket-Verbindungen mit Play-und Akka-Streams zu erstellen. Der Verkehrsfluss ist wie folgt:Websocket-Proxy mit Play 2.6 und Akka Streams
(Client) request -> -> request (Server)
Proxy
(Client) response <- <- response (Server)
ich mit dem folgenden Code kam nach ein paar Beispiele folgende:
def socket = WebSocket.accept[String, String] { request =>
val uuid = UUID.randomUUID().toString
// wsOut - actor that deals with incoming websocket frame from the Client
// wsIn - publisher of the frame for the Server
val (wsOut: ActorRef, wsIn: Publisher[String]) = {
val source: Source[String, ActorRef] = Source.actorRef[String](10, OverflowStrategy.dropTail)
val sink: Sink[String, Publisher[String]] = Sink.asPublisher(fanout = false)
source.toMat(sink)(Keep.both).run()
}
// sink that deals with the incoming messages from the Server
val serverIncoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println("The server has sent: " + message.text)
}
// source for sending a message over the WebSocket
val serverOutgoing = Source.fromPublisher(wsIn).map(TextMessage(_))
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://0.0.0.0:6000"))
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
serverOutgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(serverIncoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))
val actor = system.actorOf(WebSocketProxyActor.props(wsOut, uuid))
val finalFlow = {
val sink = Sink.actorRef(actor, akka.actor.Status.Success(()))
val source = Source.maybe[String] // what the client receives. How to connect with the serverIncoming sink ???
Flow.fromSinkAndSource(sink, source)
}
finalFlow
Mit diesem Code, geht der Datenverkehr vom Client zum Proxy auf dem Server zurück zum Proxy und das war's. Es erreicht den Kunden nicht weiter. Wie kann ich das beheben? Ich glaube, ich muss irgendwie die serverIncoming
sinken auf den source
im finalFlow
verbinden, aber ich kann nicht herausfinden, wie es zu tun ...
Oder bin ich mit diesem Ansatz völlig falsch? Ist es besser, eine Bidiflow
oder eine Graph
zu verwenden? Ich bin neu in Akka Streams und versuche immer noch, Dinge herauszufinden.
Dies ist kein Proxy. Dies ist ein sehr einfacher Server, der Strings übergibt und diese über Web-Sockets an den Client zurücksendet. –
@morganfreeman Ich meinte den Controller selbst als Proxy. Der UpperService kann durch einen Akteur ersetzt werden, der einen externen Dienst aufruft, um eine echte Verarbeitung durchzuführen, beispielsweise: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-integrations.html#Integrating_with_External_Services . Aber ich verstehe dein Problem wahrscheinlich nicht richtig. – botkop
Richtig, der UpperService könnte theoretisch die Frames über einen Web-Socket an den Server senden. Ich habe es anfangs versucht. Dieser Web-Socket für den Server ist ein Flow. Ich konnte die Empfangsmethode des Schauspielers nicht mit der Quelle des Flusses und der Senke verbinden, die zurück zum "Socket" fließt (so dass die Daten zum Client zurückkommen). Ich konnte den Web-Socket-Fluss in der Empfangs-Methode initialisieren, aber das würde jedes Mal eine Verbindung zum Server öffnen. –