Wie kann ich vorhandene Flux-Operatoren verwenden, damit ein Flux eingehende Werte in mehrere Listen mit einer minimalen Verzögerung zwischen den Retouren zurückgibt?Reaktor 3 'Intervallpuffer' Flux?
Antwort
Ich dachte buffer(Duration)
Ihren Bedarf passen würde, aber es funktioniert nicht.
bearbeiten: Lassen Sie dies für den Fall, dass jemand mit Ihrem genau gleichen Bedarf versucht ist, diesen Operator zu verwenden. Diese Variante des Puffers teilt die Sequenz in aufeinanderfolgende Zeitfenster auf (die jeweils eine buffer
erzeugen). Das heißt, die neue delay
beginnt am Ende der vorherigen, nicht wenn ein neues Element ohne Verzögerung ausgegeben wird.
Dies kann mit einem nicht trivialen Satz von zusammengesetzten Operatoren erreicht werden.
import java.time.Duration;
import java.util.*;
import reactor.core.publisher.*;
public class DelayedBuffer {
public static void main(String[] args) {
Flux.just(1, 2, 3, 6, 7, 10)
.flatMap(v -> Mono.delayMillis(v * 1000)
.doOnNext(w -> System.out.println("T=" + v))
.map(w -> v)
)
.compose(f -> delayedBufferAfterFirst(f, Duration.ofSeconds(2)))
.doOnNext(System.out::println)
.blockLast();
}
public static <T> Flux<List<T>> delayedBufferAfterFirst(Flux<T> source, Duration d) {
return source
.publish(f -> {
return f.take(1).collectList()
.concatWith(f.buffer(d).take(1))
.repeatWhen(r -> r.takeUntilOther(f.ignoreElements()));
});
}
}
(Beachten Sie jedoch, dass die zu erwartenden Emissionsmuster besser mit einem benutzerdefinierten Bediener angepasst werden können aufgrund von Zeit beteiligt zu sein.)
Danke, es hat mir geholfen, aber ich denke, das passt mehr mein Bedürfnis: .publish (f -> f.take (1) .collectList() \t .concatWith (f.take (d) .collectList() \t \t .filter (Liste -> list.isEmpty()) \t \t .repeatWhen (r -> r.takeWhile (n -> n> 0))) \t .repeatWhen (r -> r.takeUntilOther (f.ignoreElements()))) –
- 1. Reaktor 3.x (Java): für das Abstreifen des Netzes
- 2. Akka oder Reaktor
- 3. Abhängigkeiten im gleichen Reaktor
- 4. Reaktor vs Proactor
- 5. gleichzeitig eine Sammlung mit Reaktor
- 6. Serving statischen Inhalt mit Reaktor
- 7. Projekt-Reaktor - So erstellen Sie ein gleitendes Fenster Fluss aus einem Java 8-Stream
- 8. Dezimal-Genauigkeit Code-Flux
- 9. Flux Utils MapStore
- 10. Flux Architektur Kreisabhängigkeit
- 11. Flux und Anwendungszustand
- 12. flux multiple store instances
- 13. Welchen Reaktor soll ich für Qt4 verwenden?
- 14. Verpackung für Reaktor-Multi-Modul bauen?
- 15. Gibt es eine C# -Reaktor/Proactor-Bibliothek?
- 16. Was ist der "Reaktor" in Maven?
- 17. Reactive Stream Gegendruck mit Feder Reaktor Projekt
- 18. Spring Cloud Stream Reaktor Kafka Integration
- 19. Fluidtypo3 Flux - im Tabellenfeld speichern
- 20. React-Flux: Fehler mit AppDispatcher.register
- 21. Apache Storm Flux ändern Topologie
- 22. Bound Callable Reference funktioniert nicht mit Reaktor Abonnieren
- 23. Typo3 7.6 Flux Backend Layout Spaltentitel
- 24. Redux/Flux (mit ReactJS) und Animation
- 25. Flux auf Mono, execute-Methode auf Mono und Transformation zurück zu Flux
- 26. Wie bekomme ich den Reaktor von ApplicationRunner in autobahnPython?
- 27. Twisted Reaktor ist gestoppt, aber Programm endet nicht?
- 28. Failure finden io.projectreactor: Reaktor-bom: pom: Bismuth-M1
- 29. Maven bauen zweite Ebene + Kinderprojekte Reaktor Option -pl
- 30. Run JUnit Tests für Maven Reaktor verschachtelte Unterteilmodul
Nein, mit Puffer() die Verzögerungen werden jedesmal beginnen eine weitere Verzögerung endet. Ich brauche das bei jedem eingehenden Wert, wenn es keine aktuelle Verzögerung gibt, dann gebe den Wert zurück und starte eine Verzögerung oder puffere es, wenn bereits eine Verzögerung läuft. –
linke Antwort an Ort und Stelle, um zu klären, warum es nicht angepasst ist –