Was ist eine beste (saubere) Möglichkeit, eine asynchrone Observable
abzumelden, die in einer flatMap
einer anderen asynchronen Version erzeugt wird? Angenommen, ich habe eine Präsentation, die eine Chat-Liste für den Benutzer anzeigt. Jedes Chat-Element sollte die letzte Nachricht von sich selbst präsentieren. Hier ist ein Original-Code zu veranschaulichen:RxJava: Async-Observable innerhalb einer anderen Async-Produktion abbestellen
fun AsyncInsideAsync() {
/**
* A chat that 'transmits' latest message
*/
class Chat(val name: Int) {
val lastMessage: PublishSubject<String> = PublishSubject.create()
fun postMessage(message: Int) {
lastMessage.onNext("Chat: $name, Message: $message")
}
}
// List of chats for user.
val chats: PublishSubject<Chat> = PublishSubject.create()
///////////////////////////////////
// ORIGINAL SEQUENCE //
///////////////////////////////////
val sequence = chats.flatMap { it.lastMessage }
val subscriber = TestSubscriber<String>()
sequence.subscribe(subscriber)
// User has single chat in a chat-list
val chat1 = Chat(1)
chats.onNext(chat1)
// Someone posts a message in Chat 1
chat1.postMessage(1)
subscriber.assertValues(
"Chat: 1, Message: 1"
)
// Someone posts another message
chat1.postMessage(2)
subscriber.assertValues(
"Chat: 1, Message: 1",
"Chat: 1, Message: 2"
)
// Chat 1 disappears FROM USER CHAT LIST, Chat 2 is created
val chat2 = Chat(2)
chats.onNext(chat2)
// Someone posts a message to Chat 2
chat2.postMessage(1)
subscriber.assertValues(
"Chat: 1, Message: 1",
"Chat: 1, Message: 2",
"Chat: 2, Message: 1"
)
// Someone out there posts a message to Chat 1 that is not visible to user anymore
chat1.postMessage(3)
// The answer looks like this
// "Chat: 1, Message: 1",
// "Chat: 1, Message: 2",
// "Chat: 2, Message: 1",
// "Chat: 1, Message: 3"
// Chat 1 is still subscribed and test fails
subscriber.assertValues(
"Chat: 1, Message: 1",
"Chat: 1, Message: 2",
"Chat: 2, Message: 1",
)
}
Was ich kam zu einem Thema (oder geteilt beobachtbar) zu bremsen, eine Kette von internen Abonnements. Aber es sieht komisch aus:
///////////////////////////////////
// MODIFIED SEQUENCE //
///////////////////////////////////
val unsubscribe: PublishSubject<Boolean> = PublishSubject.create()
val sequence = chats
.doOnNext({ unsubscribe.onNext(true) })
.doAfterTerminate({ unsubscribe.onNext(true) })
.flatMap {
it.lastMessage.takeUntil(unsubscribe)
}
Dieser Ansatz funktioniert, sieht aber beängstigend. Vielen Dank!
Aus irgendeinem Grund denke ich nicht, dass dies eine gute Übereinstimmung für Rx ist; vielleicht würden Sie es besser machen, wenn Sie Ihren Anwendungsfall mit einem Event-Bus modellieren würden? –