2017-02-03 2 views
1

Ich verwende RxJava, um einen Hintergrundjob zu erstellen, der meine Datenbank syncronisiert.
Es verbindet sich mit einer externen Quelle und fängt an, Einträge zu verarbeiten, sie zuzuordnen und in die Datenbank einzufügen.
Wenn es endet, brauche ich die Liste mit allen verarbeiteten Elementen, ich kann es bekommen, wenn alles richtig geht, aber wie kann ich alle verarbeiteten Elemente sammeln, wenn während des Ablaufs etwas fehlschlägt?ReactiveX sammelt Elemente, die vor einem Fehler verarbeitet wurden

final List<String> res = Observable.create(onSubscribe) 
     .buffer(4) 
     .flatMap(TestRx::doStuff) 
     .buffer(8) 
     .map(TestRx::calculateList) 
     .toList() 
     .toBlocking() 
     .single(); 
System.out.println("strings = " + res); 

Was würde ich es eine Möglichkeit haben möchte, dass, wenn doStuff oder calculateList throw Ausnahmen, die Strömung aN gibt die Liste mit allem, was es, bis der Fehler weiter abgearbeitet.

Antwort

2
List<String> res = Observable.create(onSubscribe) 
    .buffer(4) 
    .flatMap(TestRx::doStuff) 
    .onErrorResumeNext(Observable.empty()) // turn error into completion 
    .buffer(8) 
    .map(TestRx::calculateList) 
    .onErrorResumeNext(Observable.empty()) // turn error into completion 
    .toList() 
    .toBlocking() 
    .single(); 
System.out.println("strings = " + res); 
+0

auf Fehler Lebenslauf weiterhin den Fluss auszuführen, ich brauche es auf Fehler – rascio

+1

@rascio Ursprüngliche Fluss stoppt auf Fehler zu stoppen, setzt sich die Verarbeitung mit neuen Strömung, die Sie mit 'onErrorResumeNext' schaffen, die' empty' Fluss , dh Sie schließen den Fluss sofort ab. –

Verwandte Themen