2010-05-07 11 views
14

Ich arbeite mit einem java.util.concurrent.ThreadPoolExecutor, um eine Reihe von Elementen parallel zu verarbeiten. Obwohl das Threading selbst in Ordnung ist, haben wir manchmal aufgrund von Aktionen, die in den Threads stattfanden, auf andere Ressourceneinschränkungen gestoßen, was uns dazu brachte, die Anzahl der Threads im Pool zu verringern.Dynamische Größenanpassung von java.util.concurrent.ThreadPoolExecutor, während es Tasks wartet

Ich würde gerne wissen, ob es eine Möglichkeit gibt, die Anzahl der Threads zu wählen, während die Threads tatsächlich funktionieren. Ich weiß, dass Sie setMaximumPoolSize() und/oder setCorePoolSize() aufrufen können, aber diese ändern nur die Größe des Pools, sobald Threads inaktiv werden, aber sie werden nicht inaktiv, bis keine Tasks mehr in der Warteschlange warten.

Antwort

4

Soweit ich sagen kann, ist dies in einer schönen, sauberen Art und Weise nicht möglich.

Sie können die beforeExecute-Methode implementieren, um einen booleschen Wert zu überprüfen und Threads vorübergehend anhalten zu lassen. Beachten Sie, dass sie eine Aufgabe enthalten, die erst ausgeführt wird, wenn sie wieder aktiviert wird.

Alternativ können Sie nachExecute implementieren, um eine RuntimeException auszulösen, wenn Sie gesättigt sind. Dies wird effektiv dazu führen, dass der Thread stirbt und da der Executor über dem Maximum liegt, würde kein neuer erstellt werden.

Ich empfehle Ihnen nicht, entweder zu tun. Versuchen Sie stattdessen, einen anderen Weg zu finden, um die gleichzeitige Ausführung der Aufgaben zu kontrollieren, die Ihnen ein Problem bereiten. Möglicherweise indem Sie sie in einem separaten Thread-Pool mit einer begrenzten Anzahl von Arbeitern ausführen.

+0

Um die Wähler unten. Wenn Sie die Frage des OP lesen, antwortete die hochgeschätzte zweite Antwort ursprünglich nicht. Das OP sagt so viel. Nachfolgende Änderungen implementieren die Vorschläge in dieser Antwort. –

0

Ich habe die Dokumentation von setMaximumPoolSize() und setCorePoolSize() gelesen, und es scheint, als ob sie das Verhalten erzeugen können, das Sie brauchen.

- EDIT -

Meine Schlussfolgerung falsch war: Sie bitte die Diskussion unten für Details siehe ...

+0

Die Arbeiter nicht sofort verlassen wird. "Wenn der neue Wert kleiner als der aktuelle Wert ist, werden überschüssige vorhandene Threads beendet, wenn sie als nächstes inaktiv werden." Solange Aufgaben im Executor vorhanden sind oder der Executor rechtzeitig eingegeben wird, werden die Workers nicht beendet. –

+0

@Tim - ja, das ist was Eyal sagt. Möchte das OP, dass seine bestehenden Threads ohne Rücksicht beendet werden? Das wäre nichts, was ein Testamentsvollstrecker tun könnte, selbst wenn er es wollte.Das OP könnte einen Booleschen Wert für bestimmte Tasks setzen, um ihnen zu sagen, dass sie früh enden sollen, und es wäre wahrscheinlich eine gute Idee, einen Interrupt zu senden, wenn dieser Boolean gesetzt ist. Aber das ist App-Level-Logik - nicht etwas, was ein Executor aus der Hand tun könnte. –

+0

Eine andere zufällige Idee wäre, wenn der Executor eine bestimmte Anzahl von Threads drosseln könnte, indem er deren Priorität reduziert. Das könnte den gleichen Gesamteffekt ergeben. Das ist nur eine zufällige Idee - es könnte große Fehler haben, über die ich nicht nachdenke. –

32

