2017-02-16 1 views
3

Ich versuche, Gegendruck mit Feder-Web-reaktiven zeigen, genau wie es hier mit akka - https://www.youtube.com/watch?v=oS9w3VenDW0 (Watch zwischen 28:20 und 29:20) gezeigt wird.Nicht in der Lage, Gegendruck mit Federbahn reaktiv zu zeigen

es auszuprobieren Ich habe unten Beispielprojekt von Github verwendet https://github.com/bclozel/spring-boot-web-reactive

Nach dem Setup des Projekts ich einen neuen Endpunkt in HomeController.java hinzugefügt, wie unten dargestellt:

@RequestMapping(value = "/longflux",produces = "application/stream+json") 
public Flux<Long> longFlux(){ 
    return Flux.interval(Duration.ofMillis(10)).log(); 
} 
Jetzt

, wenn ich Versuchen Sie, diesen Endpunkt zu curren und ihn dann mit (CTRL + z) auszusetzen, sollte der Backpressure eingeschaltet sein, sobald die tcp-Puffer gefüllt sind und der Server die Ereignisse nicht mehr ausgibt.

jedoch nach dem curl Befehl zur Aussetzung wirft irgendwann unter Ausnahme:

2017-02-16 08:49:48.480 ERROR 3500 --- [  timer-1] reactor.Flux.Interval.4     : onError(reactor.core.Exceptions$OverflowException: Could not emit value 2578 due to lack of requests) 
2017-02-16 08:49:48.481 ERROR 3500 --- [  timer-1] reactor.Flux.Interval.4     : 
reactor.core.Exceptions$OverflowException: Could not emit value 2578 due to lack of requests 
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:151) ~[reactor-core-3.0.4.RELEASE.jar:3.0.4.RELEASE] 
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:98) ~[reactor-core-3.0.4.RELEASE.jar:3.0.4.RELEASE] 
    at reactor.core.scheduler.SingleTimedScheduler$TimedPeriodicScheduledRunnable.run(SingleTimedScheduler.java:394) ~[reactor-core-3.0.4.RELEASE.jar:3.0.4.RELEASE] 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_121] 

Ich bin nicht in der Lage zu verstehen, warum die mit Ausnahme beendete Anfrage in irgendwann nach dem curl Befehl (Im Frühjahr-web-reaktiven suspendiert Im Akka-Beispiel (wie im youtube-Link gezeigt) hat der Server die Veröffentlichung von Ereignissen gestoppt, sobald der tcp-Puffer voll war.

Antwort

3

Flux.interval ist ein besonderer Fall, da es eine heiße Quelle ist und die Zeit nicht durch Reaktor gepuffert wird; Dies bedeutet, dass der Reactor ein Error-Signal ausgibt, wenn der Anforderungszyklus aufgrund des Gegendrucks langsam ist und die Intervallquelle schneller produziert.

Sie können dieses Beispiel mit einem Operator .onBackpressureDrop() aktualisieren, um das Intervall bei Gegendruck zu verringern. Dies sollte sich wie erwartet verhalten.

Es gibt viele Möglichkeiten, den Gegendruck zu veranschaulichen, einschließlich:

  • das Abonnement mit einem delay Operator Verzögerung
  • mehrere langsame Clients (Bandbreite und Latenz)
+0

Dank Brian für die Zeiger zu simulieren. Ich war in der Lage, den Gegendruck jetzt unter Verwendung des Quellcodes zu veranschaulichen. Ich habe dieses Mal nicht die heiße Quelle verwendet, stattdessen eine Liste erstellt und dann den Stream zum Erstellen der Quelle verwendet. @RequestMapping (value = "/ longflux", produziert = "application/Strom + json") public Flux longFlux() { Liste list = new Arraylist (); \t für (int i = 0; i <1000000; ++ i) { Liste.add ("Spring-Boot-Starter-Web-reaktiv"); } zurückgeben Flux.fromStream (list.stream()). Log(); } – simar

Verwandte Themen