2016-04-10 7 views
1

Ich schreibe einen Streaming-Twitter-Client, der einfach den Stream auf einen Fernseher wirft. Ich beobachte den Strom mit RxJava.RxJava Observable, um Ausbrüche von Ereignissen zu glätten

Wenn der Stream in einem Burst kommt, möchte ich ihn puffern und verlangsamen, so dass jeder Tweet für mindestens 6 Sekunden angezeigt wird. In den ruhigen Zeiten wird dann jeder Puffer, der aufgebaut wurde, sich allmählich entladen, indem er den Kopf der Warteschlange zieht, einen Tweet alle 6 Sekunden. Wenn ein neuer Tweet eintrifft und auf eine leere Queue blickt (aber> 6 Sekunden nach der letzten Anzeige), möchte ich, dass er sofort angezeigt wird.

Ich stelle mir den Strom wie die Suche here beschrieben:

Raw:  --oooo--------------ooooo-----oo----------------ooo| 
Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o| 

Und ich verstehe, dass die Frage gestellt, eine Lösung hat. Aber ich kann meine Antwort nicht umdrehen. Hier ist meine Lösung:

myObservable 
    .concatMap(new Func1<Long, Observable<Long>>() { 
     @Override 
     public Observable<Long> call(Long l) { 
      return Observable.concat(
       Observable.just(l), 
       Observable.<Long>empty().delay(6, TimeUnit.SECONDS) 
      ); 
     } 
    }) 
    .subscribe(...); 

Also, meine Frage ist: Ist das zu naiv von einem Ansatz? Wo passiert der Puffer/Gegendruck? Gibt es eine bessere Lösung?

Antwort

2

Sieht so aus, als ob Sie eine Nachricht verzögern möchten, wenn sie im Vergleich zur vorherigen Nachricht zu früh kommt. Sie haben die letzte Zielemissionszeit zu verfolgen und eine neue Emission nach dem Zeitplan:

public class SpanOutV2 { 
    public static void main(String[] args) { 
     Observable<Integer> source = Observable.just(0, 5, 13) 
       .concatMapEager(v -> Observable.just(v).delay(v, TimeUnit.SECONDS)); 

     long minSpan = 6; 
     TimeUnit unit = TimeUnit.SECONDS; 
     Scheduler scheduler = Schedulers.computation(); 

     long minSpanMillis = TimeUnit.MILLISECONDS.convert(minSpan, unit); 

     Observable.defer(() -> { 
      AtomicLong lastEmission = new AtomicLong(); 

      return source 
      .concatMapEager(v -> { 
       long now = scheduler.now(); 
       long emission = lastEmission.get(); 

       if (emission + minSpanMillis > now) { 
        lastEmission.set(emission + minSpanMillis); 
        return Observable.just(v).delay(emission + minSpanMillis - now, TimeUnit.MILLISECONDS); 
       } 
       lastEmission.set(now); 
       return Observable.just(v); 
      }); 
     }) 
     .timeInterval() 
     .toBlocking() 
     .subscribe(System.out::println); 
    } 
} 

Hier wird die Quelle durch die Anzahl der Sekunden relativ zum Start des Problems verzögert. 0 sollte sofort ankommen, 5 sollte ankommen @ T = 6 Sekunden und 13 sollte ankommen @ T = 13. concatMapEager stellt sicher, dass die Reihenfolge und das Timing eingehalten wird. Da nur Standardoperatoren verwendet werden, ist der Gegendruck und die Abmeldung selbstverständlich.

+0

Ja! Es sieht definitiv so aus, dass dies mein Problem lösen wird. Aus Neugier, gibt es einen Vorteil zu verzögern, bevor die Nachricht (wie Sie es getan haben) über Verzögerung nach der Nachricht (wie ich es getan habe)? Ich denke beide Lösungen werden funktionieren. Vielleicht gibt es einen subtilen Rx-Unterschied, den ich nicht verstehe. – dgmltn

+0

Weniger Bediener, weniger Aufwand. – akarnokd

Verwandte Themen