Sie können absolut. Durch den Aufruf von setCorePoolSize(int) wird die Kerngröße des Pools geändert. Aufrufe dieser Methode sind Thread-sicher und überschreiben Einstellungen für den Konstruktor ThreadPoolExecutor. Wenn Sie die Poolgröße reduzieren, werden die verbleibenden Threads nach Abschluss der aktuellen Jobwarteschlange heruntergefahren (wenn sie inaktiv sind, werden sie sofort heruntergefahren). Wenn Sie die Poolgröße erhöhen, werden neue Threads so schnell wie möglich zugewiesen. Der Zeitrahmen für die Zuweisung neuer Threads ist nicht dokumentiert. In der Implementierung wird jedoch bei jedem Aufruf der Methode execute die Zuweisung neuer Threads ausgeführt.

diese Job-Farm mit einer Laufzeit-abstimmbaren So koppeln, können Sie diese Eigenschaft aussetzen können (entweder durch Wrapper oder einen dynamischen MBean Exporteur unter Verwendung) als Lese-Schreib-JMX-Attribut eine ziemlich nett zu erstellen, on-the-fly abstimmbarer Stapelprozessor.

Um die Poolgröße zwangsweise zur Laufzeit zu reduzieren (was Ihre Anforderung ist), müssen Sie die ThreadPoolExecutor Unterklasse bilden und der Methode beforeExecute(Thread,Runnable) eine Unterbrechung hinzufügen. Das Unterbrechen des Threads ist keine ausreichende Unterbrechung, da diese nur mit Wartezuständen interagiert und während der Verarbeitung die Task-Threads ThreadPoolExecutor nicht in einen unterbrechbaren Zustand gehen.

Ich hatte vor kurzem das gleiche Problem zu versuchen, einen Thread-Pool zwangsweise zu beenden, bevor alle übergebenen Aufgaben ausgeführt werden. Um dies zu erreichen, unterbrach ich den Thread, indem ich eine Laufzeitausnahme erst nach dem Ersetzen der UncaughtExceptionHandler des Threads mit einer, die meine spezifische Ausnahme erwartet und löscht.

/** 
* A runtime exception used to prematurely terminate threads in this pool. 
*/ 
static class ShutdownException 
extends RuntimeException { 
    ShutdownException (String message) { 
     super(message); 
    } 
} 

/** 
* This uncaught exception handler is used only as threads are entered into 
* their shutdown state. 
*/ 
static class ShutdownHandler 
implements UncaughtExceptionHandler { 
    private UncaughtExceptionHandler handler; 

    /** 
    * Create a new shutdown handler. 
    * 
    * @param handler The original handler to deligate non-shutdown 
    * exceptions to. 
    */ 
    ShutdownHandler (UncaughtExceptionHandler handler) { 
     this.handler = handler; 
    } 
    /** 
    * Quietly ignore {@link ShutdownException}. 
    * <p> 
    * Do nothing if this is a ShutdownException, this is just to prevent 
    * logging an uncaught exception which is expected. Otherwise forward 
    * it to the thread group handler (which may hand it off to the default 
    * uncaught exception handler). 
    * </p> 
    */ 
    public void uncaughtException (Thread thread, Throwable throwable) { 
     if (!(throwable instanceof ShutdownException)) { 
      /* Use the original exception handler if one is available, 
      * otherwise use the group exception handler. 
      */ 
      if (handler != null) { 
       handler.uncaughtException(thread, throwable); 
      } 
     } 
    } 
} 
/** 
* Configure the given job as a spring bean. 
* 
* <p>Given a runnable task, configure it as a prototype spring bean, 
* injecting any necessary dependencices.</p> 
* 
* @param thread The thread the task will be executed in. 
* @param job The job to configure. 
* 
* @throws IllegalStateException if any error occurs. 
*/ 
protected void beforeExecute (final Thread thread, final Runnable job) { 
    /* If we're in shutdown, it's because spring is in singleton shutdown 
    * mode. This means we must not attempt to configure the bean, but 
    * rather we must exit immediately (prematurely, even). 
    */ 
    if (!this.isShutdown()) { 
     if (factory == null) { 
      throw new IllegalStateException(
       "This class must be instantiated by spring" 
       ); 
     } 

     factory.configureBean(job, job.getClass().getName()); 
    } 
    else { 
     /* If we are in shutdown mode, replace the job on the queue so the 
     * next process will see it and it won't get dropped. Further, 
     * interrupt this thread so it will no longer process jobs. This 
     * deviates from the existing behavior of shutdown(). 
     */ 
     workQueue.add(job); 

     thread.setUncaughtExceptionHandler(
      new ShutdownHandler(thread.getUncaughtExceptionHandler()) 
      ); 

     /* Throwing a runtime exception is the only way to prematurely 
     * cause a worker thread from the TheadPoolExecutor to exit. 
     */ 
     throw new ShutdownException("Terminating thread"); 
    } 
} 

