2017-06-19 14 views
0

Ich habe den folgenden Code, der eine PublishSubject verwendet.Unerwartetes Verhalten mit RxJava2 PublishSubject

val subject = PublishSubject.create<Int>() 

val o1: Observable<String> = 
     subject.observeOn(Schedulers.newThread()).map { i: Int -> 
      println("${Thread.currentThread()} | ${Date()} | map => $i") 
      i.toString() 
     } 

o1.subscribe { 
    println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it") 
} 

o1.subscribe { 
    println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it") 
} 

o1.subscribe { 
    println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it") 
} 

println("${Thread.currentThread()} | ${Date()} | submitting 1") 

subject.onNext(1) 

1) erstelle ich ein Observable von ihm und wo es sich (für die Zwecke dieses Beispiels ich gerade Umwandlung in eine String) =>o1.

2) Ich abonniere dann o1 3 mal.

3) Schließlich "veröffentliche" ich ein Ereignis durch Aufruf subject.onNext(1).

Zu meiner Überraschung Ich erhalte die folgende Ausgabe:

Thread[main,5,main] | Mon Jun 19 09:46:37 PDT 2017 | submitting 1 
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1 
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1 
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1 
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (1) => 1 
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (2) => 1 
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (3) => 1 

map endet 3 mal aufgerufen wird, und ich verstehe nicht, warum da ich o1 melde mich an, was passieren soll, nachdem map aufgetreten ist. Ich muss etwas verpassen. Jede Hilfe wäre willkommen.

Dank Yan

+0

Sie abonnieren dreimal "o1" und erstellen jeweils eine unabhängige Sequenz bis zum 'PublishSubject', das die' onNext' an alle 3 Ketten sendet. – akarnokd

+0

Sie sagen "bis zum' PublishSubject' ": Warum geht es bis zum Thema? Kannst du mich darauf hinweisen, wo das erklärt würde? Und wenn dies normales Verhalten ist, gibt es eine Möglichkeit, den Stream nach der Map zu transformieren, so dass es das nicht tut? – yan

+0

Da PublishSubject aus Sicht aller 3 Abonnenten eine Multicasting-Quelle ist, die Ereignisse über unabhängige Ketten, die durch die subscription() -Aufrufe festgelegt werden, an sie sendet. – akarnokd

Antwort

1

Aus den Kommentaren:

abonniert o1 dreimal, die jeweils eine unabhängige Folge zu schaffen, bis die PublishSubject bis, die die onNext an alle 3 Ketten versenden wird.

Aus Sicht aller 3 Teilnehmer ist PublishSubject eine Multicasting-Quelle, die Ereignisse über unabhängige Ketten, die durch die subscribe()-Aufrufe festgelegt wurden, an sie sendet.

Anwenden von Operatoren auf eine Subject machen im Allgemeinen nicht die gesamte Kette heiß, da diese Operator-Elemente nur dann an die Quelle Subject angeschlossen werden, wenn sie abonniert sind. Somit ergeben mehrere Abonnements mehrere Kanäle für denselben Upstream Subject.

Verwenden Sie publish, um eine ConnectableObservable (oder eine andere PublishSubject ganz am Ende) zu erhalten, um die Sequenz von diesem Punkt an heiß zu machen.

+0

Ich bestätige, dass durch Hinzufügen von '.publish.autoConnect()' nach 'map' das Richtige gemacht wird.Ich bin nicht 100% sicher, was Sie mit _oder einem anderen 'PublishSubject'_ meinen, aber die Art und Weise, wie ich es geschafft habe, bestand darin, '.subscribe {otherSubject.onNext (it)}}' hinzuzufügen und die Abonnements stattdessen für dieses andere Thema zu verwenden. – yan

+0

Um die Schleife zu schließen, habe ich einen Text erstellt, um die 2 Optionen https://gist.github.com/ypujante/2ab3c3a135272ea4bc4554cbfc287ca7 zu demonstrieren. Am Ende wählte ich die Option mit dem zweiten Thema, da es mir sauberer erscheint. – yan