Ich versuche, einen TCP-Stream zu einem anderen Sink mit Akka 2.4.3 umleiten/weiterleiten. Das Programm sollte einen Server-Socket öffnen, auf eingehende Verbindungen warten und dann den TCP-Stream konsumieren. Unser Absender erwartet keine Antworten von uns, daher senden wir nie etwas zurück - wir konsumieren nur den Stream. Nach dem Framing des TCP-Streams müssen wir die Bytes in etwas Nützlicheres transformieren und an den Sink senden.Verbrauchen TCP-Stream und umleiten es zu einem anderen Sink (mit Akka-Streams)
Ich habe bisher folgendes versucht, aber ich kämpfe vor allem mit dem Teil, wie man TCP-Pakete nicht zurück zum Absender sendet und den Sink richtig verbindet.
import scala.util.Failure
import scala.util.Success
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Tcp
import akka.stream.scaladsl.Framing
import akka.util.ByteString
import java.nio.ByteOrder
import akka.stream.scaladsl.Flow
object TcpConsumeOnlyStreamToSink {
implicit val system = ActorSystem("stream-system")
private val log = Logging(system, getClass.getName)
//The Sink
//In reality this is of course a real Sink doing some useful things :-)
//The Sink accept types of "SomethingMySinkUnderstand"
val mySink = Sink.ignore;
def main(args: Array[String]): Unit = {
//our sender is not interested in getting replies from us
//so we just want to consume the tcp stream and never send back anything to the sender
val (address, port) = ("127.0.0.1", 6000)
server(system, address, port)
}
def server(system: ActorSystem, address: String, port: Int): Unit = {
implicit val sys = system
import system.dispatcher
implicit val materializer = ActorMaterializer()
val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
println("Client connected from: " + conn.remoteAddress)
conn handleWith Flow[ByteString]
//this is neccessary since we use a self developed tcp wire protocol
.via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))
//here we want to map the raw bytes into something our Sink understands
.map(msg => new SomethingMySinkUnderstand(msg.utf8String))
//here we like to connect our Sink to the Tcp Source
.to(mySink) //<------ NOT COMPILING
}
val tcpSource = Tcp().bind(address, port)
val binding = tcpSource.to(handler).run()
binding.onComplete {
case Success(b) =>
println("Server started, listening on: " + b.localAddress)
case Failure(e) =>
println(s"Server could not bind to $address:$port: ${e.getMessage}")
system.terminate()
}
}
class SomethingMySinkUnderstand(x:String) {
}
}
Update: Fügen Sie diese auf Ihre build.sbt Datei notwendig deps
libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"
könnte hilfreich sein, Ihre build.sbt Datei zu zeigen, oder zumindest die Abhängigkeiten so andere können diese neu erstellen. – Brian
hinzugefügt die Deps oben in der Frage – salyh