2017-03-27 2 views
0

Ich bin ein RxJava Newcomer, und ich habe einige Schwierigkeiten, meinen Kopf darum zu wickeln, wie man das Folgende macht.kann ich einen Single mit einem Observable "konditionieren"?

  • i Retrofit bin mit einer Netzwerkanfrage aufzurufen, die mich gibt einen Single<Foo>, die der Typ i schließlich über meine Subscriber Instanz konsumieren wollen (nennen wir es SingleFooSubscriber)
  • Foo eine interne Eigenschaft hat items getippt als List<String>.
  • Wenn Foo.items nicht leer ist, möchte ich separate, gleichzeitige Netzwerkanforderungen für jeden seiner Werte aufrufen. (Die tatsächlichen Ergebnisse dieser Anfragen sind für SingleFooSubscriber unbedeutend, da die Ergebnisse extern zwischengespeichert werden).
  • SingleFooSubscriber.onComplete() sollte nur aufgerufen werden, wenn Foo und alle Foo.items abgerufen wurden.

fetchFooCall .subscribeOn (Schedulers.io())

// Approach #1... 
// the idea here would be to "merge" the results of both streams into a single 
// reactive type, but i'm not sure how this would work given that the item emissions 
// could be far greater than one. using zip here i don't think it would every 
// complete. 

.flatMap { foo -> 
    if(foo.items.isNotEmpty()) { 
     Observable.zip(
       Observable.fromIterable(foo.items), 
       Observable.just(foo), 
       { source1, source2 -> 
        // hmmmm... 
       } 
     ).toSingle() 

    } else { 
     Single.just(foo) 
    } 
} 

// ...or Approach #2... 
// i think this would result in the streams for Foo and items being handled sequentially, 
// which is not really ideal because 
// 1) i think it would entail nested streams (i get the feeling i should be using flatMap 
// instead) 
// 2) and i'm not sure SingleFooSubscriber.onComplete() would depend on the completion of 
// the stream for items 

.doOnSuccess { data -> 
    if(data.items.isNotEmpty()) { 
     // hmmmm... 
    } 
} 

.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(
    { data -> /* onSuccess() */ }, 
    { error -> /* onError() */ } 
) 

keine Gedanken darüber, wie man diesen Ansatz wäre sehr dankbar!

Bonuspunkte: Bei dem Versuch, eine Lösung zu diesem Thema zu finden, habe ich begonnen, die Entscheidung, den Single reaktiven Typ vs den Observable reaktiven Typ zu verwenden, in Frage zu stellen. die meisten (alle, außer diesem einen Fall?) meiner Ströme drehen sich eigentlich darum, eine einzelne Instanz von etwas zu konsumieren, also lehnte ich mich an Single, um meine Streams darzustellen, da ich dachte, dass sie etwas semantische Klarheit um den Code hinzufügen würde. Hat jemand eine allgemeine Anleitung, wann man das eine gegen das andere benutzen soll?

Antwort

2

Sie müssen Nest flatMap s und dann konvertieren zurück zu Single:

retrofit.getMainObject() 
    .flatMap(v -> 
    Flowable.fromIterable(v.items) 
     .flatMap(w -> 
     retrofit.getItem(w.id).doOnNext(x -> w.property = x) 
     ) 
     .ignoreElements() 
     .toSingle(v) 
) 
+0

noch nicht gesehen/verwendet 'ignoreElements()'. Wenn dies aktiviert ist, ist garantiert, dass jedes Element bis zum Abschluss "verarbeitet" wurde, bevor der onComplete-Listener aufgerufen wird? Mit anderen Worten, in meinem speziellen Fall frage ich, ob der Fernaufruf für jedes "Element" abgeschlossen sein wird, oder könnten sie möglicherweise noch zu dem Zeitpunkt in Betrieb sein, zu dem der onComplete-Listener aufgerufen wird? – hsl

+0

IgnoreElements ignoriert * alle * Elemente und beendet den Abschluss der Quelle. Das bedeutet keine In-Flight-Elemente. – akarnokd

Verwandte Themen