2015-12-10 3 views
7

Ich habe einen Prozess, der viele kleine Aufgaben parallel berechnen muss, und dann die Ergebnisse in der natürlichen Reihenfolge der Aufgaben verarbeiten. Dazu habe ich folgendes Setup:Fixed Thread Pool-Threads blockiert, wenn genug Aufgaben eingereicht

Ein einfaches ExecutorService und eine blockierende Warteschlange verwende ich die Zukunft Objekte zu halten zurückgegeben, wenn eine aufrufbare zum Testamentsvollstrecker vorgelegt wird:

ExecutorService exec = Executors.newFixedThreadPool(15); 
LinkedBlockingQueue<Future<MyTask>> futures = new LinkedBlockingQueue<Future<MyTask>>(15 * 64); 

Einige Debugging-Code die Anzahl der eingereichten und Anzahl der verarbeiteten Aufgaben zu zählen und sie in regelmäßigen Abständen ausschreiben (beachten sie, dass processed am Ende des Task-Code selbst inkrementiert wird):

AtomicLong processed = new AtomicLong(0); 
AtomicLong submitted = new AtomicLong(0); 

Timer statusTimer = new Timer(); 
statusTimer.schedule(new TimerTask() { 
     @Override 
     public void run() { 
     l.info("Futures: " + futures.size() + "; Submitted: " + submitted.get() + "; Processed: " + processed.get() + "; Diff: " + (submitted.get() - processed.get()))); 
     }    
}, 60 * 1000, 60 * 1000); 
ein Gewinde

die Aufgaben übernimmt aus einer Warteschlange (was eigentlich ist ya-Generator) und legt sie dem Vollstrecker, die sich ergebende Zukunft in der futures Warteschlange stellen (das ist, wie ich sicher, ich ein Lauf nicht genügend Arbeitsspeicher) nicht zu viele Aufgaben einreichen:

Thread submitThread = new Thread(() -> 
{ 
    MyTask task; 
    try { 
     while ((task = taskQueue.poll()) != null) { 
      futures.put(exec.submit(task)); 
      submitted.incrementAndGet(); 
     } 
    } catch (Exception e) {l .error("Unexpected Exception", e);} 
}, "SubmitTasks"); 
submitThread.start(); 

Der aktuelle Thread dann take es abgeschlossene Aufgaben aus der Warteschlange futures und behandelt das Ergebnis:

while (!futures.isEmpty() || submitThread.isAlive()) { 
    MyTask task = futures.take().get(); 
    //process result 
} 

Wenn ich dies mit 8 Cores auf einen Server ausführen (beachten Sie, dass der Code zur Zeit 15 Threads verwendet) die CPU-Auslastung Spitzen nur bei etwa 60% . Ich sehe meine Debug-Ausgabe wie folgt aus:

INFO : Futures: 960; Submitted: 1709710114; Processed: 1709709167; Diff: 947 
INFO : Futures: 945; Submitted: 1717159751; Processed: 1717158862; Diff: 889 
INFO : Futures: 868; Submitted: 1724597808; Processed: 1724596954; Diff: 853 
INFO : Futures: 940; Submitted: 1732030120; Processed: 1732029252; Diff: 871 
INFO : Futures: 960; Submitted: 1739538576; Processed: 1739537758; Diff: 818 
INFO : Futures: 960; Submitted: 1746965761; Processed: 1746964811; Diff: 950 

Ein Thread-Dump zeigt, dass viele der Threadpool-Threads wie folgt blockieren:

"pool-1-thread-14" #30 prio=5 os_prio=0 tid=0x00007f25c802c800 nid=0x10b2 waiting on condition [0x00007f26151d5000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for <0x00007f2fbb0001b0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) 
     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) 
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:897) 
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222) 
     at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) 
     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439) 
     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Meine Interpretation der Debug-Ausgabe ist, dass an einem bestimmten Punkt mit der Zeit habe ich mindestens einige hundert Aufgaben, die an den Executor-Service übergeben wurden, aber nicht verarbeitet wurden (ich kann auch in der Stack-Ablaufverfolgung bestätigen, dass der SubmitTasks-Thread unter LinkedBlockingQueue.put blockiert ist). Der Stack-Trace (und die Serverauslastungsstatistiken) zeigen mir jedoch, dass der Executor-Service auf LinkedBlockingQueue.take blockiert ist (was ich davon auslasse, dass die interne Task-Warteschlange leer ist).

Was lese ich falsch?

Antwort

0

Threads mit BlockingQueue s sind immer knifflig. Nur indem Sie Ihren Code ansehen, ohne jemals mit der Skalierung zu laufen, die Sie tun. Ich habe einige Vorschläge. Es ist der Rat vieler Experten in der Branche wie Jessica Kerr, dass Sie nie für immer blockieren sollten. Sie können Methoden mit Timeouts in LinkedBlockingQueue verwenden.

Thread submitThread = new Thread(() -> 
{ 
    MyTask task; 
    try { 
     while ((task = taskQueue.peek()) != null) { 
      boolean success = futures.offer(exec.submit(task), 1000, TimeUnit.MILLISECONDS); 
      if(success) { 
       submitted.incrementAndGet(); 
       taskQueue.remove(task); 
      } 
     } 
    } catch (Exception e) {l .error("Unexpected Exception", e);} 
}, "SubmitTasks"); 
submitThread.start(); 

Und hier auch.

while (!futures.isEmpty() || submitThread.isAlive()) { 
    Future<MyTask> f = futures.poll(1000, TimeUnit.MILLISECONDS); 
    if(f != null) { 
     MyTask task = f.get(); 
    } 
    //process result 
} 

Dieses Video sehen von Jessica Kerr auf Concurrency tools in JVM

Verwandte Themen