2016-06-29 7 views
0

Ich bin nicht ganz neu zu RxJava, aber ich bin mit dem, was blockiert scheint eine einfache Aufgabe zu sein.RxJava - auslösen andere beobachtbare/komplettierbar vor dem vollständigen und vollständig erst nach

Ich habe eine Datenquelle, die eine reaktive API freigibt, alles, was ich tun möchte, ist, einige Daten zu holen, es zurückzugeben und die Verbindung automatisch zu schließen, wenn es nichts anderes gibt, um zu emittieren.

Hier ist mein Code:

public Observable<Object> execute(String query) { 

    Single<RxConnection> rxConnection = getRxDB().getConnection(); 

    return rxConnection.flatMapObservable(conn -> { 
     Observable<Object> rxResult = conn.query(query); 

     return rxResult.doOnCompleted(() -> { 
      conn.close(); // THIS DOES NOT WORK. I would like to close the connection and to wait without blocking. 
     }); 

    }); 

} 

conn.query() und conn.close() in verschiedenen Schedulers asynchron ausgeführt. Dieser Code funktioniert nicht, weil conn.close() eine komplettierbar gibt, die keine Abonnenten. Darüber hinaus, wenn ich in der doOnCompleted Methode selbst manuell abonnieren, vervollständigt das rxResult beobachtbare ohne Warten auf die Verbindung geschlossen werden.

Ich möchte, dass die „execute (String-Abfrage)“ Methode eine beobachtbare kehrt das: - Emit alle vom conn.query geholt Artikel() aufrufen - wenn es keine weiteren Einträge zu emittieren sind, löst conn.close() - ergänzt nur NACH der conn.close() komplettierbar;

Danke.

Antwort

3

Wie wäre:

private Observable<Object> executeDeferred(String query) { 
    // your original execute() method here, including doOnComplete 
} 


public Observable<Object> execute(String query) { 
    return Observable.defer(() -> executeDeferred(query)); 
} 

Dadurch wird sichergestellt, dass die Verbindung, Abfrage und Release geschehen nur, wenn:

Observable<Object> rxResult = conn.query(query) 
.concatWith(conn.close().toObservable()) 
.onErrorResumeNext(e -> 
    conn.close().toObservable().concatWith(Observable.error(e))); 
+0

Dies scheint mein Problem auf ziemlich einfache Weise zu lösen. Außerdem behandelt es mögliche Fehler. Es ist jedoch schrecklich, dass die Rx zwingt, alles zu beobachtbaren zu verwandeln, viele nutzlose Objekte zu schaffen. –

0

Ressourcen Verschluss ist mit Observable.using verwaltet:

Observable<T> obs = 
    Observable.using(
     resourceFactory, 
     observableFactory, 
     disposeAction) 

diesem Herstellungsverfahren stellt sicher, dass eine durch die resourceFactory erstellte Ressource bei Beendigung (Fertigstellung oder Fehler) oder abzumelden angeordnet ist.

+0

Dies ist eine interessante Lösung; Aber ich kann es nicht schaffen. In der Tat erlaubt die resourceFactory nicht, eine Observable zurückzugeben, sondern nur eine Ressourceninstanz, die nicht mein Fall ist. –

+0

Die ResourceFactory liefert eine Ressource, die ObservableFactory erstellt eine Observable aus einer Ressource (erstellt von der RessourceFactory). @akarnokd deckt die Details Ihres spezifischen Problems ab, aber Sie sollten "Observable.using" verwenden, um eine ordnungsgemäße Ressourcenschließung zu erreichen. –

0

Also, Sie wollen nur die Abfrage auszuführen, wenn die beobachtbaren abonniert wird Das Observable erhält ein Abonnement und nicht während des Setups.

Edit: Sie wollen auch ein doOnUnsubscribe/doOnTerminate, um die Verbindung zu schließen, wenn ein vorzeitiges Abmelde passiert.

Verwandte Themen