2016-02-24 4 views
5

Gibt es eine Warteschlange ausstehender Tasks, die in Verbindung mit Java 8s Executors.newWorkStealingPool() verwendet werden?Bietet Executors.newWorkStealingPool() in Java 8 auch eine Aufgabenwarteschlange?

Angenommen, die # verfügbaren Kerne sind 2, und Executors.newWorkStealingPool() ist leer, weil 2 Aufgaben bereits ausgeführt werden. Was passiert dann, wenn eine dritte Aufgabe an den Work-Stealing Executor übergeben wird? Ist es in der Warteschlange? Und wenn ja, was sind die Grenzen wenn in der Warteschlange?

Vielen Dank im Voraus.

+1

Ich habe keine spezifische Antwort, und ich bin überrascht, dass dies nicht besser dokumentiert ist. Aber zumindest in OpenJDK 8 erzeugt diese Methode einen ['ForkJoinPool'] (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html), das nicht einfach eine 'BlockingQueue' verwendet, wie es andere Implementierungen tun ..., die viele Konflikte verursacht, was zu einem Overhead führt. Aufgaben, die nicht sofort ausgeführt werden können, sind jedoch noch in der Warteschlange. Dies wird (zusammen mit Warteschlangenbegrenzungen) in einer anderen Antwort erläutert: http://StackOverflow.com/a/30045601/228171 –

Antwort

4

Gibt es eine Warteschlange ausstehender Aufgaben, die in Verbindung mit Executors.newWorkStealingPool() von Java 8 verwendet werden?

Ja, jeder Thread ist mit einer eigenen Deque gesichert. Wenn ein Thread mit seinen Aufgaben erledigt ist, übernimmt er die Aufgabe aus dem Deque des anderen Threads und führt sie aus.

Und wenn ja, was sind die Grenzen wenn in der Warteschlange?

Maximale Größe für die Warteschlangen wird durch die Anzahl begrenzt: static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

Wenn die Warteschlange voll ist eine ungeprüfte Ausnahme ausgelöst wird: RejectedExecutionException("Queue capacity exceeded")

3

Von grepcode von Executors und ForkJoinPool

Executors. newWorkStealingPool kehrt ForkJoinPool

Testamentsvollstrecker:

public static ExecutorService newWorkStealingPool() { 
     return new ForkJoinPool 
      (Runtime.getRuntime().availableProcessors(), 
      ForkJoinPool.defaultForkJoinWorkerThreadFactory, 
      null, true); 
    } 

ForkJoinPool:

public ForkJoinPool(int parallelism, 
         ForkJoinWorkerThreadFactory factory, 
         UncaughtExceptionHandler handler, 
         boolean asyncMode) { 
     this(checkParallelism(parallelism), 
      checkFactory(factory), 
      handler, 
      asyncMode ? FIFO_QUEUE : LIFO_QUEUE, 
      "ForkJoinPool-" + nextPoolId() + "-worker-"); 
     checkPermission(); 
    } 

Auf execute():

public void execute(ForkJoinTask<?> task) { 
     if (task == null) 
      throw new NullPointerException(); 
     externalPush(task); 
    } 

externalPush Anrufe externalSubmit und Sie können WorkQueue Details in dieser Implementierung sehen.

externalSubmit:

// Externe Operationen

/** 
* Full version of externalPush, handling uncommon cases, as well 
* as performing secondary initialization upon the first 
* submission of the first task to the pool. It also detects 
* first submission by an external thread and creates a new shared 
* queue if the one at index if empty or contended. 
* 
* @param task the task. Caller must ensure non-null. 

*/ 

Sie können weitere Informationen über Warteschlange Größen in WorkQueue Klasse

static final class WorkQueue { 

Dokumentation auf WokrQueue finden:

/** 
    * Queues supporting work-stealing as well as external task 
    * submission. See above for descriptions and algorithms. 
    * Performance on most platforms is very sensitive to placement of 
    * instances of both WorkQueues and their arrays -- we absolutely 
    * do not want multiple WorkQueue instances or multiple queue 
    * arrays sharing cache lines. The @Contended annotation alerts 
    * JVMs to try to keep instances apart. 
    */ 
    @sun.misc.Contended 

/** 
    * Capacity of work-stealing queue array upon initialization. 
    * Must be a power of two; at least 4, but should be larger to 
    * reduce or eliminate cacheline sharing among queues. 
    * Currently, it is much larger, as a partial workaround for 
    * the fact that JVMs often place arrays in locations that 
    * share GC bookkeeping (especially cardmarks) such that 
    * per-write accesses encounter serious memory contention. 
    */ 
    static final int INITIAL_QUEUE_CAPACITY = 1 << 13; 

    /** 
    * Maximum size for queue arrays. Must be a power of two less 
    * than or equal to 1 << (31 - width of array entry) to ensure 
    * lack of wraparound of index calculations, but defined to a 
    * value a bit less than this to help users trap runaway 
    * programs before saturating systems. 
    */ 
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M