2017-02-08 3 views
0

Ich versuche, einen reactivetream-Abonnenten an eine akka-Quelle anzuhängen.Kann reactivestream Subscriber nicht mit akka stream sources verwenden

Meine Quelle scheint mit einer einfachen Senke (wie eine foreach) gut zu funktionieren - aber wenn ich eine echte Senke einfüge, die von einem Teilnehmer gemacht wird, bekomme ich nichts.

Mein Kontext ist:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 
import org.reactivestreams.{Subscriber, Subscription} 

implicit val system = ActorSystem.create("test") 
implicit val materializer = ActorMaterializer.create(system) 

class PrintSubscriber extends Subscriber[String] { 
    override def onError(t: Throwable): Unit = {} 
    override def onSubscribe(s: Subscription): Unit = {} 
    override def onComplete(): Unit = {} 
    override def onNext(t: String): Unit = { 
    println(t) 
    } 
} 

und mein Testfall ist:

val subscriber = new PrintSubscriber() 
val sink = Sink.fromSubscriber(subscriber) 

val source2 = Source.fromIterator(() => Iterator("aaa", "bbb", "ccc")) 
val source1 = Source.fromIterator(() => Iterator("xxx", "yyy", "zzz")) 
source1.to(sink).run()(materializer) 
source2.runForeach(println) 

ich Ausgang:

aaa 
bbb 
ccc 

Warum nicht ich xxx, yyy und zzz?

Antwort

2

die Reactive Streams Spezifikationen für das Subscriber unter zitierend:

Wird einmal Aufruf onSubscribe (Abonnement) erhalten, nachdem eine Instanz von Abonnenten Publisher.subscribe (Subscriber) vorbei. Es werden keine weiteren Benachrichtigungen empfangen, bis Subscription.request (long) aufgerufen wird.

Die kleinste Änderung, die Sie einige Einzelteile zu sehen, fließen durch zu Ihrem Teilnehmer ist

override def onSubscribe(s: Subscription): Unit = { 
    s.request(3) 
} 

jedoch im Auge behalten, machen dies wird es nicht vollständig zu dem reaktiven Streams specs konform machen. Es ist nicht so einfach zu implementieren, ist der Hauptgrund für höhere Toolkits wie Akka-Streams selbst.

Verwandte Themen