2017-08-07 7 views
0

Kann jemand erklären, warum dieser Code nie endet (wirft immer TimeoutException), 3.0.7 und 3.1.0.M3 alle gleich? Ich dachte, die restlichen 4 Fäden werden ihre Arbeit Stück für Stück erledigen.Spring Reactor Scheduler Sperre

private static class Holder { 
    private final int group, idx; 

    Holder(int group, int idx) { 
     this.group = group; 
     this.idx = idx; 
    } 
} 

private static final Logger log = LoggerFactory.getLogger(RxTest2.class); 

@Test 
public void test1() throws Exception { 
    final Scheduler scheduler = Schedulers.parallel(); 
    final AtomicInteger c = new AtomicInteger(); 

    Flux.range(0, 8).flatMap(gi -> Flux.range(0, 30).map(i -> new Holder(gi, i))) 
     .groupBy(h -> h.group) 

     .parallel() 
     .runOn(scheduler) 

     .flatMap(chunk -> { 

      log.debug("chunk {}", chunk.key()); 

      chunk 
       .parallel() 
       .runOn(scheduler) 
       .flatMap(h -> { 
        log.debug("{} - {}", h.group, h.idx); 
        c.incrementAndGet(); 
        return Mono.just(true); 
       }) 
       .sequential() 
       .blockLast(); 

      return Mono.just(true); 
     }) 
     .sequential() 
     .timeout(Duration.ofSeconds(2)) 
     .doOnTerminate(() -> log.debug("count: {}", c.get())) 
     .blockLast(); 
} 

wenn ich spaltete es zur Trennung von Disponenten (Schedulers.newParallel ("P1", 8), Schedulers.newParallel ("P2")) - ist alles OK

Gibt es eine Sonderregel für das Zählen Anzahl der Threads für den Scheduler?

Antwort

0

Sie scheinen mehrere parallele Berechnungen zu haben, die auf Schedulers.parallel() ausgeführt werden, und sie sättigen den Scheduler vollständig, so dass keine Threads zum Verarbeiten von .flatMap(chunk -> übrig sind. Wir haben hier eine Thread-Hungersituation. Entfernen Sie einfach die innere chunk.parallel, es ist nicht notwendig, dies zu tun, da die Berechnung bereits parallel ist.

+0

danke für die antwort, und entschuldigung für meine lange stille! nehmen wir setzen: ... final Scheduler Scheduler = Schedulers.parallel (16); ... .parallel (8) .runOn (Scheduler) ... .parallel (4) .runOn (Scheduler) Situation immer noch die gleichen. Ich denke: zuerst _parallel_ blockiert 8 Threads, andere Aufgaben können die restlichen 8 Threads verwenden. Innere Aufgaben sind kurz. Und die Arbeit sollte gut abgeschlossen sein. Ich möchte verstehen, warum ich falsch liege. Was möglicherweise verbleibende Fäden blockiert – user1222796

Verwandte Themen