2017-01-26 4 views

Antwort

0

Ich dachte buffer(Duration) Ihren Bedarf passen würde, aber es funktioniert nicht.

bearbeiten: Lassen Sie dies für den Fall, dass jemand mit Ihrem genau gleichen Bedarf versucht ist, diesen Operator zu verwenden. Diese Variante des Puffers teilt die Sequenz in aufeinanderfolgende Zeitfenster auf (die jeweils eine buffer erzeugen). Das heißt, die neue delay beginnt am Ende der vorherigen, nicht wenn ein neues Element ohne Verzögerung ausgegeben wird.

+0

Nein, mit Puffer() die Verzögerungen werden jedesmal beginnen eine weitere Verzögerung endet. Ich brauche das bei jedem eingehenden Wert, wenn es keine aktuelle Verzögerung gibt, dann gebe den Wert zurück und starte eine Verzögerung oder puffere es, wenn bereits eine Verzögerung läuft. –

+0

linke Antwort an Ort und Stelle, um zu klären, warum es nicht angepasst ist –

1

Dies kann mit einem nicht trivialen Satz von zusammengesetzten Operatoren erreicht werden.

import java.time.Duration; 
import java.util.*; 

import reactor.core.publisher.*; 

public class DelayedBuffer { 

    public static void main(String[] args) { 
     Flux.just(1, 2, 3, 6, 7, 10) 
     .flatMap(v -> Mono.delayMillis(v * 1000) 
       .doOnNext(w -> System.out.println("T=" + v)) 
       .map(w -> v) 
     ) 
     .compose(f -> delayedBufferAfterFirst(f, Duration.ofSeconds(2))) 
     .doOnNext(System.out::println) 
     .blockLast(); 
    } 

    public static <T> Flux<List<T>> delayedBufferAfterFirst(Flux<T> source, Duration d) { 
     return source 
     .publish(f -> { 
      return f.take(1).collectList() 
      .concatWith(f.buffer(d).take(1)) 
      .repeatWhen(r -> r.takeUntilOther(f.ignoreElements())); 
     }); 
    } 
} 

(Beachten Sie jedoch, dass die zu erwartenden Emissionsmuster besser mit einem benutzerdefinierten Bediener angepasst werden können aufgrund von Zeit beteiligt zu sein.)

+0

Danke, es hat mir geholfen, aber ich denke, das passt mehr mein Bedürfnis: .publish (f -> f.take (1) .collectList() \t .concatWith (f.take (d) .collectList() \t \t .filter (Liste -> list.isEmpty()) \t \t .repeatWhen (r -> r.takeWhile (n -> n> 0))) \t .repeatWhen (r -> r.takeUntilOther (f.ignoreElements()))) –