Kann jemand erklären, warum dieser Code nie endet (wirft immer TimeoutException), 3.0.7 und 3.1.0.M3 alle gleich? Ich dachte, die restlichen 4 Fäden werden ihre Arbeit Stück für Stück erledigen.Spring Reactor Scheduler Sperre
private static class Holder {
private final int group, idx;
Holder(int group, int idx) {
this.group = group;
this.idx = idx;
}
}
private static final Logger log = LoggerFactory.getLogger(RxTest2.class);
@Test
public void test1() throws Exception {
final Scheduler scheduler = Schedulers.parallel();
final AtomicInteger c = new AtomicInteger();
Flux.range(0, 8).flatMap(gi -> Flux.range(0, 30).map(i -> new Holder(gi, i)))
.groupBy(h -> h.group)
.parallel()
.runOn(scheduler)
.flatMap(chunk -> {
log.debug("chunk {}", chunk.key());
chunk
.parallel()
.runOn(scheduler)
.flatMap(h -> {
log.debug("{} - {}", h.group, h.idx);
c.incrementAndGet();
return Mono.just(true);
})
.sequential()
.blockLast();
return Mono.just(true);
})
.sequential()
.timeout(Duration.ofSeconds(2))
.doOnTerminate(() -> log.debug("count: {}", c.get()))
.blockLast();
}
wenn ich spaltete es zur Trennung von Disponenten (Schedulers.newParallel ("P1", 8), Schedulers.newParallel ("P2")) - ist alles OK
Gibt es eine Sonderregel für das Zählen Anzahl der Threads für den Scheduler?
danke für die antwort, und entschuldigung für meine lange stille! nehmen wir setzen: ... final Scheduler Scheduler = Schedulers.parallel (16); ... .parallel (8) .runOn (Scheduler) ... .parallel (4) .runOn (Scheduler) Situation immer noch die gleichen. Ich denke: zuerst _parallel_ blockiert 8 Threads, andere Aufgaben können die restlichen 8 Threads verwenden. Innere Aufgaben sind kurz. Und die Arbeit sollte gut abgeschlossen sein. Ich möchte verstehen, warum ich falsch liege. Was möglicherweise verbleibende Fäden blockiert – user1222796