Ich schreibe einen Streaming-Twitter-Client, der einfach den Stream auf einen Fernseher wirft. Ich beobachte den Strom mit RxJava.RxJava Observable, um Ausbrüche von Ereignissen zu glätten
Wenn der Stream in einem Burst kommt, möchte ich ihn puffern und verlangsamen, so dass jeder Tweet für mindestens 6 Sekunden angezeigt wird. In den ruhigen Zeiten wird dann jeder Puffer, der aufgebaut wurde, sich allmählich entladen, indem er den Kopf der Warteschlange zieht, einen Tweet alle 6 Sekunden. Wenn ein neuer Tweet eintrifft und auf eine leere Queue blickt (aber> 6 Sekunden nach der letzten Anzeige), möchte ich, dass er sofort angezeigt wird.
Ich stelle mir den Strom wie die Suche here beschrieben:
Raw: --oooo--------------ooooo-----oo----------------ooo|
Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o|
Und ich verstehe, dass die Frage gestellt, eine Lösung hat. Aber ich kann meine Antwort nicht umdrehen. Hier ist meine Lösung:
myObservable
.concatMap(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long l) {
return Observable.concat(
Observable.just(l),
Observable.<Long>empty().delay(6, TimeUnit.SECONDS)
);
}
})
.subscribe(...);
Also, meine Frage ist: Ist das zu naiv von einem Ansatz? Wo passiert der Puffer/Gegendruck? Gibt es eine bessere Lösung?
Ja! Es sieht definitiv so aus, dass dies mein Problem lösen wird. Aus Neugier, gibt es einen Vorteil zu verzögern, bevor die Nachricht (wie Sie es getan haben) über Verzögerung nach der Nachricht (wie ich es getan habe)? Ich denke beide Lösungen werden funktionieren. Vielleicht gibt es einen subtilen Rx-Unterschied, den ich nicht verstehe. – dgmltn
Weniger Bediener, weniger Aufwand. – akarnokd