2017-04-12 3 views
5

Ich versuche, einen einfachen Proxy für Websocket-Verbindungen mit Play-und Akka-Streams zu erstellen. Der Verkehrsfluss ist wie folgt:Websocket-Proxy mit Play 2.6 und Akka Streams

(Client) request ->   -> request (Server) 
         Proxy 
(Client) response <-   <- response (Server) 

ich mit dem folgenden Code kam nach ein paar Beispiele folgende:

def socket = WebSocket.accept[String, String] { request => 

val uuid = UUID.randomUUID().toString 

// wsOut - actor that deals with incoming websocket frame from the Client 
// wsIn - publisher of the frame for the Server 
val (wsOut: ActorRef, wsIn: Publisher[String]) = { 
    val source: Source[String, ActorRef] = Source.actorRef[String](10, OverflowStrategy.dropTail) 
    val sink: Sink[String, Publisher[String]] = Sink.asPublisher(fanout = false) 
    source.toMat(sink)(Keep.both).run() 
} 

// sink that deals with the incoming messages from the Server 
val serverIncoming: Sink[Message, Future[Done]] = 
    Sink.foreach[Message] { 
    case message: TextMessage.Strict => 
     println("The server has sent: " + message.text) 
    } 

// source for sending a message over the WebSocket 
val serverOutgoing = Source.fromPublisher(wsIn).map(TextMessage(_)) 

// flow to use (note: not re-usable!) 
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://0.0.0.0:6000")) 

// the materialized value is a tuple with 
// upgradeResponse is a Future[WebSocketUpgradeResponse] that 
// completes or fails when the connection succeeds or fails 
// and closed is a Future[Done] with the stream completion from the incoming sink 
val (upgradeResponse, closed) = 
serverOutgoing 
    .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse] 
    .toMat(serverIncoming)(Keep.both) // also keep the Future[Done] 
    .run() 

// just like a regular http request we can access response status which is available via upgrade.response.status 
// status code 101 (Switching Protocols) indicates that server support WebSockets 
val connected = upgradeResponse.flatMap { upgrade => 
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) { 
    Future.successful(Done) 
    } else { 
    throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") 
    } 
} 

// in a real application you would not side effect here 
connected.onComplete(println) 
closed.foreach(_ => println("closed")) 

val actor = system.actorOf(WebSocketProxyActor.props(wsOut, uuid)) 
val finalFlow = { 
    val sink = Sink.actorRef(actor, akka.actor.Status.Success(())) 
    val source = Source.maybe[String] // what the client receives. How to connect with the serverIncoming sink ??? 
    Flow.fromSinkAndSource(sink, source) 
} 

finalFlow 

Mit diesem Code, geht der Datenverkehr vom Client zum Proxy auf dem Server zurück zum Proxy und das war's. Es erreicht den Kunden nicht weiter. Wie kann ich das beheben? Ich glaube, ich muss irgendwie die serverIncoming sinken auf den source im finalFlow verbinden, aber ich kann nicht herausfinden, wie es zu tun ...

Oder bin ich mit diesem Ansatz völlig falsch? Ist es besser, eine Bidiflow oder eine Graph zu verwenden? Ich bin neu in Akka Streams und versuche immer noch, Dinge herauszufinden.

Antwort

3

Folgendes scheint zu funktionieren. Hinweis: Ich habe sowohl den Server-Socket als auch den Proxy-Socket auf demselben Controller implementiert, Sie können sie jedoch aufteilen oder denselben Controller auf separaten Instanzen bereitstellen. Die ws-URL zum "oberen" Dienst muss in beiden Fällen aktualisiert werden.

package controllers 

import javax.inject._ 

import akka.actor.{Actor, ActorRef, ActorSystem, Props} 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse} 
import akka.stream.Materializer 
import akka.stream.scaladsl.Flow 
import play.api.libs.streams.ActorFlow 
import play.api.mvc._ 

import scala.concurrent.{ExecutionContext, Future} 
import scala.language.postfixOps 

@Singleton 
class SomeController @Inject()(implicit exec: ExecutionContext, 
           actorSystem: ActorSystem, 
           materializer: Materializer) extends Controller { 

    /*--- proxy ---*/ 
    def websocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] = 
    Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/upper-socket")) 

    def proxySocket: WebSocket = WebSocket.accept[String, String] { _ => 
    Flow[String].map(s => TextMessage(s)) 
     .via(websocketFlow) 
     .map(_.asTextMessage.getStrictText) 
    } 

    /*--- server ---*/ 
    class UpperService(socket: ActorRef) extends Actor { 
    override def receive: Receive = { 
     case s: String => socket ! s.toUpperCase() 
     case _ => 
    } 
    } 

    object UpperService { 
    def props(socket: ActorRef): Props = Props(new UpperService(socket)) 
    } 

    def upperSocket: WebSocket = WebSocket.accept[String, String] { _ => 
    ActorFlow.actorRef(out => UpperService.props(out)) 
    } 
} 

