Ich bin neu in RxJava, aber niemand hat Ihre Frage beantwortet, also hier ist ein Stich bei ihm.
Mein Vorschlag ist, dass Ihre Producer-Klasse die Schnittstelle Observable.OnSubscribeFunc
(Javadoc) implementiert. Bei dem Verfahren public Subscription onSubscribe(Observer<T> observer)
können Sie Ihre Producer#readData()
Methode, um nennen, was derzeit verfügbaren Daten ist und diese Daten in die onNext()
-Methode übergeben, wie folgt aus:
// Make sure to change T as appropriate
public class Producer implements OnSubscribeFunc<T> {
. . .
@Override
public Subscription onSubscribe(Observer<T> observer) {
while (this.hasData()) {
observer.onNext(this.readData());
observer.onCompleted();
}
}
(Dies setzt voraus, dass Ihr Producer#readData()
Methode gibt Daten in Blöcken statt .. alle auf einmal nach Bedarf Tweak)
Sie können dann auf Ihre Producer-Objekt Observable#create(OnSubscribeFunc<T>)
Methode, abonnieren und auch ordnen Sie Ihre beobachtbare Rückstellelemente auf einem Timer haben:
// In your observer class
Observable<T> myProducer = Observable.create(new Producer(...));
ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(....);
Observable<T> myProducerTimed = myProducer.subscribeOn(Schedulers.executor(scheduler));
// Now use myProducerTimed
Ich kann diese letzten zwei Zeilen jetzt nicht testen. Tut mir leid, ich werde das prüfen und meine Antwort aktualisieren, wenn sich die Dinge ändern.
HAFTUNGSAUSSCHLUSS: Ich bin ein RxJava n00b, also könnte diese Lösung saugen oder könnte unnötig unordentlich sein. Repariere, was falsch scheint.
UPDATE: Ich hatte ein Problem (RxJava -- Terminating Infinite Streams) mit RxJava auch; sieht so aus, als würde meine Lösung sehr ähnlich aussehen wie Ihre (mit dem Scheduler, um die Observable-Rückgabeelemente in regelmäßigen Abständen zu erstellen).
Mein Vorschlag macht all das schwere Heben in 'onSubcribe()'. Das ist falsch (siehe die SO-Frage, die ich mit meiner Antwort verknüpft habe), aber ich kann meine Antwort im Moment nicht überarbeiten. Ich begrüße Änderungen, wenn jemand vor mir dazu kommt. – MonkeyWithDarts