In Ihrem Fall sollten Sie eine Semaphore erstellen (nur für die Verwendung als THREAD Zähler), die keine Genehmigungen haben, und wenn Fäden lösen Abschalten eine Anzahl von Genehmigungen, um es, die das Delta entspricht der die vorherige Core-Pool-Größe und die neue Pool-Größe (Sie müssen die setCorePoolSize(int)-Methode überschreiben). Dadurch können Sie Ihre Threads beenden, nachdem ihre aktuelle Aufgabe abgeschlossen ist. angefordert -

private Semaphore terminations = new Semaphore(0); 

protected void beforeExecute (final Thread thread, final Runnable job) { 
    if (terminations.tryAcquire()) { 
     /* Replace this item in the queue so it may be executed by another 
     * thread 
     */ 
     queue.add(job); 

     thread.setUncaughtExceptionHandler(
      new ShutdownHandler(thread.getUncaughtExceptionHandler()) 
      ); 

     /* Throwing a runtime exception is the only way to prematurely 
     * cause a worker thread from the TheadPoolExecutor to exit. 
     */ 
     throw new ShutdownException("Terminating thread"); 
    } 
} 

public void setCorePoolSize (final int size) { 
    int delta = getActiveCount() - size; 

    super.setCorePoolSize(size); 

    if (delta > 0) { 
     terminations.release(delta); 
    } 
} 

Dies sollte n Gewinde für f (n) = aktiv unterbrechen. Wenn es irgendein Problem gibt, ist die Zuteilungsstrategie ThreadPoolExecutor ziemlich dauerhaft. Es Buch-hält vor vorzeitiger Beendigung mit einem finally Block, der die Ausführung garantiert. Aus diesem Grund werden sie, selbst wenn Sie zu viele Threads beenden, erneut gefüllt.

+1

Ja, ich weiß, dass Sie die Änderung vornehmen können und sie wirksam werden, sobald die Threads inaktiv sind. Was ich jedoch fand, war, dass es nicht "leer" ist, wenn ein Thread seine Aufgabe abschließt, es sei denn, es warten keine anderen Aufgaben. Ich fragte, ob es einen Weg gäbe, die Änderung sofort wirksam werden zu lassen, so dass, selbst wenn es wartende Aufgaben gibt, sie von der neuen gewünschten Anzahl von Threads behandelt werden (wie in meinem Beispiel, wo wir sofort die Menge der gleichzeitig verwendeten Ressourcen). –

+0

Ich habe meinen Beitrag aktualisiert, um eine Lösung zu beschreiben, die von einem Problem abgeleitet wurde, das ich kürzlich hatte und das in der Natur ähnlich war. Bitte sehen Sie es sich an. –

+0

