Ich versuche eine reaktive Anwendung zu machen, die auf einen Netzwerk-Socket auf einem separaten Thread für Preise hört und ein bisschen ratlos wurde, wie man das Observable
genau konstruiert. Viele der Schnittstellen, die ich habe, sind durch die API eingeschränkt, die ich verwende, und können daher nicht geändert werden. Ich habe destilliert, was ich versuche, als ein Test unten, aber ich kann nicht sehen, wie man den Körper der getPriceReactive()
Methode so füllt, dass die Preise auf der Konsole vom Teilnehmer gedruckt werden (sehen Sie den Kommentar im Code).Binden eines API-Callback an ein RxJava Observable
public class PriceObservableTest {
// This interface is defined externally and used by the API
private interface ITickHandler {
void priceReceived(double price);
}
// Stores the price (currently just one double for illustration)
private class Tick {
double price = Double.NaN;
}
// Implementation of handler called by API when it receives a price
private class TickHandler implements ITickHandler {
private final Tick tick;
TickHandler() { this.tick = new Tick(); }
@Override public void priceReceived(double x) { tick.price = x; }
}
// This class emulates the API delivering prices from the socket
private class PriceSource {
private final Thread thread;
PriceSource(final ITickHandler handler) {
thread = new Thread(new Runnable() {
final Random r = new Random();
@Override public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(100);
handler.priceReceived(r.nextDouble() * 100);
} catch (InterruptedException e) {
break;
}
}
System.out.println("Price thread closed");
}
});
}
void subscribe() { thread.start(); }
void unsubscribe() { thread.interrupt(); }
}
@Test
public void simpleTest() throws Exception {
final ITickHandler handler = new TickHandler();
// Simulate some prices received periodically from a socket
PriceSource prices = new PriceSource(handler);
Observable<Tick> reactive = getPriceReactive(handler);
reactive.subscribe(new Subscriber<Tick>() {
@Override public void onCompleted() { }
@Override public void onError(Throwable e) { }
@Override public void onNext(Tick tick) {
System.out.println("Received price: " + tick.price);
}});
// Observe prices for 1 second. The subscriber should print them to console
prices.subscribe();
Thread.sleep(1000);
prices.unsubscribe();
}
// Returns an observable that reacts to price changes
private Observable<Tick> getPriceReactive(ITickHandler handler) {
return Observable.create(new Observable.OnSubscribe<Tick>() {
@Override public void call(Subscriber<? super Tick> subscriber) {
// How to call subscriber.onNext() whenever
// priceReceived() is called with a new price?
}
});
}
}
Irgendwie muss subscriber.onNext()
aufgerufen werden, wenn die API priceReceived()
nennt, aber ich kann nicht ganz sehen, wie dies zu erreichen. Natürlich könnte ich einen Verweis auf den Teilnehmer in der TickHandler
speichern, aber diese Art von besiegt den Zweck, eine Observable
, nicht wahr?
Interessante Verwendung des Themas, aber ich bin ein bisschen misstrauisch von ihnen, da die RX-Dokumente vorschlagen, dass sie nur in seltenen Fällen benötigt werden sollten. Tatsächlich stellt sich heraus, dass eine sehr ähnliche Frage schon gestellt wurde, die Lösung bezieht sich nicht auf Themen: http://stackoverflow.com/questions/20552598/creating-observable-from-normal-java-events/ – ScarletPumpernickel
@ScarletPumpernickel Ich würde nicht • Sagen Sie, dass die Frage oder eine der Antworten für Ihre Frage relevant ist (es sei denn, Sie möchten Ihren Code mit benutzerdefinierten Ereignissen verseuchen). Ich würde auch sagen, dass dies ein richtiger Anwendungsfall für ein "Subject" – supertopi
ist. Ich werde den Einsatz von Subjekten +1 geben - sie sind ausgezeichnet, wenn Sie ein Observable mit externen Ereignissen fahren wollen, möglicherweise nicht vollständig unter Ihrer Kontrolle. Observable.create() funktioniert auch, aber es wird ein bisschen mühsamer, und Sie müssen mehrere Abonnements behandeln (oder verhindern). –