2016-03-20 10 views
-1

von Zeit zu Zeit müssen die klassische gleichzeitige Producer-Consumer-Lösung für das Projekt, an dem ich beteiligt sind, zu implementieren, ist ziemlich das Problem in einige Sammlung, die von mehreren Threads und was bevölkert wird reduziert wird von mehreren Verbrauchern konsumiert. Kurz gesagt ist die Sammlung sagen zu 10k Einheiten beschränkt, sobald Puffergröße ist ein Arbeiter Aufgabe ist unterzog diese 10k Entitäten, gibt es eine Grenze dieser Arbeiter sagen, es ist auf 10 gesetzt, was im schlimmsten Fall bedeutet, dass ich kann haben bis zu 10 Arbeiter, die jeweils 10.000 Einheiten verbrauchen.gleichzeitig eine Sammlung mit Reaktor

Ich muss mit ein paar Sperren spielen hier und da einige Kontrollen rund Pufferüberläufe (Fall, wenn Hersteller zu viele Daten generieren, während alle Arbeiter sind beschäftigt, ihre Stücke zu verarbeiten) müssen also neue Ereignisse zu verwerfen, um OOM (nicht die beste Lösung, aber Stabilität ist p1;))

War in diesen Tagen um Reaktor und eine Möglichkeit, es zu verwenden, anstatt Low-Level-und tun alle oben beschriebenen Dinge, so die dumme Frage ist: "kann Reaktor dafür verwendet werden Anwendungsfall?" für jetzt vergessen über Überlauf/Verwerfen .. Wie kann ich die N-Verbraucher für einen Sender erreichen?

war auf der Suche vor allem rund um Broadcaster mit dem Puffer + ein roten Faden gepoolt Dispatcher:

void test() { 
    final Broadcaster<String> sink = Broadcaster.create(Environment.initialize()); 
    Dispatcher dispatcher = Environment.newDispatcher(2048, 20, DispatcherType.WORK_QUEUE); 

    sink 
    .buffer(100) 
    .consumeOn(dispatcher, this::log); 

    for (int i=0; i<100000; i++) { 
    sink.onNext("elementent " + i); 
    if (i%1000 == 0) { 
     System.out.println("addded elements " + i); 
    } 
    } 
} 
void log(List<String> values) { 
    System.out.print("simulating slow processing...."); 
    System.out.println("processing: " + Arrays.toString(values.toArray())); 
    try { 
    Thread.sleep(1000); 
    } catch (InterruptedException e) { 
    e.printStackTrace(); 
    } 
} 

meine Absicht, hier ist ein Sender das Protokoll ausführen müssen (..) in asynch Weise als Puffergröße erreicht wurde, es jedoch sieht so aus, als würde es immer log (...) im blocking mode ausführen. Ausführen von 100 einmal gemacht nächsten 100 und so weiter .. Wie kann ich es asynch machen?

dank vyvalyty

Antwort

0

Ein mögliches Muster flatMap mit publishOn verwenden:

Flux.range(1, 1_000_000) 
.buffer(100) 
.flatMap(b -> Flux.just(b).publishOn(SchedulerGroup.io()) 
    .doOnNext(this::log)) 
.consume(...); 
Verwandte Themen