2016-05-29 3 views
10

Der Standard "paralellStream()" in Java 8 verwendet die allgemeine ForkJoinPool, die ein Latenzproblem sein kann, wenn die gemeinsamen Pool-Threads erschöpft sind, wenn eine Aufgabe gesendet wird. In vielen Fällen ist jedoch genügend CPU-Leistung verfügbar und die Tasks sind kurz genug, so dass dies kein Problem darstellt. Wenn wir einige lange laufende Aufgaben haben, wird dies natürlich einige sorgfältige Überlegungen erfordern, aber für diese Frage nehmen wir an, dass dies nicht das Problem ist.Stimmt etwas nicht mit der Verwendung von I/O + ManagedBlocker in Java8 parallelStream()?

Aber das Füllen der ForkJoinPool mit I/O-Aufgaben, die eigentlich keine CPU-gebundene Arbeit tun, ist eine Möglichkeit, einen Engpass einzuführen, obwohl genügend CPU-Leistung verfügbar ist. I understood that. Aber das ist es, was wir für die ManagedBlocker haben. Also, wenn wir eine I/O-Aufgabe haben, sollten wir einfach ForkJoinPool erlauben, das innerhalb einer ManagedBlocker zu verwalten. Das klingt unglaublich einfach. Aber zu meiner Überraschung ist die Verwendung einer ManagedBlocker ziemlich komplizierte API für die einfache Sache, die es ist. Und schließlich denke ich, dass dies ein häufiges Problem ist. Also baute ich nur eine einfache Dienstprogramm Methode, die ManagedBlocker s einfach zu bedienen für den gemeinsamen Fall macht:

public class BlockingTasks { 

    public static<T> T callInManagedBlock(final Supplier<T> supplier) { 
     final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier); 
     try { 
      ForkJoinPool.managedBlock(managedBlock); 
     } catch (InterruptedException e) { 
      throw new Error(e); 
     } 
     return managedBlock.getResult(); 
    } 

    private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker { 
     private final Supplier<T> supplier; 
     private T result; 
     private boolean done = false; 

     private SupplierManagedBlock(final Supplier<T> supplier) { 
      this.supplier = supplier; 
     } 

     @Override 
     public boolean block() { 
      result = supplier.get(); 
      done = true; 
      return true; 
     } 

     @Override 
     public boolean isReleasable() { 
      return done; 
     } 

     public T getResult() { 
      return result; 
     } 
    } 
} 

Nun, wenn ich den HTML-Code eine Reihe von Websites in paralell zum Download Ich mag es so zu können, ohne die I/O verursacht keine Probleme:

public static void main(String[] args) { 
    final List<String> pagesHtml = Stream 
     .of("https://google.com", "https://stackoverflow.com", "...") 
     .map((url) -> BlockingTasks.callInManagedBlock(() -> download(url))) 
     .collect(Collectors.toList()); 
} 

ich ein wenig bin überrascht, dass es keine Klasse mit Java wie die BlockingTasks oben ausgeliefert ist (? oder ich fand es nicht), aber es war nicht so schwer zu bauen.

Als ich für "java 8 parallelen Strom" google ich in den ersten vier Ergebnisse jene Artikel, die behaupten, dass aufgrund der I/O Problem Fork/Join saugt in Java:

Ich habe meine Suchbegriffe etwas geändert und während sich dort viele Leute darüber beschweren, wie schrecklich das Leben ist, habe ich niemanden gefunden, der von einer Lösung wie der obigen sprach. Da ich mich nicht wie Marvin fühle (Gehirn wie ein Planet) und Java 8 für eine ganze Weile verfügbar ist, vermute ich, dass mit dem, was ich dort vorschlage, etwas schrecklich falsch ist.

Ich schlug zusammen einen kleinen Test:

public static void main(String[] args) { 
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start"); 
    IntStream.range(0, 10).parallel().forEach((x) -> sleep()); 
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End"); 
} 

public static void sleep() { 
    try { 
     System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName()); 
     Thread.sleep(10000); 
    } catch (InterruptedException e) { 
     throw new Error(e); 
    } 
} 

