2014-11-20 12 views
5

Ich habe eine Observable als Ergebnis der Transformation von BehaviourSubject mit vielen Funktionen erstellt. Jetzt möchte ich die Werte dieses Observable teilen, damit die Kette der Funktionen nicht für jeden neuen Teilnehmer neu ausgeführt würde. Außerdem möchte ich, dass sich die gemeinsam genutzte Kopie genauso verhält wie das Original, d. H. Der neu angekommene Teilnehmer sollte den letzten emittierten Wert unmittelbar nach der Anmeldung erhalten.RxJava Anteil mit dem letzten Wert für neue Abonnenten

In 0.20.x war es möglich, multicast(subjectFactory).refCount() mit Fabrik von BehaviourSubject ‚s zu verwenden, oder einfach share(initialValue) verwenden, was wiederum BehaviourSubjectPublishSubject statt verwendet.

Wie erreiche ich das gleiche Verhalten in 1.0.x?

Antwort

7

Ich denke, Sie können multicast(behaviorSubjectFactory).refCount() durch replay(1).refCount() ersetzen.

Um die Diskussion etwas konkreter zu machen, hier ist ein komplettes Beispiel (in Scala):

@volatile var startTime: Long = 0 
def printTimestamped(s: String) { 
    println(s"[t=${System.currentTimeMillis-startTime}] $s") 
} 

// Suppose for simplicity that the UI Events are just ticks of a 
// hot timer observable. 
val uiEvents = Observable.timer(1000 millis, 1000 millis) 
     .doOnEach(i => printTimestamped("producing " + i)) 
     .publish 

// Now apply all the transformations 
val transformed = uiEvents.map(i => i + 101) 
     .doOnEach(i => printTimestamped("transformed to " + i)) 

// And set a default start value 
val o1 = transformed.startWith(100) 

// Share and make sure new subscribers get the last element replayed 
// immediately after they subscribe: 
val o2 = o1.replay(1).refCount 

// startTime is just before we start the uiEvents observable 
startTime = System.currentTimeMillis 
val subscriptionUiEvents = uiEvents.connect 

Thread.sleep(500) 

printTimestamped("subscribing A") 
val subscriptionA = o2.subscribe(i => printTimestamped("A got " + i)) 

Thread.sleep(2000) 

printTimestamped("subscribing B") 
val subscriptionB = o2.subscribe(i => printTimestamped("B got " + i)) 

Thread.sleep(2000) 

printTimestamped("unsubscribing B") 
subscriptionB.unsubscribe() 

Thread.sleep(2000) 

printTimestamped("unsubscribing A") 
subscriptionA.unsubscribe() 

// Now the transformations will stop being executed, but the UI 
// events will still be produced 

Thread.sleep(2000) 

// Finally, also stop the UI events: 
subscriptionUiEvents.unsubscribe() 

Ausgang:

[t=505] subscribing A 
[t=519] A got 100 
[t=1002] producing 0 
[t=1003] transformed to 101 
[t=1003] A got 101 
[t=2002] producing 1 
[t=2002] transformed to 102 
[t=2002] A got 102 
[t=2520] subscribing B 
[t=2521] B got 102 
[t=3003] producing 2 
[t=3003] transformed to 103 
[t=3003] A got 103 
[t=3003] B got 103 
[t=4002] producing 3 
[t=4002] transformed to 104 
[t=4002] A got 104 
[t=4002] B got 104 
[t=4521] unsubscribing B 
[t=5003] producing 4 
[t=5003] transformed to 105 
[t=5003] A got 105 
[t=6002] producing 5 
[t=6002] transformed to 106 
[t=6002] A got 106 
[t=6522] unsubscribing A 
[t=7003] producing 6 
[t=8002] producing 7 

Ursprüngliche Antwort:

Zitieren der release notes for 1.0.0:

Es wurde jede Methodenüberladung entfernt, die einen Anfangswert angenommen hat, da der startWith-Operator dies bereits generisch erlaubt.

Verwenden Sie statt share(initialValue) einfach share().startWith(initialValue).

+0

Vielen Dank für Ihre Antwort! Aber diese Konstruktion verhält sich nicht wie gewünscht. Grundsätzlich möchte ich RX in der UI-Programmierung verwenden, um den "aktuellen Zustand" zu umgehen, so dass jeder neue Teilnehmer sofort den aktuellen Status empfangen und auf weitere Benachrichtigungen warten sollte. Was Sie vorschlagen, ist, Benachrichtigungen mit einem gewissen Wert voranzustellen, der im Voraus berechnet wird und nicht den "aktuellen Status" widerspiegelt. – avakhrenev

+0

Ich sehe. Meine Antwort wurde aktualisiert. –

+0

Verstanden! Das ist genau das, was ich brauche, danke. – avakhrenev

Verwandte Themen