2013-05-16 4 views
5

Ich habe N Worker, die eine Warteschlange von Elementen teilen, die berechnet werden sollen. Bei jeder Iteration entfernt jeder Worker ein Element aus der Warteschlange und kann mehr Elemente für die Berechnung erzeugen, die in die gleiche Warteschlange gestellt werden. Grundsätzlich ist jeder Produzent auch ein Verbraucher. Die Berechnung endet, wenn sich keine Elemente in der Warteschlange befinden und alle Worker das aktuelle Element berechnet haben (daher können keine weiteren zu berechnenden Elemente erzeugt werden). Ich möchte einen Disponenten/Koordinator vermeiden, also sollten die Arbeiter koordinieren. Was ist das beste Muster, um es einem Arbeiter zu ermöglichen, herauszufinden, ob die Haltebedingungen gültig sind, und damit die Berechnung im Auftrag der anderen zu stoppen?Java Producer-Consumer mit Stoppbedingung

Zum Beispiel, wenn alle Fäden nur diesen Zyklus tun würde, wenn die Elemente alle berechnet werden, würde es führen alle Fäden ewig blockiert:

while (true) { 
    element = queue.poll(); 
    newElements[] = compute(element); 
    if (newElements.length > 0) { 
     queue.addAll(newElements); 
    } 
} 

Antwort

6

eine Anzahl von aktiven Threads beibehalten.

public class ThreadCounter { 
    public static final AtomicInteger threadCounter = new AtomicInteger(N); 
    public static final AtomicInteger queueCounter = new AtomicInteger(0); 
    public static final Object poisonPill = new Object(); 
    public static volatile boolean cancel = false; // or use a final AomticBoolean instead 
} 

Ihre Threads' Abfrageschleife wie folgt aussehen sollte (Ich gehe davon aus, dass Sie mit einem BlockingQueue)

while(!ThreadCounter.cancel) { 
    int threadCount = ThreadCounter.threadCounter.decrementAndGet(); // decrement before blocking 
    if(threadCount == 0 && ThreadCounter.queueCounter.get() == 0) { 
     ThreadCounter.cancel = true; 
     queue.offer(ThreadCounter.poisonPill); 
    } else { 
     Object obj = queue.take(); 
     ThreadCounter.threadCounter.incrementAndGet(); // increment when the thread is no longer blocking 
     ThreadCounter.queueCounter.decrementAndGet(); 
     if(obj == ThreadCounter.poisonPill) { 
      queue.offer(obj); // send the poison pill back through the queue so the other threads can read it 
      continue; 
     } 
    } 
} 

Wenn ein Thread auf dem BlockingQueue zu blockieren ist etwa dann dekrementiert sie der Zähler; Wenn alle Threads bereits auf die Warteschlange warten (dh counter == 0), setzt der letzte Thread cancel auf true und sendet dann eine Giftpille durch die Warteschlange, um die anderen Threads zu aktivieren. Jeder Thread sieht die Giftpille, sendet sie zurück durch die Warteschlange, um die verbleibenden Threads zu aktivieren, und beendet dann die Schleife, wenn er feststellt, dass cancel auf "True" gesetzt ist.

Edit: ich die Daten Rennen entfernt haben, indem ein queueCounter hinzufügen, die eine Zählung der Anzahl der Objekte in der Warteschlange unterhält (natürlich werden Sie auch einen queueCounter.incrementAndGet() Aufruf wohin hinzufügen müssen Sie Objekte hinzufügen in die Warteschlange). Dies funktioniert wie folgt: Wenn threadCount == 0, aber queueCount != 0, dann bedeutet dies, dass ein Thread gerade ein Element aus der Warteschlange entfernt hat, aber threadCount.getAndIncrement noch nicht aufgerufen hat, und daher ist die Abbruchvariable nicht auf True festgelegt. Es ist wichtig, dass der Anruf threadCount.getAndIncrement dem Anruf queueCount.getAndDecrement vorausgeht, andernfalls haben Sie immer noch ein Datenrennen. Es sollte egal sein, in welcher Reihenfolge Sie queueCount.getAndIncrement aufrufen, da Sie dies nicht mit einem Aufruf an threadCount.getAndDecrement verschachteln werden (letzterer wird am Ende der Schleife aufgerufen, ersterer wird am Anfang der Schleife aufgerufen).

Beachten Sie, dass Sie nicht einfach queueCount verwenden können, um zu bestimmen, wann der Prozess beendet werden soll, da ein Thread möglicherweise noch aktiv ist, ohne dass Daten in die Warteschlange gestellt wurden. Mit anderen Worten: queueCount wäre null ist nicht Null, sobald der Thread seine aktuelle Iteration beendet hat.

Anstatt die poisonPill wiederholt durch die Warteschlange zu senden, können Sie stattdessen den Cancelling-Thread (N-1) poisonPills über die Warteschlange senden lassen. Seien Sie vorsichtig, wenn Sie diesen Ansatz mit einer anderen Warteschlange verwenden, da einige Warteschlangen (z. B. Amazon Simple Queue Service) möglicherweise mehrere Elemente entsprechend ihrer take-Methode zurückgeben. In diesem Fall müssen Sie wiederholt über die poisonPill senden dass alles still steht.Zusätzlich

, stattdessen eine while(!cancel) Schleife zu verwenden, können Sie eine while(true) Schleife verwenden und brechen, wenn die Schleife eine poisonPill

+0

erkennt i flüchtig sein erraten abbrechen sollte? – marcorossi

+0

Was passiert, wenn ein Thread zwischen dem Aufruf von queue.take() und incrementAndGet() liegt und der andere Thread die Bedingungen count == 0 und queue.isEmpty() auswertet? – marcorossi

+0

@marcorossi yeah 'cancel' sollte flüchtig sein, so dass die Änderung in allen Threads sichtbar ist. Zum Beispiel sehen alle Threads das letzte Schreiben in das 'cancel'-Feld, es garantiert, dass Lesen und Schreiben atomar sind. Habe die Antwort bearbeitet. – adamjmarkham