Ich bin Teil des Akka-Teams und möchte diese Frage nutzen, um ein paar Dinge über die rohen Reactive Streams-Schnittstellen zu klären. Ich hoffe, Sie finden das nützlich.
Vor allem werden wir im Blog des Akka-Teams mehrere Posts über das Erstellen von benutzerdefinierten Bühnen, einschließlich Flows, bald veröffentlichen, also behaltet es im Auge.
Sie ActorPublisher/ActorSubscriber nicht
Bitte verwenden Sie ActorPublisher
und ActorSubscriber
nicht verwenden. Sie sind zu niedrig und Sie könnten sie so implementieren, dass sie die Reactive Streams specification verletzen. Sie sind ein Relikt der Vergangenheit und selbst dann waren sie nur "Power-User-Modus". Es gibt wirklich keinen Grund, diese Klassen heutzutage zu benutzen. Wir haben nie eine Möglichkeit geschaffen, einen Flow zu erstellen, weil die Komplexität einfach explosiv ist, wenn sie als "rohe" Actor-API für Sie bereitgestellt wurde, um sie zu implementieren und all the rules implemented correctly zu erhalten.
Wenn Sie wirklich rohe ReactiveStreams-Schnittstellen implementieren möchten, verwenden Sie bitte die Specification's TCK, um zu überprüfen, ob Ihre Implementierung korrekt ist. Sie werden wahrscheinlich von einigen der komplexeren Fälle überrascht sein, die eine Flow
(oder in der RS-Terminologie eine Processor
hat) zu behandeln.
Die meisten Operationen sind möglich
Viele fließt, ohne dabei auf niedriger Ebene aufzubauen, sollten Sie in der Lage sein, einfach durch den Bau von einem Flow[T]
Aufbau und das Hinzufügen der benötigten Operationen auf sie, nur als Beispiel:
val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)
Was ist eine wiederverwendbare Beschreibung des Flow.
Da Sie nach Power-User-Modus fragen, ist dies der mächtigste Betreiber auf dem DSL selbst: statefulFlatMapConcat
. Die überwiegende Mehrheit der Operationen, die mit einfachen Stream-Elementen arbeiten, sind ausdrückbar unter Verwendung derselben: .
Wenn Sie Timer Sie könnten zip
mit einem Source.timer
usw.
GraphStage ist die einfachste und sicherste API benutzerdefinierte zu bauen Stufen
Stattdessen baut Quellen/Flow/Sinks hat seine eigene leistungsstarke und sichere API: die GraphStage
. Bitte lesen Sie die documentation about building custom GraphStages (sie können eine Sink/Source/Flow oder sogar beliebige beliebige Form sein).Es behandelt alle komplexen Reactive Streams-Regeln für Sie und bietet Ihnen gleichzeitig volle Freiheit und Typsicherheit bei der Implementierung Ihrer Phasen (was ein Flow sein könnte).
Zum Beispiel aus der Dokumentation genommen, ist eine GraphStag Umsetzung des filter(T => Boolean)
Betreibers:
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Filter.in")
val out = Outlet[A]("Filter.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (p(elem)) push(out, elem)
else pull(in)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
Es behandelt auch asynchrone Kanäle und ist schmelzbaren standardmäßig.
auf die docs Darüber hinaus erläutern diese Blog-Posts im Detail, warum das API der heilige Gral der Erstellung benutzerdefinierter Stufen in beliebiger Form ist:
Hmmm ... Ich würde vorschlagen, es zu versuchen und wenn es nicht funktioniert, dann aktualisieren yo Deine Frage. – hveiga
Es gibt keine Möglichkeit, es zu tun. Vielleicht war ich nicht klar, aber ein kurzer Blick in die Methoden des Flow-Objekts zeigt, dass es keine solche Methode gibt. Meine Frage ist, ob es in einer anderen Form/API existiert. Danke –