2016-08-24 1 views
15

Es ist möglich, Quellen und Senken von Aktoren mit den Methoden Source.actorPublisher() und Sink.actorSubscriber() zu erstellen. Aber ist es möglich, eine Flow von Schauspieler zu erstellen?Erstellen eines Datenflusses vom Aktor in Akka Streams

Konzeptionell es scheint nicht zu einem guten Grunde, nicht zu, da zu sein, dass es sowohl ActorPublisher und ActorSubscriber Züge implementiert, aber leider das Flow Objekt hat keine Methode, dies zu tun. In this exzellenten Blog-Post es ist in einer früheren Version von Akka Streams gemacht, so ist die Frage, ob es auch in der neuesten Version (2.4.9) möglich ist.

+0

Hmmm ... Ich würde vorschlagen, es zu versuchen und wenn es nicht funktioniert, dann aktualisieren yo Deine Frage. – hveiga

+0

Es gibt keine Möglichkeit, es zu tun. Vielleicht war ich nicht klar, aber ein kurzer Blick in die Methoden des Flow-Objekts zeigt, dass es keine solche Methode gibt. Meine Frage ist, ob es in einer anderen Form/API existiert. Danke –

Antwort

28

Ich bin Teil des Akka-Teams und möchte diese Frage nutzen, um ein paar Dinge über die rohen Reactive Streams-Schnittstellen zu klären. Ich hoffe, Sie finden das nützlich.

Vor allem werden wir im Blog des Akka-Teams mehrere Posts über das Erstellen von benutzerdefinierten Bühnen, einschließlich Flows, bald veröffentlichen, also behaltet es im Auge.

Sie ActorPublisher/ActorSubscriber nicht

Bitte verwenden Sie ActorPublisher und ActorSubscriber nicht verwenden. Sie sind zu niedrig und Sie könnten sie so implementieren, dass sie die Reactive Streams specification verletzen. Sie sind ein Relikt der Vergangenheit und selbst dann waren sie nur "Power-User-Modus". Es gibt wirklich keinen Grund, diese Klassen heutzutage zu benutzen. Wir haben nie eine Möglichkeit geschaffen, einen Flow zu erstellen, weil die Komplexität einfach explosiv ist, wenn sie als "rohe" Actor-API für Sie bereitgestellt wurde, um sie zu implementieren und all the rules implemented correctly zu erhalten.

Wenn Sie wirklich rohe ReactiveStreams-Schnittstellen implementieren möchten, verwenden Sie bitte die Specification's TCK, um zu überprüfen, ob Ihre Implementierung korrekt ist. Sie werden wahrscheinlich von einigen der komplexeren Fälle überrascht sein, die eine Flow (oder in der RS-Terminologie eine Processor hat) zu behandeln.

Die meisten Operationen sind möglich

Viele fließt, ohne dabei auf niedriger Ebene aufzubauen, sollten Sie in der Lage sein, einfach durch den Bau von einem Flow[T] Aufbau und das Hinzufügen der benötigten Operationen auf sie, nur als Beispiel:

val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt) 

Was ist eine wiederverwendbare Beschreibung des Flow.

Da Sie nach Power-User-Modus fragen, ist dies der mächtigste Betreiber auf dem DSL selbst: statefulFlatMapConcat. Die überwiegende Mehrheit der Operationen, die mit einfachen Stream-Elementen arbeiten, sind ausdrückbar unter Verwendung derselben: .

Wenn Sie Timer Sie könnten zip mit einem Source.timer usw.

GraphStage ist die einfachste und sicherste API benutzerdefinierte zu bauen Stufen

Stattdessen baut Quellen/Flow/Sinks hat seine eigene leistungsstarke und sichere API: die GraphStage. Bitte lesen Sie die documentation about building custom GraphStages (sie können eine Sink/Source/Flow oder sogar beliebige beliebige Form sein).Es behandelt alle komplexen Reactive Streams-Regeln für Sie und bietet Ihnen gleichzeitig volle Freiheit und Typsicherheit bei der Implementierung Ihrer Phasen (was ein Flow sein könnte).

Zum Beispiel aus der Dokumentation genommen, ist eine GraphStag Umsetzung des filter(T => Boolean) Betreibers:

class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] { 

    val in = Inlet[A]("Filter.in") 
    val out = Outlet[A]("Filter.out") 

    val shape = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val elem = grab(in) 
      if (p(elem)) push(out, elem) 
      else pull(in) 
     } 
     }) 
     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      pull(in) 
     } 
     }) 
    } 
} 

Es behandelt auch asynchrone Kanäle und ist schmelzbaren standardmäßig.

auf die docs Darüber hinaus erläutern diese Blog-Posts im Detail, warum das API der heilige Gral der Erstellung benutzerdefinierter Stufen in beliebiger Form ist:

+0

Ist die Empfehlung_ "nicht ActorPublisher und ActorSubscriber. .. Verletzung der Reactive Streams-Spezifikation" _ soll für _ "einen Flow von Actor erstellen?" _ Oder auch für das Erstellen einer Quelle von einem Actor gelten. Weil die Akteure eine natürliche Möglichkeit sind, Quellen zu erstellen: http://doc.akka.io/docs/akka/2.4/scala/stream/stages-overview.html#actorPublisher. Dennoch, wundern Sie sich, wenn die Out-of-the-Box-Implementierung für das ist schlau, um den Schauspieler nicht zu überwältigen ?. Siehe unten für "Akteur enthält einen Puffer". – SemanticBeeng

+1

Verwenden Sie einfach eine GraphStage, um Ihre Quelle zu implementieren, sie wird schneller und leistungsfähiger ;-) Die Verknüpfung mit der einzigen Methode, die Akka Streams direkt zu überbrücken hat, ist ein wenig irreführend, da wir eine ganze Seite haben, auf der erklärt wird, wie man benutzerdefinierte Stufen implementiert : http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html Wie ich bereits erwähnt habe, wird die Einhaltung von GraphStage viele Vorteile bieten: Leistung, Schmelzbarkeit, Debuggability usw. Wenn Sie habe schon einen Verlag, du kannst ihn aber nur mit Akka benutzen, klar. –

12

K onrads Lösung demonstriert, wie man eine benutzerdefinierte Bühne erstellt, die Actors verwendet, aber in den meisten Fällen denke ich, dass das etwas übertrieben ist.

Normalerweise haben Sie einige Schauspieler, die in der Antwort auf Fragen der Lage ist:

val actorRef : ActorRef = ??? 

type Input = ??? 
type Output = ??? 

val queryActor : Input => Future[Output] = 
    (actorRef ? _) andThen (_.mapTo[Output]) 

Dies kann leicht mit grundlegenden Flow Funktionalität verwendet werden, die in der maximalen Anzahl gleichzeitiger Anfragen nehmen:

val actorQueryFlow : Int => Flow[Input, Output, _] = 
    (parallelism) => Flow[Input].mapAsync[Output](parallelism)(queryActor) 

Jetzt actorFlow kann in jeden Strom integriert werden ...

+3

Ich stimme wirklich zu, sollte Zeit finden, meine Antwort zu ändern ... fühlen Sie sich frei, es zu bearbeiten, wenn Sie die Zeit haben! Beide Wege sollten erklärt werden, Ihre als die empfohlene –

+2

@ Konrad'ktoso'Malawski Ich weiß es zu schätzen, meine Antwort zu bestätigen. Danke auch für die Arbeit an akka. Ihr macht ein paar wirklich coole Sachen. –

Verwandte Themen