2015-06-11 12 views
26

Ich versuche, die Source.actorRef Methode zu verwenden, um ein akka.stream.scaladsl.Source Objekt zu erstellen. Etwas von der FormZugriff auf den zugrunde liegenden ActorRef eines akka stream Quelle erstellt von Source.actorRef

import akka.stream.OverflowStrategy.fail 
import akka.stream.scaladsl.Source 

case class Weather(zip : String, temp : Double, raining : Boolean) 

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail) 

val sunnySource = weatherSource.filter(!_.raining) 
... 

Meine Frage ist: , wie sende ich Daten zu meinem ActorRef basierten Quellobjekt?

nahm ich Nachrichten an die Quelle wurde etwas von der Form

//does not compile 
weatherSource ! Weather("90210", 72.0, false) 
weatherSource ! Weather("02139", 32.0, true) 

Aber weatherSource verfügt nicht über eine ! Operator oder tell Methode sendet.

Die documentation ist nicht zu beschreibende, wie Source.actorRef verwenden, heißt es können Sie ...

Dank für Ihre Bewertung und Antwort Vielen Dank im Voraus.

Antwort

22

Sie benötigen ein Flow:

import akka.stream.OverflowStrategy.fail 
    import akka.stream.scaladsl.Source 
    import akka.stream.scaladsl.{Sink, Flow} 

    case class Weather(zip : String, temp : Double, raining : Boolean) 

    val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail) 

    val sunnySource = weatherSource.filter(!_.raining) 

    val ref = Flow[Weather] 
    .to(Sink.ignore) 
    .runWith(sunnySource) 

    ref ! Weather("02139", 32.0, true) 

Denken Sie daran, das alles ist experimentell und können sich ändern!

+0

In M5 sieht es aus wie Source.actorRef nicht existiert. Weißt du, wo es hingezogen ist? –

+0

Es sieht so aus, als hätten sie das geändert, indem sie ein 'Props' an die Quelle übergaben. Die aktualisierte Dokumentation ist hier http://doc.akka.io/docs/akka-stream-und-http-experimental/1.0-M5/scala/stream-integrations.html – Noah

+0

1.0-RC3 ist die neueste Version und 'Source .actorRef' lebt noch immer am selben Ort: http://doc.akka.io/api/akka-stream-und-http-experimental/1.0-RC3/#akka.stream.scaladsl.Quelle – jrudolph

5

Da @Noah auf die experimentelle Natur von akka-streams hinweist, könnte seine Antwort mit der Version 1.0 nicht funktionieren. Ich hatte das Beispiel von this example zu befolgen:

implicit val materializer = ActorMaterializer() 
val (actorRef: ActorRef, publisher: Publisher[TweetInfo]) = Source.actorRef[TweetInfo](1000, OverflowStrategy.fail).toMat(Sink.publisher)(Keep.both).run() 
actorRef ! TweetInfo(...) 
val source: Source[TweetInfo, Unit] = Source[TweetInfo](publisher) 
4

Instanz von ActorRef, wie alle ‚Werte materialisiert‘, wird nur einmal ganze Strom zugänglich geworden ist, materialisiert, oder, mit anderen Worten, wenn RunnableGraph abgearbeitet wird.

// RunnableGraph[ActorRef] means that you get ActorRef when you run the graph 
val rg1: RunnableGraph[ActorRef] = sunnySource.to(Sink.foreach(println)) 

// You get ActorRef instance as a materialized value 
val actorRef1: ActorRef = rg1.run() 

// Or even more correct way: to materialize both ActorRef and future to completion 
// of the stream, so that we know when we are done: 

// RunnableGraph[(ActorRef, Future[Done])] means that you get tuple 
// (ActorRef, Future[Done]) when you run the graph 
val rg2: RunnableGraph[(ActorRef, Future[Done])] = 
    sunnySource.toMat(Sink.foreach(println))(Keep.both) 

// You get both ActorRef and Future[Done] instances as materialized values 
val (actorRef2, future) = rg2.run() 

actorRef2 ! Weather("90210", 72.0, false) 
actorRef2 ! Weather("02139", 32.0, true) 
actorRef2 ! akka.actor.Status.Success("Done!") // Complete the stream 
future onComplete { /* ... */ } 
+0

Erhalte sowohl ActorRef als auch Zukunft bis zur Fertigstellung - unglaublich! Danke! – AlonL

Verwandte Themen