2017-10-30 2 views
0

Ich habe einen Netzwerk-Client, der von Unterbrechungen fortsetzen kann, aber die letzte Nachricht benötigt, um dies zu tun, wenn es einen erneuten Versuch gibt.So erinnern Sie sich an Status mit Wiederholungsoperatoren in RxJava2

Beispiel in Kotlin:

fun requestOrResume(last: Message? = null): Flowable<Message> = 
    Flowable.create({ emitter -> 
     val connection = if (last != null) 
          client.start() 
         else 
          client.resumeFrom(last.id) 

     while (!emitter.isDisposed) { 
      val msg = connection.nextMessage() 
      emitter.onNext(msg) 
     } 
    }, BackpressureStrategy.MISSING) 

requestOrResume() 
    .retryWhen { it.flatMap { Flowable.timer(5, SECONDS) } } 
    // how to pass the resume data when there is a retry? 

Frage: wie Sie sehen können, muss ich die letzte empfangene Nachricht, um den Lebenslauf Anruf vorzubereiten. Wie kann ich den Überblick behalten, damit bei einem erneuten Versuch der Antrag auf eine Fortsetzen-Anfrage gestellt wird?

Eine mögliche Lösung besteht darin, eine Halterklasse zu erstellen, die nur einen Verweis auf die letzte Nachricht enthält und aktualisiert wird, wenn eine neue Nachricht empfangen wird. Auf diese Weise kann beim letzten Versuch die letzte Nachricht vom Inhaber erhalten werden. Beispiel:

class MsgHolder(var last: Message? = null) 

fun request(): Flowable<Message> { 
    val holder = MsgHolder() 
    return Flowable.create({ emitter -> 
     val connection = if (holder.last != null) 
          client.start() 
         else 
          client.resumeFrom(holder.last.id) 

     while (!emitter.isDisposed) { 
      val msg = connection.nextMessage() 
      holder.last = msg // <-- update holder reference 
      emitter.onNext(msg) 
     } 
    }, BackpressureStrategy.MISSING) 
} 

Ich denke, das könnte funktionieren, aber es fühlt sich an wie ein Hack (Thread-Synchronisierungsprobleme?).

Gibt es eine bessere Möglichkeit, den Status zu verfolgen, damit er für Wiederholungen verfügbar ist?

Antwort

1

Beachten Sie, dass, es sei denn, Sie rethrow ein Wrapper um dein letztes Element (nicht zu funktionell anders als deine existierende "hack" -ische Lösung, aber viel hässlicher imo), keine Fehlerbehandlungsoperatoren können das letzte Element ohne Hilfe von außen wiederherstellen, weil sie nur Zugriff auf Streams von Throwable. Stattdessen sehen, ob der folgende rekursive Ansatz für Ihre Anforderungen:

fun retryWithLast(seed: Flowable<Message>): Flowable<Message> { 
    val last$ = seed.last().cache(); 
    return seed.onErrorResumeNext { 
    it.flatMap { 
     retryWithLast(last$.flatMap { 
     requestOrResume(it) 
     }) 
    } 
    }; 
} 
retryWithLast(requestOrResume()); 

Der größte Unterschied ist Caching des nacheilenden Wertes aus dem letzten Versuch in einer beobachtbaren mit cache anstatt in einem Wert so manuell tun. Beachten Sie auch, dass die Rekursion in der Fehlerbehandlungsroutine retryWithLast weiterhin den Stream erweitert, wenn nachfolgende Versuche weiterhin fehlschlagen.

0

Werfen Sie einen Blick auf buffer() Betreiber: link Sie es wie folgt verwenden:

requestOrResume() 
    .buffer(2) 

Ab jetzt Ihre Flowable wird List<Message> mit zwei latests Rück Objekte

+0

Ich sehe nicht, wie der "Puffer" -Operator in dieser Situation hilft. – ESala

Verwandte Themen