2017-02-12 9 views
0

Das Problem, das ich gegenüberstelle, ist wie folgt: Ich habe zwei observables, das man Daten vom Netz holt und das andere von db. Die zweite könnte leer sein, aber das Fehlen der ersten wird als Fehler angesehen. Dann, wenn das Ergebnis vom Netzwerk kommt, muss ich es mit den neuesten Ergebnissen von db vergleichen (falls vorhanden) und wenn sie sich unterscheiden, möchte ich sie speichern (wenn das db Observable leer ist, möchte ich sowieso Netzwerkergebnisse speichern).Wie man Observables mit anderen Observables filtert

Gibt es einen dedizierten Operator, der einen Fall wie diesen behandelt?

Bisher habe ich eine Lösung mit zipWith (die nicht wie erwartet funktioniert, wenn db ist leer), Puffer (die funktioniert, aber ist alles andere als ideal), und ich versuchte auch Flatmapping (die zusätzliche Casting erfordert in der Abonnent).

Unten ist die Lösung mit Puffer.

Observable.concat(ratesFromNetwork(), latestRatesFromDB()) 
       .buffer(3000, 2) 
       .filter(buffer -> !(buffer.size() == 2 && !buffer.get(0).differentThan(buffer.get(1)))) 
       .map(buffer -> buffer.get(0)) 
       .subscribe(this::save, 
         (ex) -> System.out.println(ex.getMessage()), 
         () -> System.out.println("completed")); 

Wenn ich latestRatesFromDb ändern, damit es nicht beobachtbar Rückkehr ist aber und optional statt das ganze Problem wird trivial, weil ich dieses Ergebnis filtern verwenden. Es scheint, dass es keine Möglichkeit gibt, auf eine asynchrone Weise zu filtern (oder habe ich etwas verpasst?)

+0

Was emittieren diese Observablen? nur ein Element oder mehrere? – flakes

+0

Nur ein Ereignis. – Luke

Antwort

1

Okay, hier ist, wie ich darüber schreiben würde.

Erstens, was auch immer Klasse hat die differentThan Funktion sollte geändert werden, um equals stattdessen zu überschreiben. Ansonsten können Sie mit diesen Objekten nicht viele grundlegende Methoden verwenden.

Für den Zweck dieses Beispiels schrieb ich alle Observablen mit der Integer Klasse als meine Typparameter. Ich benutze dann einen Scheduler zwei mock Methoden zu schreiben:

static Observable<Integer> ratesFromNetwork(Scheduler scheduler) { 
    return Observable.<Integer>create(sub -> { 
     sub.onNext(2); 
     sub.onCompleted(); 
    }).delay(99, TimeUnit.MILLISECONDS, scheduler); 
} 

static Observable<Integer> latestRatesFromDB(Scheduler scheduler) { 
    return Observable.<Integer>create(sub -> { 
     sub.onNext(1); 
     sub.onCompleted(); 
    }).delay(99, TimeUnit.MILLISECONDS, scheduler); 
} 

Wie Sie beide sind ähnlich sehen können, werden sie jedoch unterschiedliche Werte emittieren.

Mangel an dem ersten wird ein Fehler betrachtet

Der beste Weg, dies zu erreichen, ist ein timeout zu verwenden. Sie können das Fehlerprotokoll sofort hier und weiter:

final Observable<Integer> networkRate = ratesFromNetwork(scheduler) 
    .timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler) 
    .doOnError(e -> System.err.println("Failed to get rates from network.")); 

Wenn die timeout ein Fehler nicht durch rx geworfen werden. doOnError gibt Ihnen eine bessere Vorstellung davon, wo dieser Fehler begann und ließ ihn durch den Rest der Folge propagieren.

Die zweite könnte

In diesem Fall leer sein ich eine ähnliche Strategie würde tun, lassen jedoch nicht der Fehler propagieren onErrorResumeNext die Methode unter Verwendung. Jetzt können Sie sicherstellen, dass das Observable mindestens einen Wert ausgibt, indem Sie firstOrDefault verwenden. Verwenden Sie in dieser Methode einen Dummy-Wert, der erwartungsgemäß nie mit den Netzwerkergebnissen übereinstimmt.

final Observable<Integer> databaseRate = latestRatesFromDB(scheduler) 
    .timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler) 
    .doOnError(e -> System.err.println("Failed to get rates from database")) 
    .onErrorResumeNext(Observable.empty()) 
    .firstOrDefault(-1); 

nun durch die distinct Methode verwenden, können Sie nur einen Wert greifen, wenn es anders als die, die vor ihm da war (das ist, warum Sie equals außer Kraft setzen müssen).

databaseRate.concatWith(networkRate).distinct().skip(1) 
    .subscribe(i -> System.out.println("Updating to " + i), 
     System.err::println, 
     () -> System.out.println("completed")); 

Hier wird die Datenbank Rate vor dem Netzrate getätigt Vorteil distinct zu nehmen. a skip wird dann hinzugefügt, um den Wert der Datenbankrate immer zu ignorieren.


komplette Code:

final long networkTimeOut = 100; 
final long databaseTimeOut = 100; 

final TestScheduler scheduler = new TestScheduler(); 

final Observable<Integer> networkRate = ratesFromNetwork(scheduler) 
    .timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler) 
    .doOnError(e -> System.err.println("Failed to get rates from network.")); 

final Observable<Integer> databaseRate = latestRatesFromDB(scheduler) 
    .timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler) 
    .doOnError(e -> System.err.println("Failed to get rates from database")) 
    .onErrorResumeNext(Observable.empty()) 
    .firstOrDefault(-1); 

databaseRate.concatWith(networkRate).distinct().skip(1) 
    .subscribe(i -> System.out.println("Updating to " + i), 
     System.err::println, 
     () -> System.out.println("completed")); 

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS); 

Wenn networkTimeOut und databaseTimeOut sind größer als 100 druckt:

Updating to 2 
completed 

Wenn networkTimeOut weniger als 100 druckt:

Failed to get rates from network. 
java.util.concurrent.TimeoutException 

Wenn databaseTimeOut weniger als 100 ist es druckt:

Failed to get rates from database 
Updating to 2 
completed 

Und wenn Sie latestRatesFromDB und ratesFromNetwork ändern den gleichen Wert zurück, es einfach druckt:

completed 

Und wenn Sie keine Timeouts oder Logging erzwingen wollen, dann kocht es auf:

latestRatesFromDB().firstOrDefault(dummyValue) 
    .concatWith(ratesFromNetwork()) 
    .distinct().skip(1) 
    .subscribe(this::save, 
     System.err::println, 
     () -> System.out.println("completed")); 
+0

Ja, das letzte Beispiel ist das, was ich anstrebte. Überschreiben gleich macht den Job! Vielen Dank. – Luke