Dies erfordert einen individuellen Operator (zuverlässig) oder mit bestehenden Komponieren (weniger zuverlässig):
public static void main(String[] args) throws Exception {
int[] delay = { 0, 100, 250, 300, 900, 975, 1050, 1200 };
Flowable.range(0, delay.length)
.flatMap(v -> Flowable.timer(delay[v], TimeUnit.MILLISECONDS)
.map(u -> (v + 1) + " @ " + delay[v] + " ms"))
.compose(firstAndSample(200, TimeUnit.MILLISECONDS))
.blockingSubscribe(System.out::println)
;
}
public static <T> FlowableTransformer<T, T> firstAndSample(long delay, TimeUnit unit) {
return f -> {
Scheduler s = new SingleScheduler();
return f.publish(g -> {
return g.take(1)
.concatWith(
g
.sample(delay, unit, s, true)
.timeout(delay + 1, unit, s)
)
.retry(e -> e instanceof TimeoutException)
;
});
};
}
es durch Phasenschalt arbeitet innerhalb eines gemeinsamen Strömungs des vorgeschalteten (publish
). In der 1. Phase nimmt es 1 Element von Upstream und wechselt dann in die 2. Phase (concatWith
). Die zweite Phase tastet den Upstream ab (sample
) und wenn die Abtastung nach der Periode nichts emittiert (timeout
), schlägt der Substream fehl und wir wechseln erneut zu Phase 1, indem wir erneut versuchen, die TimeoutException
auszulösen. Um sicherzustellen, dass die Stichproben- und Zeitüberschreitungsbewertung der Reihe nach durchgeführt wird, wird für beide ein spezifischer single-threaded Scheduler
verwendet. (Die geringere Zuverlässigkeit ergibt sich daraus, dass die Phasenänderung nicht atomar ist, da ihre Komponenten auf mehrere Operatoren verteilt sind).
Ich denke, dass Sie einen benutzerdefinierten Operator für bestimmte Anforderungen verwenden können. –
können Sie auf 1) näher eingehen? Möchten Sie die erste Emission für einen zukünftigen Abonnenten zwischenspeichern? – yosriz
Nein, kein Caching erforderlich. Wie ein PublishSubject, nicht Behavior. – Alexey