2013-12-13 10 views
5

Ich habe eine Klasse Producer, vereinfacht es hat Methode public Object readData() Ich möchte machen diese Klasse als Observable (RxJava) zu verwenden.Wie benutzerdefinierte Klasse als beobachtbare und Feuer Methode im Zeitintervall

Wie wird angegeben, welche Methode aufgerufen werden soll? Muss ich meine Producer Klasse in Future oder Iterable umwandeln?

Das nächste Problem ist, dass die readData Anruf alle n Sekunden sein sollte. Einige Methoden, zum Beispiel from, hat Scheduler-Parameter, aber ich kann kein Beispiel finden, wie man es anwendet. Ich fand interval Methode, aber es emittiert eine Sequenz von ganzen Zahlen. Bis jetzt, ohne Observable Ich benutze Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(....)

Antwort

3

Sie können immer noch die interval() Methode verwenden, ignorieren Sie einfach das Ergebnis!

final Producer producer = ...; 
int n = ...; 
Observable<Object> obs = 
    Observable.interval(n,TimeUnit.SECONDS) 
       .map(new Func1<Integer,Object>() { 
        @Override 
        public Object call(Integer i) { 
         return producer.readData(); 
        } 
       )); 
1

Ich bin neu in RxJava, aber niemand hat Ihre Frage beantwortet, also hier ist ein Stich bei ihm.

Mein Vorschlag ist, dass Ihre Producer-Klasse die Schnittstelle Observable.OnSubscribeFunc (Javadoc) implementiert. Bei dem Verfahren public Subscription onSubscribe(Observer<T> observer) können Sie Ihre Producer#readData() Methode, um nennen, was derzeit verfügbaren Daten ist und diese Daten in die onNext()-Methode übergeben, wie folgt aus:

// Make sure to change T as appropriate 
public class Producer implements OnSubscribeFunc<T> { 
    . . . 

    @Override 
    public Subscription onSubscribe(Observer<T> observer) { 
     while (this.hasData()) { 
      observer.onNext(this.readData()); 
     observer.onCompleted(); 
    } 
} 

(Dies setzt voraus, dass Ihr Producer#readData() Methode gibt Daten in Blöcken statt .. alle auf einmal nach Bedarf Tweak)

Sie können dann auf Ihre Producer-Objekt Observable#create(OnSubscribeFunc<T>) Methode, abonnieren und auch ordnen Sie Ihre beobachtbare Rückstellelemente auf einem Timer haben:

// In your observer class 
Observable<T> myProducer = Observable.create(new Producer(...)); 

ScheduledExecutorService scheduler = 
     Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(....); 
Observable<T> myProducerTimed = myProducer.subscribeOn(Schedulers.executor(scheduler)); 
// Now use myProducerTimed 

Ich kann diese letzten zwei Zeilen jetzt nicht testen. Tut mir leid, ich werde das prüfen und meine Antwort aktualisieren, wenn sich die Dinge ändern.

HAFTUNGSAUSSCHLUSS: Ich bin ein RxJava n00b, also könnte diese Lösung saugen oder könnte unnötig unordentlich sein. Repariere, was falsch scheint.

UPDATE: Ich hatte ein Problem (RxJava -- Terminating Infinite Streams) mit RxJava auch; sieht so aus, als würde meine Lösung sehr ähnlich aussehen wie Ihre (mit dem Scheduler, um die Observable-Rückgabeelemente in regelmäßigen Abständen zu erstellen).

+0

Mein Vorschlag macht all das schwere Heben in 'onSubcribe()'. Das ist falsch (siehe die SO-Frage, die ich mit meiner Antwort verknüpft habe), aber ich kann meine Antwort im Moment nicht überarbeiten. Ich begrüße Änderungen, wenn jemand vor mir dazu kommt. – MonkeyWithDarts

Verwandte Themen