Ich verstehe es nicht, warum bekomme ich null Karma für diese Antwort? :-( –

6

Die Lösung besteht darin, die ThreadPoolExecutor-Warteschlange zu leeren, die ThreadPoolExecutor-Größe nach Bedarf festzulegen und dann die Threads nacheinander hinzuzufügen, sobald die anderen enden. Die Methode zum Leeren der Warteschlange in der ThreadPoolExecutor-Klasse ist privat, daher müssen Sie sie selbst erstellen. Hier ist der Code:

/** 
* Drains the task queue into a new list. Used by shutdownNow. 
* Call only while holding main lock. 
*/ 
public static List<Runnable> drainQueue() { 
    List<Runnable> taskList = new ArrayList<Runnable>(); 
    BlockingQueue<Runnable> workQueue = executor.getQueue(); 
    workQueue.drainTo(taskList); 
    /* 
    * If the queue is a DelayQueue or any other kind of queue 
    * for which poll or drainTo may fail to remove some elements, 
    * we need to manually traverse and remove remaining tasks. 
    * To guarantee atomicity wrt other threads using this queue, 
    * we need to create a new iterator for each element removed. 
    */ 
    while (!workQueue.isEmpty()) { 
     Iterator<Runnable> it = workQueue.iterator(); 
     try { 
      if (it.hasNext()) { 
       Runnable r = it.next(); 
       if (workQueue.remove(r)) 
        taskList.add(r); 
      } 
     } catch (ConcurrentModificationException ignore) { 
     } 
    } 
    return taskList; 
} 

Bevor Sie diese Methode aufrufen, müssen Sie die Hauptsperre abrufen und dann loslassen. Um dies zu tun, müssen Sie Java-Reflektion verwenden, da das Feld "mainLock" privat ist. Auch hier ist der Code:

private Field getMainLock() throws NoSuchFieldException { 
    Field mainLock = executor.getClass().getDeclaredField("mainLock"); 
    mainLock.setAccessible(true); 
    return mainLock; 
} 

Wo "Vollstrecker" ist Ihr ThreadPoolExecutor.

Jetzt müssen Sie Sperren/Entsperren Methoden:

public void lock() { 
    try { 
     Field mainLock = getMainLock(); 
     Method lock = mainLock.getType().getDeclaredMethod("lock", (Class[])null); 
     lock.invoke(mainLock.get(executor), (Object[])null); 
    } catch { 
     ... 
    } 
} 

public void unlock() { 
    try { 
     Field mainLock = getMainLock(); 
     mainLock.setAccessible(true); 
     Method lock = mainLock.getType().getDeclaredMethod("unlock", (Class[])null); 
     lock.invoke(mainLock.get(executor), (Object[])null); 
    } catch { 
     ... 
    } 
} 

Schließlich können Sie Ihre "setThreadsNumber" Methode schreiben, und es wird sowohl zunehmende Arbeit und die Verringerung der ThreadPoolExecutor Größe:

public void setThreadsNumber(int intValue) { 
    boolean increasing = intValue > executor.getPoolSize(); 
    executor.setCorePoolSize(intValue); 
    executor.setMaximumPoolSize(intValue); 
    if(increasing){ 
     if(drainedQueue != null && (drainedQueue.size() > 0)){ 
      executor.submit(drainedQueue.remove(0)); 
     } 
    } else { 
     if(drainedQueue == null){ 
      lock(); 
      drainedQueue = drainQueue(); 
      unlock(); 
     } 
    } 
} 

Hinweis: Wenn Sie N parallele Threads ausführen und wenn Sie diese Zahl auf N-1 ändern, werden alle N Threads weiterhin ausgeführt. Wenn der erste Thread endet, werden keine neuen Threads ausgeführt. Von jetzt an wird die Anzahl der parallelen Threads die von Ihnen gewählte sein.

0

Ich war in der Notwendigkeit für die gleiche Lösung auch, und es scheint, dass in JDK8 die setCorePoolSize() und setMaximumPoolSize() tatsächlich das gewünschte Ergebnis produzieren. Ich habe einen Testfall erstellt, in dem ich 4 Aufgaben an den Pool übergebe und sie ausgeführt werden. Ich verkleinere die Poolgröße, während sie ausgeführt werden, und übergebe noch eine weitere Runnable, die ich allein haben möchte. Dann stelle ich den Pool wieder auf seine ursprüngliche Größe zurück. Hier ist die Prüfquelle https://gist.github.com/southerton81/96e141b8feede3fe0b8f88f679bef381

Es erzeugt die folgende Ausgabe (thread „50“ ist derjenige, der sich allein ausgeführt werden sollte)

run: 
test thread 2 enter 
test thread 1 enter 
test thread 3 enter 
test thread 4 enter 
test thread 1 exit 
test thread 2 exit 
test thread 3 exit 
test thread 4 exit 
test thread 50 enter 
test thread 50 exit 
test thread 1 enter 
test thread 2 enter 
test thread 3 enter 
test thread 4 enter 
test thread 1 exit 
test thread 2 exit 
test thread 3 exit 
test thread 4 exit 
Verwandte Themen