Ich habe einen Prozess, der viele kleine Aufgaben parallel berechnen muss, und dann die Ergebnisse in der natürlichen Reihenfolge der Aufgaben verarbeiten. Dazu habe ich folgendes Setup:Fixed Thread Pool-Threads blockiert, wenn genug Aufgaben eingereicht
Ein einfaches ExecutorService und eine blockierende Warteschlange verwende ich die Zukunft Objekte zu halten zurückgegeben, wenn eine aufrufbare zum Testamentsvollstrecker vorgelegt wird:
ExecutorService exec = Executors.newFixedThreadPool(15);
LinkedBlockingQueue<Future<MyTask>> futures = new LinkedBlockingQueue<Future<MyTask>>(15 * 64);
Einige Debugging-Code die Anzahl der eingereichten und Anzahl der verarbeiteten Aufgaben zu zählen und sie in regelmäßigen Abständen ausschreiben (beachten sie, dass processed
am Ende des Task-Code selbst inkrementiert wird):
AtomicLong processed = new AtomicLong(0);
AtomicLong submitted = new AtomicLong(0);
Timer statusTimer = new Timer();
statusTimer.schedule(new TimerTask() {
@Override
public void run() {
l.info("Futures: " + futures.size() + "; Submitted: " + submitted.get() + "; Processed: " + processed.get() + "; Diff: " + (submitted.get() - processed.get())));
}
}, 60 * 1000, 60 * 1000);
ein Gewinde
die Aufgaben übernimmt aus einer Warteschlange (was eigentlich ist ya-Generator) und legt sie dem Vollstrecker, die sich ergebende Zukunft in der futures
Warteschlange stellen (das ist, wie ich sicher, ich ein Lauf nicht genügend Arbeitsspeicher) nicht zu viele Aufgaben einreichen:
Thread submitThread = new Thread(() ->
{
MyTask task;
try {
while ((task = taskQueue.poll()) != null) {
futures.put(exec.submit(task));
submitted.incrementAndGet();
}
} catch (Exception e) {l .error("Unexpected Exception", e);}
}, "SubmitTasks");
submitThread.start();
Der aktuelle Thread dann take
es abgeschlossene Aufgaben aus der Warteschlange futures
und behandelt das Ergebnis:
while (!futures.isEmpty() || submitThread.isAlive()) {
MyTask task = futures.take().get();
//process result
}
Wenn ich dies mit 8 Cores auf einen Server ausführen (beachten Sie, dass der Code zur Zeit 15 Threads verwendet) die CPU-Auslastung Spitzen nur bei etwa 60% . Ich sehe meine Debug-Ausgabe wie folgt aus:
INFO : Futures: 960; Submitted: 1709710114; Processed: 1709709167; Diff: 947
INFO : Futures: 945; Submitted: 1717159751; Processed: 1717158862; Diff: 889
INFO : Futures: 868; Submitted: 1724597808; Processed: 1724596954; Diff: 853
INFO : Futures: 940; Submitted: 1732030120; Processed: 1732029252; Diff: 871
INFO : Futures: 960; Submitted: 1739538576; Processed: 1739537758; Diff: 818
INFO : Futures: 960; Submitted: 1746965761; Processed: 1746964811; Diff: 950
Ein Thread-Dump zeigt, dass viele der Threadpool-Threads wie folgt blockieren:
"pool-1-thread-14" #30 prio=5 os_prio=0 tid=0x00007f25c802c800 nid=0x10b2 waiting on condition [0x00007f26151d5000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00007f2fbb0001b0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:897)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Meine Interpretation der Debug-Ausgabe ist, dass an einem bestimmten Punkt mit der Zeit habe ich mindestens einige hundert Aufgaben, die an den Executor-Service übergeben wurden, aber nicht verarbeitet wurden (ich kann auch in der Stack-Ablaufverfolgung bestätigen, dass der SubmitTasks-Thread unter LinkedBlockingQueue.put
blockiert ist). Der Stack-Trace (und die Serverauslastungsstatistiken) zeigen mir jedoch, dass der Executor-Service auf LinkedBlockingQueue.take blockiert ist (was ich davon auslasse, dass die interne Task-Warteschlange leer ist).
Was lese ich falsch?