2017-06-10 10 views
0

Kann jemand vorschlagen, wie Sie diesen Fall mit RX Java implementieren? enter image description hereRX-Beispiel mit Neustart nach 'Erst nach Pause'

Es sieht aus wie sample, aber es gibt zwei Unterschiede:

  1. Die Probe Start mit erster Emission(), auch wenn vor abonniert.
  2. Wenn Intervall zwischen zwei emittions (und 5 in diesem Beispiel) Abtastrate (1 sec) übersteigt, wird die Probe auf dem letzteren emittion neu gestartet wird ( 5).
+0

Ich denke, dass Sie einen benutzerdefinierten Operator für bestimmte Anforderungen verwenden können. –

+0

können Sie auf 1) näher eingehen? Möchten Sie die erste Emission für einen zukünftigen Abonnenten zwischenspeichern? – yosriz

+0

Nein, kein Caching erforderlich. Wie ein PublishSubject, nicht Behavior. – Alexey

Antwort

1

Dies erfordert einen individuellen Operator (zuverlässig) oder mit bestehenden Komponieren (weniger zuverlässig):

public static void main(String[] args) throws Exception { 
    int[] delay = { 0, 100, 250, 300, 900, 975, 1050, 1200 }; 

    Flowable.range(0, delay.length) 
    .flatMap(v -> Flowable.timer(delay[v], TimeUnit.MILLISECONDS) 
     .map(u -> (v + 1) + " @ " + delay[v] + " ms")) 
    .compose(firstAndSample(200, TimeUnit.MILLISECONDS)) 
    .blockingSubscribe(System.out::println) 
    ; 
} 

public static <T> FlowableTransformer<T, T> firstAndSample(long delay, TimeUnit unit) { 
    return f -> { 
     Scheduler s = new SingleScheduler(); 
     return f.publish(g -> { 
      return g.take(1) 
      .concatWith(
       g 
       .sample(delay, unit, s, true) 
       .timeout(delay + 1, unit, s) 
      ) 
      .retry(e -> e instanceof TimeoutException) 
      ; 
     }); 
    }; 
} 

es durch Phasenschalt arbeitet innerhalb eines gemeinsamen Strömungs des vorgeschalteten (publish). In der 1. Phase nimmt es 1 Element von Upstream und wechselt dann in die 2. Phase (concatWith). Die zweite Phase tastet den Upstream ab (sample) und wenn die Abtastung nach der Periode nichts emittiert (timeout), schlägt der Substream fehl und wir wechseln erneut zu Phase 1, indem wir erneut versuchen, die TimeoutException auszulösen. Um sicherzustellen, dass die Stichproben- und Zeitüberschreitungsbewertung der Reihe nach durchgeführt wird, wird für beide ein spezifischer single-threaded Scheduler verwendet. (Die geringere Zuverlässigkeit ergibt sich daraus, dass die Phasenänderung nicht atomar ist, da ihre Komponenten auf mehrere Operatoren verteilt sind).

+0

Schöne Annäherung, danke. Ich benutze RX Java 1, also werde ich versuchen, dieses Beispiel anzunehmen und es zu versuchen. – Alexey

Verwandte Themen