Sie werden die Routen müssen wie folgt aufgebaut werden:

GET /upper-socket controllers.SomeController.upperSocket 
GET /proxy-socket controllers.SomeController.proxySocket 

Sie durch das Senden einer Zeichenfolge testen ws: // localhost: 9000/Proxy-Buchse. Die Antwort wird die obere Zeichenfolge sein.

Es wird jedoch ein Timeout nach 1 Minute Inaktivität sein:

akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [localhost:9000], no bytes passed in the last 1 minute 

Aber sehen: http://doc.akka.io/docs/akka-http/current/scala/http/common/timeouts.html, wie dies zu konfigurieren.

+0

Dies ist kein Proxy. Dies ist ein sehr einfacher Server, der Strings übergibt und diese über Web-Sockets an den Client zurücksendet. –

+0

@morganfreeman Ich meinte den Controller selbst als Proxy. Der UpperService kann durch einen Akteur ersetzt werden, der einen externen Dienst aufruft, um eine echte Verarbeitung durchzuführen, beispielsweise: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-integrations.html#Integrating_with_External_Services . Aber ich verstehe dein Problem wahrscheinlich nicht richtig. – botkop

+0

Richtig, der UpperService könnte theoretisch die Frames über einen Web-Socket an den Server senden. Ich habe es anfangs versucht. Dieser Web-Socket für den Server ist ein Flow. Ich konnte die Empfangsmethode des Schauspielers nicht mit der Quelle des Flusses und der Senke verbinden, die zurück zum "Socket" fließt (so dass die Daten zum Client zurückkommen). Ich konnte den Web-Socket-Fluss in der Empfangs-Methode initialisieren, aber das würde jedes Mal eine Verbindung zum Server öffnen. –

2

Ein Proxy-Bedarf zwei Ströme (Proxy Fluss A/B) zur Verfügung zu stellen:

(Client) request -> Proxy Flow A -> request (Server) 

(Client) response <- Proxy Flow B <- response (Server) 

Eine Möglichkeit solche Proxy-Flow zu implementieren verwendet ActorSubscriber und SourceQueue:

class Subscriber[T](proxy: ActorRef) extends ActorSubscriber { 
    private var queue = Option.empty[SourceQueueWithComplete[T]] 
    def receive = { 
    case Attach(sourceQueue) => queue = Some(sourceQueue) 
    case msg: T => // wait until queue attached and pass forward all msgs to queue and the proxy actor 
    } 
} 

def proxyFlow[T](proxy: ActorRef): Flow[T, ActorRef] = { 
    val sink = Sink.actorSubscriber(Props(new Subscriber[T](proxy))) 
    val source = Source.queue[T](...) 
    Flow.fromSinkAndSourceMat(sink, source){ (ref, queue) => 
    ref ! Attach(queue) 
    ref 
    } 
} 

Sie können dann zusammenbauen der Client fließen wie:

val proxy = actorOf(...) 
val requestFlow = proxyFlow[Request](proxy) 
val responseFlow = proxyFlow[Response](proxy) 
val finalFlow: Flow[Request, Response] = 
    requestFlow.via(webSocketFlow).via(responseFlow) 
2

Zunächst einmal müssen Sie einige akka Importe:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.ws.WebSocketRequest 
import akka.http.scaladsl.model.ws.Message 
import akka.http.scaladsl.model.HttpRequest 
import akka.http.scaladsl.model.HttpResponse 
import akka.stream.scaladsl.Flow 
import akka.http.scaladsl.server.Directives.{ extractUpgradeToWebSocket, complete } 

Dies ist ein Beispiel App, die eine WebSocket Proxy, Bindung an 0.0.0.0 auf Port 80, proxing zu ws://echo.websocket.org schafft:

object WebSocketProxy extends App { 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    private[this] def manipulateFlow: Flow[Message, Message, akka.NotUsed] = ??? 

    private[this] def webSocketFlow = 
    Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org")) 

    private[this] val route: Flow[HttpRequest, HttpResponse, Any] = 
    extractUpgradeToWebSocket { upgrade => 
     val webSocketFlowProxy = manipulateFlow via webSocketFlow 
     val handleWebSocketProxy = upgrade.handleMessages(webSocketFlowProxy) 
     complete(handleWebSocketProxy) 
    } 

    private[this] val proxyBindingFuture = 
    Http().bindAndHandle(route, "0.0.0.0", 80) 

    println(s"Server online\nPress RETURN to stop...") 
    Console.readLine() 
} 

Sie müssen anpassen Ihre für play und für Anwendungsstruktur.Hinweise

:

  • erinnern proxyBindingFuture zu entbinden und die system in Produktion zu beenden;
  • Sie brauchen manipulateFlow nur, wenn Sie Nachrichten manipulieren möchten.
Verwandte Themen