2017-02-08 4 views
3

Ich versuche Nachricht an Schauspieler bindet mit Source.actorRef zu schicken, aber dieser Teil des Codes:mapMaterializedValue nichts zu tun mit Source.actorRef

println(s"Before mapping $src") 
src.mapMaterializedValue { ref => 
    println(s"Mapping $ref") 
    ref ! letter.text 
} 
println(s"After mapping $src") 

druckt nur etwa so:

Vor Mapping Quelle (SourceShape (ActorRefSource.out), ActorRefSource (0, Fail) [5564f412])
Nach Mapping Quelle (SourceShape (ActorRefSource.out), ActorRefSource (0, Fail) [5564f412])

So. Irgendwie mapMaterializedValue nichts tun. Sicherlich keine Nachricht an den Schauspieler gesendet. Ist ref - Keine aus irgendeinem Grund?

Weiter poste ich den ganzen Code. Es ist eine Handlung von etwas wie einfacher Messenger (Eins-zu-Eins-Nachrichten) auf Websockets. Ich studiere gerade Akka Streams, also ist dieser Code wirklich nicht perfekt. Ich bin bereit, irgendwelche Kritiker oder Ratschläge zu hören.

Hauptserverobjekt:

package treplol.server 

import treplol.common._ 

import akka.actor.{ActorRef, ActorSystem} 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.ws._ 
import akka.http.scaladsl.server.Directives._ 
import akka.stream.scaladsl._ 
import akka.stream.{ActorMaterializer, FlowShape, OverflowStrategy} 

import scala.io.StdIn 
import java.util.UUID 

object WsServer extends App { 

    implicit val system = ActorSystem("example") 
    implicit val materializer = ActorMaterializer() 

    def createSource(uuid: UUID): Source[String, ActorRef] = { 
    val src = Source.actorRef[String](0, OverflowStrategy.fail) 
    sources(uuid) = src 
    src 
    } 

    val sources: collection.mutable.HashMap[UUID, Source[String, ActorRef]] = 
    collection.mutable.HashMap[UUID, Source[String, ActorRef]]() 
    val userSources: collection.mutable.HashMap[String, UUID] = 
    collection.mutable.HashMap[String, UUID]() 

    def flow: Flow[Message, Message, Any] = { 

    val uuid: UUID = UUID.randomUUID() 
    val incomingSource: Source[String, ActorRef] = createSource(uuid) 

    Flow.fromGraph(GraphDSL.create() { implicit b => 
     import GraphDSL.Implicits._ 

     val merge = b.add(Merge[String](2)) 

     val mapMsgToLttr = b.add(
     Flow[Message].collect { case TextMessage.Strict(txt) => txt } 
      .map[Letter] { txt => 
      WsSerializer.decode(txt) match { 
       case Auth(from) => 
       userSources(from) = uuid 
       Letter("0", from, "Authorized!") 
       case ltr: Letter => ltr 
      } 
      } 
    ) 

     val processLttr = b.add(
     Flow[Letter].map[String] { letter => 
      userSources.get(letter.to) flatMap sources.get match { 
      case Some(src) => 
       println(s"Before mapping $src") 
       src.mapMaterializedValue { ref => 
       println(s"Mapping $ref") 
       ref ! letter.text 
       } 
       println(s"After mapping $src") 
       "" 
      case None => "Not authorized!" 
      } 
     } 
    ) 

     val mapStrToMsg = b.add(
     Flow[String].map[TextMessage] (str => TextMessage.Strict(str)) 
    ) 

     mapMsgToLttr ~> processLttr ~> merge 
        incomingSource ~> merge ~> mapStrToMsg 

     FlowShape(mapMsgToLttr.in, mapStrToMsg.out) 
    }) 

    } 

    val route = path("ws")(handleWebSocketMessages(flow)) 
    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080) 

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") 
    StdIn.readLine() 

    import system.dispatcher 
    bindingFuture 
    .flatMap(_.unbind()) 
    .onComplete(_ => system.terminate()) 
} 

Allgemeines Paket:

package treplol 

