Der Standard "paralellStream()" in Java 8 verwendet die allgemeine ForkJoinPool
, die ein Latenzproblem sein kann, wenn die gemeinsamen Pool-Threads erschöpft sind, wenn eine Aufgabe gesendet wird. In vielen Fällen ist jedoch genügend CPU-Leistung verfügbar und die Tasks sind kurz genug, so dass dies kein Problem darstellt. Wenn wir einige lange laufende Aufgaben haben, wird dies natürlich einige sorgfältige Überlegungen erfordern, aber für diese Frage nehmen wir an, dass dies nicht das Problem ist.Stimmt etwas nicht mit der Verwendung von I/O + ManagedBlocker in Java8 parallelStream()?
Aber das Füllen der ForkJoinPool
mit I/O-Aufgaben, die eigentlich keine CPU-gebundene Arbeit tun, ist eine Möglichkeit, einen Engpass einzuführen, obwohl genügend CPU-Leistung verfügbar ist. I understood that. Aber das ist es, was wir für die ManagedBlocker
haben. Also, wenn wir eine I/O-Aufgabe haben, sollten wir einfach ForkJoinPool
erlauben, das innerhalb einer ManagedBlocker
zu verwalten. Das klingt unglaublich einfach. Aber zu meiner Überraschung ist die Verwendung einer ManagedBlocker
ziemlich komplizierte API für die einfache Sache, die es ist. Und schließlich denke ich, dass dies ein häufiges Problem ist. Also baute ich nur eine einfache Dienstprogramm Methode, die ManagedBlocker
s einfach zu bedienen für den gemeinsamen Fall macht:
public class BlockingTasks {
public static<T> T callInManagedBlock(final Supplier<T> supplier) {
final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier);
try {
ForkJoinPool.managedBlock(managedBlock);
} catch (InterruptedException e) {
throw new Error(e);
}
return managedBlock.getResult();
}
private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker {
private final Supplier<T> supplier;
private T result;
private boolean done = false;
private SupplierManagedBlock(final Supplier<T> supplier) {
this.supplier = supplier;
}
@Override
public boolean block() {
result = supplier.get();
done = true;
return true;
}
@Override
public boolean isReleasable() {
return done;
}
public T getResult() {
return result;
}
}
}
Nun, wenn ich den HTML-Code eine Reihe von Websites in paralell zum Download Ich mag es so zu können, ohne die I/O verursacht keine Probleme:
public static void main(String[] args) {
final List<String> pagesHtml = Stream
.of("https://google.com", "https://stackoverflow.com", "...")
.map((url) -> BlockingTasks.callInManagedBlock(() -> download(url)))
.collect(Collectors.toList());
}
ich ein wenig bin überrascht, dass es keine Klasse mit Java wie die BlockingTasks
oben ausgeliefert ist (? oder ich fand es nicht), aber es war nicht so schwer zu bauen.
Als ich für "java 8 parallelen Strom" google ich in den ersten vier Ergebnisse jene Artikel, die behaupten, dass aufgrund der I/O Problem Fork/Join saugt in Java:
- https://dzone.com/articles/think-twice-using-java-8
- http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/ (zumindest erwähnt
ManagedBlocker
sage aber auch „in einem verschiedenen Anwendungsfall Sie in der Lage sein würden ihm ein ManagedBlocker Beispiel zu geben.“ es erwähnt nicht, warum in diesem Fall nicht.
Ich habe meine Suchbegriffe etwas geändert und während sich dort viele Leute darüber beschweren, wie schrecklich das Leben ist, habe ich niemanden gefunden, der von einer Lösung wie der obigen sprach. Da ich mich nicht wie Marvin fühle (Gehirn wie ein Planet) und Java 8 für eine ganze Weile verfügbar ist, vermute ich, dass mit dem, was ich dort vorschlage, etwas schrecklich falsch ist.
Ich schlug zusammen einen kleinen Test:
public static void main(String[] args) {
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start");
IntStream.range(0, 10).parallel().forEach((x) -> sleep());
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End");
}
public static void sleep() {
try {
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName());
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new Error(e);
}
}
Ich lief, dass ein folgendes Ergebnis bekam:
18:41:29.021: Start
18:41:29.033: Sleeping main
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7
18:41:39.034: Sleeping main
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:49.035: End
Also auf meinem 8-CPU-Computer die ForkJoinPool
natürlich 8 Threads wählen, hat das erste 8 Tasks und schließlich die letzten beiden Tasks, was bedeutet, dass dies 20 Sekunden dauerte und wenn andere Tasks in der Warteschlange waren, konnte der Pool immer noch die klar im Leerlauf befindlichen CPUs nicht benutzen (mit Ausnahme von 6 Kernen in den letzten 10 Sekunden).
Dann habe ich ...
IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; }));
... statt ...
IntStream.range(0, 10).parallel().forEach((x) -> sleep());
... und bekam folgendes Ergebnis:
18:44:10.93: Start
18:44:10.945: Sleeping main
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11
18:44:20.957: End
Es sieht für mich wie das funktioniert, wurden zusätzliche Themen meine mock "Blocking I/O-Aktion" (Schlaf) zu kompensieren. Die Zeit wurde auf 10 Sekunden reduziert und ich nehme an, dass wenn ich mehr Aufgaben in die Warteschlange stellen würde, diese immer noch die verfügbare CPU-Leistung nutzen könnten.
Gibt es etwas Falsches mit dieser Lösung oder allgemein mit I/O in Streams, wenn die I/O-Operation in eine ManagedBlock
verpackt ist?
Guter Punkt mit der Dekomposition. Dies begrenzt natürlich die Zeit, die eine einzelne Aufgabe dauern kann, begrenzt jedoch nicht den Gesamtdurchsatz, wenn genügend andere Rechenaufgaben in der Warteschlange sind. Fazit: Wenn nur der Durchsatz ein Problem ist, ist die Lösung in Ordnung. Wenn die Antwortzeit einer einzelnen E/A-Task ein Problem ist und die betreffende einzelne E/A-Task in mehreren Schritten zerlegt werden kann, sollte eine andere Lösung in Betracht gezogen werden. – yankee
Und nicht zu vergessen: Die Verwendung von Fork/Join durch die Stream-API ist ein Implementierungsdetail. Solange die Verwendung dieses Frameworks durch Streams nicht garantiert ist, kann nicht garantiert werden, dass die Verwendung von 'ManagedBlocker' die Parallelität verbessert. – Holger