Ich bin nicht ganz neu zu RxJava, aber ich bin mit dem, was blockiert scheint eine einfache Aufgabe zu sein.RxJava - auslösen andere beobachtbare/komplettierbar vor dem vollständigen und vollständig erst nach
Ich habe eine Datenquelle, die eine reaktive API freigibt, alles, was ich tun möchte, ist, einige Daten zu holen, es zurückzugeben und die Verbindung automatisch zu schließen, wenn es nichts anderes gibt, um zu emittieren.
Hier ist mein Code:
public Observable<Object> execute(String query) {
Single<RxConnection> rxConnection = getRxDB().getConnection();
return rxConnection.flatMapObservable(conn -> {
Observable<Object> rxResult = conn.query(query);
return rxResult.doOnCompleted(() -> {
conn.close(); // THIS DOES NOT WORK. I would like to close the connection and to wait without blocking.
});
});
}
conn.query() und conn.close() in verschiedenen Schedulers asynchron ausgeführt. Dieser Code funktioniert nicht, weil conn.close() eine komplettierbar gibt, die keine Abonnenten. Darüber hinaus, wenn ich in der doOnCompleted Methode selbst manuell abonnieren, vervollständigt das rxResult beobachtbare ohne Warten auf die Verbindung geschlossen werden.
Ich möchte, dass die „execute (String-Abfrage)“ Methode eine beobachtbare kehrt das: - Emit alle vom conn.query geholt Artikel() aufrufen - wenn es keine weiteren Einträge zu emittieren sind, löst conn.close() - ergänzt nur NACH der conn.close() komplettierbar;
Danke.
Dies scheint mein Problem auf ziemlich einfache Weise zu lösen. Außerdem behandelt es mögliche Fehler. Es ist jedoch schrecklich, dass die Rx zwingt, alles zu beobachtbaren zu verwandeln, viele nutzlose Objekte zu schaffen. –