2016-03-23 11 views
1

Was ist eine beste (saubere) Möglichkeit, eine asynchrone Observable abzumelden, die in einer flatMap einer anderen asynchronen Version erzeugt wird? Angenommen, ich habe eine Präsentation, die eine Chat-Liste für den Benutzer anzeigt. Jedes Chat-Element sollte die letzte Nachricht von sich selbst präsentieren. Hier ist ein Original-Code zu veranschaulichen:RxJava: Async-Observable innerhalb einer anderen Async-Produktion abbestellen

fun AsyncInsideAsync() { 
    /** 
    * A chat that 'transmits' latest message 
    */ 
    class Chat(val name: Int) { 
     val lastMessage: PublishSubject<String> = PublishSubject.create() 

     fun postMessage(message: Int) { 
      lastMessage.onNext("Chat: $name, Message: $message") 
     } 
    } 

    // List of chats for user. 
    val chats: PublishSubject<Chat> = PublishSubject.create() 

    /////////////////////////////////// 
    //  ORIGINAL SEQUENCE  // 
    /////////////////////////////////// 
    val sequence = chats.flatMap { it.lastMessage } 

    val subscriber = TestSubscriber<String>() 
    sequence.subscribe(subscriber) 

    // User has single chat in a chat-list 
    val chat1 = Chat(1) 
    chats.onNext(chat1) 

    // Someone posts a message in Chat 1 
    chat1.postMessage(1) 
    subscriber.assertValues(
      "Chat: 1, Message: 1" 
    ) 

    // Someone posts another message 
    chat1.postMessage(2) 
    subscriber.assertValues(
      "Chat: 1, Message: 1", 
      "Chat: 1, Message: 2" 
    ) 

    // Chat 1 disappears FROM USER CHAT LIST, Chat 2 is created 
    val chat2 = Chat(2) 
    chats.onNext(chat2) 

    // Someone posts a message to Chat 2 
    chat2.postMessage(1) 
    subscriber.assertValues(
      "Chat: 1, Message: 1", 
      "Chat: 1, Message: 2", 
      "Chat: 2, Message: 1" 
    ) 

    // Someone out there posts a message to Chat 1 that is not visible to user anymore 
    chat1.postMessage(3) 

    // The answer looks like this 
    //  "Chat: 1, Message: 1", 
    //  "Chat: 1, Message: 2", 
    //  "Chat: 2, Message: 1", 
    //  "Chat: 1, Message: 3" 
    // Chat 1 is still subscribed and test fails 
    subscriber.assertValues(
      "Chat: 1, Message: 1", 
      "Chat: 1, Message: 2", 
      "Chat: 2, Message: 1", 
    ) 
} 

Was ich kam zu einem Thema (oder geteilt beobachtbar) zu bremsen, eine Kette von internen Abonnements. Aber es sieht komisch aus:

/////////////////////////////////// 
    //  MODIFIED SEQUENCE  // 
    /////////////////////////////////// 
    val unsubscribe: PublishSubject<Boolean> = PublishSubject.create() 
    val sequence = chats 
      .doOnNext({ unsubscribe.onNext(true) }) 
      .doAfterTerminate({ unsubscribe.onNext(true) }) 
      .flatMap { 
       it.lastMessage.takeUntil(unsubscribe) 
      } 

Dieser Ansatz funktioniert, sieht aber beängstigend. Vielen Dank!

+0

Aus irgendeinem Grund denke ich nicht, dass dies eine gute Übereinstimmung für Rx ist; vielleicht würden Sie es besser machen, wenn Sie Ihren Anwendungsfall mit einem Event-Bus modellieren würden? –

Antwort

2

Unter der Annahme, dass der Kommentar

// Chat 1 disappears FROM USER CHAT LIST, Chat 2 is created 

zeigt an, dass Sie die aktuellen Chat mögen (chat1 in diesem Fall) emittieren zu stoppen, wenn ein neues Chat auf chats gesendet wird, können Sie dies mit dem switchMap Operator erreichen können .

val sequence = chats.switchMap { it.lastMessage } 

Von der ReactiveX Dokumentation:

RxJava implementiert auch die switchMap Operator. Es verhält sich ähnlich wie flatMap, außer dass, wenn ein neues Element von der Quelle Observable emittiert wird, wird es die Observable , die aus dem zuvor emittierten Element generiert wurde, abmelden und aufhören zu spiegeln, und beginnen nur Spiegelung der aktuellen.

+0

Vielen Dank! Das war offensichtlich :) – Motorro

Verwandte Themen