2016-08-31 1 views
3

Ich versuche, einen TCP-Stream zu einem anderen Sink mit Akka 2.4.3 umleiten/weiterleiten. Das Programm sollte einen Server-Socket öffnen, auf eingehende Verbindungen warten und dann den TCP-Stream konsumieren. Unser Absender erwartet keine Antworten von uns, daher senden wir nie etwas zurück - wir konsumieren nur den Stream. Nach dem Framing des TCP-Streams müssen wir die Bytes in etwas Nützlicheres transformieren und an den Sink senden.Verbrauchen TCP-Stream und umleiten es zu einem anderen Sink (mit Akka-Streams)

Ich habe bisher folgendes versucht, aber ich kämpfe vor allem mit dem Teil, wie man TCP-Pakete nicht zurück zum Absender sendet und den Sink richtig verbindet.

import scala.util.Failure 
import scala.util.Success 

import akka.actor.ActorSystem 
import akka.event.Logging 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Sink 
import akka.stream.scaladsl.Tcp 
import akka.stream.scaladsl.Framing 
import akka.util.ByteString 
import java.nio.ByteOrder 
import akka.stream.scaladsl.Flow 

object TcpConsumeOnlyStreamToSink { 
    implicit val system = ActorSystem("stream-system") 
    private val log = Logging(system, getClass.getName)  

    //The Sink 
    //In reality this is of course a real Sink doing some useful things :-) 
    //The Sink accept types of "SomethingMySinkUnderstand" 
    val mySink = Sink.ignore; 

    def main(args: Array[String]): Unit = { 
    //our sender is not interested in getting replies from us 
    //so we just want to consume the tcp stream and never send back anything to the sender 
    val (address, port) = ("127.0.0.1", 6000) 
    server(system, address, port) 
    } 

    def server(system: ActorSystem, address: String, port: Int): Unit = { 
    implicit val sys = system 
    import system.dispatcher 
    implicit val materializer = ActorMaterializer() 
    val handler = Sink.foreach[Tcp.IncomingConnection] { conn => 
     println("Client connected from: " + conn.remoteAddress) 

     conn handleWith Flow[ByteString] 
     //this is neccessary since we use a self developed tcp wire protocol 
     .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
     //here we want to map the raw bytes into something our Sink understands 
     .map(msg => new SomethingMySinkUnderstand(msg.utf8String)) 
     //here we like to connect our Sink to the Tcp Source 
     .to(mySink) //<------ NOT COMPILING 
    } 


    val tcpSource = Tcp().bind(address, port) 
    val binding = tcpSource.to(handler).run() 

    binding.onComplete { 
     case Success(b) => 
     println("Server started, listening on: " + b.localAddress) 
     case Failure(e) => 
     println(s"Server could not bind to $address:$port: ${e.getMessage}") 
     system.terminate() 
    } 

    } 

    class SomethingMySinkUnderstand(x:String) { 

    } 
} 

Update: Fügen Sie diese auf Ihre build.sbt Datei notwendig deps

libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"
+0

könnte hilfreich sein, Ihre build.sbt Datei zu zeigen, oder zumindest die Abhängigkeiten so andere können diese neu erstellen. – Brian

+0

hinzugefügt die Deps oben in der Frage – salyh

Antwort

4

handleWith erwartet ein Flow, das heißt, eine Box mit einem nicht verbundenen Einlass und einem nicht verbundenen Ausgang zu erhalten. Sie stellen effektiv eine Source bereit, weil Sie die Flow mit einer Sink unter Verwendung der to-Operation verbunden haben.

Ich glaube, Sie folgendes tun können:

conn.handleWith(
    Flow[ByteString] 
    .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
    .map(msg => new SomethingMySinkUnderstand(msg.utf8String)) 
    .alsoTo(mySink) 
    .map(_ => ByteString.empty) 
    .filter(_ => false) // Prevents sending anything back 
) 
+0

Dies funktioniert nicht für mich. Hat es für OP funktioniert? – tapasvi

+0

ja, arbeitete für mich – salyh

Verwandte Themen