package object common { 

    trait WsMessage 
    case class Letter(from: String, to: String, text: String) extends WsMessage 
    case class Auth(from: String) extends WsMessage 

    object WsSerializer { 

    import org.json4s.{Extraction, _} 
    import org.json4s.jackson.JsonMethods.{compact, parse} 
    import org.json4s.jackson.Serialization 

    implicit val formats = { 
     Serialization.formats(NoTypeHints) 
    } 

    case class WsData(typeOf: String, data: String) 
    object WsDataType { 
     val LETTER = "letter" 
     val AUTH = "auth" 
    } 

    class WrongIncomingData extends Throwable 

    def decode(wsJson: String): WsMessage = parse(wsJson).extract[WsData] match { 
     case WsData(WsDataType.LETTER, data) => parse(data).extract[Letter] 
     case WsData(WsDataType.AUTH, data) => parse(data).extract[Auth] 
     case _ => throw new WrongIncomingData 
    } 

    def encode(wsMessage: WsMessage): String = { 
     val typeOf = wsMessage match { 
     case _: Letter => WsDataType.LETTER 
     case _: Auth => WsDataType.AUTH 
     case _ => throw new WrongIncomingData 
     } 
     compact(Extraction.decompose(
     WsData(typeOf, compact(Extraction.decompose(wsMessage))) 
    )) 
    } 
    } 

} 

build.sbt

name := "treplol" 

version := "0.0" 

scalaVersion := "2.12.1" 

resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases" 

libraryDependencies ++= Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.16", 
    "com.typesafe.akka" %% "akka-stream" % "2.4.16", 
    "com.typesafe.akka" %% "akka-http" % "10.0.3", 
    "org.json4s" %% "json4s-jackson" % "3.5.0" 
) 

Vielen Dank im Voraus!

+0

Sieht aus wie 'mapMaterializedValue' eine neue Quelle zurückgibt. Ihr Beispiel ist wie "val x = 1; println (x); x + 3; println (x) '. Versuchen Sie 'val src2 = src.mapMaterializedValue (...); println (src2) ' – Dylan

Antwort

4

sich nach den Dokumenten, die mapMaterializedValue combinator

Transformation nur die materialisierte Wert dieser Quelle, alle andere Eigenschaften zu verlassen, wie sie waren.

Die materialisierte Wert nach jeder Graph Stufe nur verfügbar ist (in diesem Fall ist die Quelle) ist Lauf. Sie führen Ihre Quelle niemals in Ihrem Code aus.

Beachten Sie, dass das Flow[Message, Message, Any], das Sie verwenden, um WebSocket-Nachrichten zu verarbeiten, tatsächlich von der Akka-HTTP-Infrastruktur ausgeführt wird, so dass Sie es nicht manuell tun müssen. Die Source, die Sie im Körper von processLttr erstellen, sind jedoch nicht an den Rest des Diagramms angehängt und werden daher nicht ausgeführt.

Weitere Informationen zum Ausführen von Diagrammen und Materialisierung finden Sie unter docs.

+0

Danke für die Antwort? Stefano! Aber schau ... Für jede Verbindung am Anfang der 'flow' Methode erstelle ich' incomingSource' und setze sie in 'sources' hashmap (in' createSource' Methode). Am Ende von 'flow' wird 'incomingSource' zum Graph hinzugefügt. In 'processLttr' nehme ich bereits materialisierte Quelle von hashmap. Zumindest erwarte ich, dass ich es mache. So. Wo liege ich falsch? – Sasha

+1

Die Quelle, die Sie aus der Hashmap nehmen, ist nicht "bereits materialisiert". Jede Quelle ist unveränderlich und kann frei geteilt werden. Wenn Sie die Quelle ausführen, erhalten Sie ihren materialisierten Wert zurück (den ursprünglichen Wert oder den zugeordneten materialisierten Wert). Und Sie können eine Quelle so oft ausführen, wie Sie möchten. –

0

Danke an Stefano!

Aber es scheint, es gibt keinen Weg zu erreichen, was ich mit so wollte. Aber ich grub tiefer und benutzte custom stream processing and integration with actors. Mit dieser Technik kann ich Nachrichten von außen an den bestimmten Strom senden. (Diese Funktion ist noch experimentell!)