2016-03-30 13 views
1

Ich versuche eine reaktive Anwendung zu machen, die auf einen Netzwerk-Socket auf einem separaten Thread für Preise hört und ein bisschen ratlos wurde, wie man das Observable genau konstruiert. Viele der Schnittstellen, die ich habe, sind durch die API eingeschränkt, die ich verwende, und können daher nicht geändert werden. Ich habe destilliert, was ich versuche, als ein Test unten, aber ich kann nicht sehen, wie man den Körper der getPriceReactive() Methode so füllt, dass die Preise auf der Konsole vom Teilnehmer gedruckt werden (sehen Sie den Kommentar im Code).Binden eines API-Callback an ein RxJava Observable

public class PriceObservableTest { 

    // This interface is defined externally and used by the API 
    private interface ITickHandler { 
     void priceReceived(double price); 
    } 

    // Stores the price (currently just one double for illustration) 
    private class Tick { 
     double price = Double.NaN; 
    } 

    // Implementation of handler called by API when it receives a price 
    private class TickHandler implements ITickHandler { 
     private final Tick tick; 

     TickHandler() { this.tick = new Tick(); } 

     @Override public void priceReceived(double x) { tick.price = x; } 
    } 

    // This class emulates the API delivering prices from the socket 
    private class PriceSource { 
     private final Thread thread; 

     PriceSource(final ITickHandler handler) { 
      thread = new Thread(new Runnable() { 
       final Random r = new Random(); 
       @Override public void run() { 
        while (!Thread.currentThread().isInterrupted()) { 
         try { 
          Thread.sleep(100); 
          handler.priceReceived(r.nextDouble() * 100); 
         } catch (InterruptedException e) { 
          break; 
         } 
        } 
        System.out.println("Price thread closed"); 
       } 
     }); 
     } 

     void subscribe() { thread.start(); } 

     void unsubscribe() { thread.interrupt(); } 
    } 

    @Test 
    public void simpleTest() throws Exception { 

     final ITickHandler handler = new TickHandler(); 

     // Simulate some prices received periodically from a socket 
     PriceSource prices = new PriceSource(handler); 

     Observable<Tick> reactive = getPriceReactive(handler); 

     reactive.subscribe(new Subscriber<Tick>() { 
      @Override public void onCompleted() { } 
      @Override public void onError(Throwable e) { } 
      @Override public void onNext(Tick tick) { 
       System.out.println("Received price: " + tick.price); 
      }}); 

     // Observe prices for 1 second. The subscriber should print them to console 
     prices.subscribe(); 
     Thread.sleep(1000); 
     prices.unsubscribe(); 
    } 

    // Returns an observable that reacts to price changes 
    private Observable<Tick> getPriceReactive(ITickHandler handler) { 
     return Observable.create(new Observable.OnSubscribe<Tick>() { 
      @Override public void call(Subscriber<? super Tick> subscriber) { 

       // How to call subscriber.onNext() whenever 
       // priceReceived() is called with a new price? 

      } 
     }); 
    } 
} 

Irgendwie muss subscriber.onNext() aufgerufen werden, wenn die API priceReceived() nennt, aber ich kann nicht ganz sehen, wie dies zu erreichen. Natürlich könnte ich einen Verweis auf den Teilnehmer in der TickHandler speichern, aber diese Art von besiegt den Zweck, eine Observable, nicht wahr?

Antwort

1

Übergang zu Observable in ITickHandler Implementierung. Sie steuern nicht den Teilnehmer (n), aber der Verlag

private class TickHandler implements ITickHandler { 
    private final Tick tick; 
    private final PublishSubject<Tick> priceSubject; 

    TickHandler() { 
     this.tick = new Tick(); 
     this.priceSubject = PublishSubject.create(); 
    } 

    @Override public void priceReceived(double x) 
    { 
     tick.price = x; 
     priceSubject.onNext(tick); 
    } 

    public Observable<Tick> priceReceivedObservable() 
    { 
     return priceSubject.asObservable(); 
    } 
} 

Und Sie es in Ihren Tests wie verwenden können:

final ITickHandler handler = new TickHandler(); 
PriceSource prices = new PriceSource(handler); 

handler.priceReceivedObservable() 
     .subscribe(new Subscriber<Tick>() { 
      @Override public void onCompleted() { } 
      @Override public void onError(Throwable e) { } 
      @Override public void onNext(Tick tick) { 
       System.out.println("Received price: " + tick.price); 
      }}); 

Ich warne Sie, es ist nicht getestet, da ich ein nicht tun Viel Java :)

+0

Interessante Verwendung des Themas, aber ich bin ein bisschen misstrauisch von ihnen, da die RX-Dokumente vorschlagen, dass sie nur in seltenen Fällen benötigt werden sollten. Tatsächlich stellt sich heraus, dass eine sehr ähnliche Frage schon gestellt wurde, die Lösung bezieht sich nicht auf Themen: http://stackoverflow.com/questions/20552598/creating-observable-from-normal-java-events/ – ScarletPumpernickel

+0

@ScarletPumpernickel Ich würde nicht • Sagen Sie, dass die Frage oder eine der Antworten für Ihre Frage relevant ist (es sei denn, Sie möchten Ihren Code mit benutzerdefinierten Ereignissen verseuchen). Ich würde auch sagen, dass dies ein richtiger Anwendungsfall für ein "Subject" – supertopi

+1

ist. Ich werde den Einsatz von Subjekten +1 geben - sie sind ausgezeichnet, wenn Sie ein Observable mit externen Ereignissen fahren wollen, möglicherweise nicht vollständig unter Ihrer Kontrolle. Observable.create() funktioniert auch, aber es wird ein bisschen mühsamer, und Sie müssen mehrere Abonnements behandeln (oder verhindern). –

Verwandte Themen