eine Anzahl von aktiven Threads beibehalten.
public class ThreadCounter {
public static final AtomicInteger threadCounter = new AtomicInteger(N);
public static final AtomicInteger queueCounter = new AtomicInteger(0);
public static final Object poisonPill = new Object();
public static volatile boolean cancel = false; // or use a final AomticBoolean instead
}
Ihre Threads' Abfrageschleife wie folgt aussehen sollte (Ich gehe davon aus, dass Sie mit einem BlockingQueue
)
while(!ThreadCounter.cancel) {
int threadCount = ThreadCounter.threadCounter.decrementAndGet(); // decrement before blocking
if(threadCount == 0 && ThreadCounter.queueCounter.get() == 0) {
ThreadCounter.cancel = true;
queue.offer(ThreadCounter.poisonPill);
} else {
Object obj = queue.take();
ThreadCounter.threadCounter.incrementAndGet(); // increment when the thread is no longer blocking
ThreadCounter.queueCounter.decrementAndGet();
if(obj == ThreadCounter.poisonPill) {
queue.offer(obj); // send the poison pill back through the queue so the other threads can read it
continue;
}
}
}
Wenn ein Thread auf dem BlockingQueue
zu blockieren ist etwa dann dekrementiert sie der Zähler; Wenn alle Threads bereits auf die Warteschlange warten (dh counter == 0
), setzt der letzte Thread cancel
auf true und sendet dann eine Giftpille durch die Warteschlange, um die anderen Threads zu aktivieren. Jeder Thread sieht die Giftpille, sendet sie zurück durch die Warteschlange, um die verbleibenden Threads zu aktivieren, und beendet dann die Schleife, wenn er feststellt, dass cancel
auf "True" gesetzt ist.
Edit: ich die Daten Rennen entfernt haben, indem ein queueCounter
hinzufügen, die eine Zählung der Anzahl der Objekte in der Warteschlange unterhält (natürlich werden Sie auch einen queueCounter.incrementAndGet()
Aufruf wohin hinzufügen müssen Sie Objekte hinzufügen in die Warteschlange). Dies funktioniert wie folgt: Wenn threadCount == 0
, aber queueCount != 0
, dann bedeutet dies, dass ein Thread gerade ein Element aus der Warteschlange entfernt hat, aber threadCount.getAndIncrement
noch nicht aufgerufen hat, und daher ist die Abbruchvariable nicht auf True festgelegt. Es ist wichtig, dass der Anruf threadCount.getAndIncrement
dem Anruf queueCount.getAndDecrement
vorausgeht, andernfalls haben Sie immer noch ein Datenrennen. Es sollte egal sein, in welcher Reihenfolge Sie queueCount.getAndIncrement
aufrufen, da Sie dies nicht mit einem Aufruf an threadCount.getAndDecrement
verschachteln werden (letzterer wird am Ende der Schleife aufgerufen, ersterer wird am Anfang der Schleife aufgerufen).
Beachten Sie, dass Sie nicht einfach queueCount
verwenden können, um zu bestimmen, wann der Prozess beendet werden soll, da ein Thread möglicherweise noch aktiv ist, ohne dass Daten in die Warteschlange gestellt wurden. Mit anderen Worten: queueCount
wäre null ist nicht Null, sobald der Thread seine aktuelle Iteration beendet hat.
Anstatt die poisonPill
wiederholt durch die Warteschlange zu senden, können Sie stattdessen den Cancelling-Thread (N-1) poisonPills
über die Warteschlange senden lassen. Seien Sie vorsichtig, wenn Sie diesen Ansatz mit einer anderen Warteschlange verwenden, da einige Warteschlangen (z. B. Amazon Simple Queue Service) möglicherweise mehrere Elemente entsprechend ihrer take
-Methode zurückgeben. In diesem Fall müssen Sie wiederholt über die poisonPill
senden dass alles still steht.Zusätzlich
, stattdessen eine while(!cancel)
Schleife zu verwenden, können Sie eine while(true)
Schleife verwenden und brechen, wenn die Schleife eine poisonPill
erkennt i flüchtig sein erraten abbrechen sollte? – marcorossi
Was passiert, wenn ein Thread zwischen dem Aufruf von queue.take() und incrementAndGet() liegt und der andere Thread die Bedingungen count == 0 und queue.isEmpty() auswertet? – marcorossi
@marcorossi yeah 'cancel' sollte flüchtig sein, so dass die Änderung in allen Threads sichtbar ist. Zum Beispiel sehen alle Threads das letzte Schreiben in das 'cancel'-Feld, es garantiert, dass Lesen und Schreiben atomar sind. Habe die Antwort bearbeitet. – adamjmarkham