Ich lief, dass ein folgendes Ergebnis bekam:

18:41:29.021: Start 
18:41:29.033: Sleeping main 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4 
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6 
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3 
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7 
18:41:39.034: Sleeping main 
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1 
18:41:49.035: End 

Also auf meinem 8-CPU-Computer die ForkJoinPool natürlich 8 Threads wählen, hat das erste 8 Tasks und schließlich die letzten beiden Tasks, was bedeutet, dass dies 20 Sekunden dauerte und wenn andere Tasks in der Warteschlange waren, konnte der Pool immer noch die klar im Leerlauf befindlichen CPUs nicht benutzen (mit Ausnahme von 6 Kernen in den letzten 10 Sekunden).

Dann habe ich ...

IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; })); 

... statt ...

IntStream.range(0, 10).parallel().forEach((x) -> sleep()); 

... und bekam folgendes Ergebnis:

18:44:10.93: Start 
18:44:10.945: Sleeping main 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3 
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11 
18:44:20.957: End 

Es sieht für mich wie das funktioniert, wurden zusätzliche Themen meine mock "Blocking I/O-Aktion" (Schlaf) zu kompensieren. Die Zeit wurde auf 10 Sekunden reduziert und ich nehme an, dass wenn ich mehr Aufgaben in die Warteschlange stellen würde, diese immer noch die verfügbare CPU-Leistung nutzen könnten.

Gibt es etwas Falsches mit dieser Lösung oder allgemein mit I/O in Streams, wenn die I/O-Operation in eine ManagedBlock verpackt ist?

Antwort

6

Kurz gesagt, es gibt einige Probleme mit Ihrer Lösung. Es verbessert definitiv die Verwendung von Blockierungscode innerhalb des parallelen Datenstroms, und einige Bibliotheken von Drittanbietern bieten ähnliche Lösungen (siehe zum Beispiel Blocking Klasse in der jOOλ-Bibliothek). Diese Lösung ändert jedoch nicht die in der Stream API verwendete interne Splitting-Strategie. Die Anzahl der Teilaufgaben von Stream-API erstellt wird durch die vordefinierte Konstante in AbstractTask Klasse gesteuert:

/** 
* Default target factor of leaf tasks for parallel decomposition. 
* To allow load balancing, we over-partition, currently to approximately 
* four tasks per processor, which enables others to help out 
* if leaf tasks are uneven or some processors are otherwise busy. 
*/ 
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; 

Wie Sie es viermal größer als gemeinsamen Pool Parallelität sehen können (die standardmäßig Anzahl der CPU-Kerne). Der echte Splitting-Algorithmus ist ein wenig komplizierter, aber grob können Sie nicht mehr als 4x-8x-Aufgaben haben, selbst wenn sie alle blockieren. Wenn Sie beispielsweise 8 CPU-Kerne haben, funktioniert Ihr Test Thread.sleep() problemlos bis IntStream.range(0, 32) (als 32 = 8 * 4). Für IntStream.range(0, 64) haben Sie jedoch 32 parallele Aufgaben, die jeweils zwei Eingangsnummern verarbeiten, so dass die gesamte Verarbeitung 20 Sekunden dauern würde, nicht 10.

+0

Guter Punkt mit der Dekomposition. Dies begrenzt natürlich die Zeit, die eine einzelne Aufgabe dauern kann, begrenzt jedoch nicht den Gesamtdurchsatz, wenn genügend andere Rechenaufgaben in der Warteschlange sind. Fazit: Wenn nur der Durchsatz ein Problem ist, ist die Lösung in Ordnung. Wenn die Antwortzeit einer einzelnen E/A-Task ein Problem ist und die betreffende einzelne E/A-Task in mehreren Schritten zerlegt werden kann, sollte eine andere Lösung in Betracht gezogen werden. – yankee

+3

Und nicht zu vergessen: Die Verwendung von Fork/Join durch die Stream-API ist ein Implementierungsdetail. Solange die Verwendung dieses Frameworks durch Streams nicht garantiert ist, kann nicht garantiert werden, dass die Verwendung von 'ManagedBlocker' die Parallelität verbessert. – Holger

Verwandte Themen