2017-12-08 4 views
3

Ich arbeite mit Java 8 Streams. Ich habe eine benutzerdefinierte Funktion, foo(), die ein Objekt ergibt, und ich möchte die Objekte, die es erstellt, parallel streamen. Ich weiß, foo() ist nicht Thread sicher. Wenn ich Stream.generate(foo).parallel() schreibe, wird foo() asynchron aufgerufen? I.e. Werden Objekte seriell erzeugt und an parallele Threads übergeben, oder werden mehrere Threads je nach Bedarf Objekte erzeugen, indem sie foo()?Für Stream.generate (foo) .parallel(), muss foo threadsicher sein?

Antwort

3

Während Daten Rennen kein garantiertes Verhalten sind, der folgende Code

System.out.println(
    Stream.generate(new Supplier<Integer>() { 
     int i; @Override public Integer get() { return i++; } 
    }).parallel() 
     .limit(10_000) 
     .collect(BitSet::new, BitSet::set, BitSet::or) 
     .cardinality() 
); 

Zahlen reproduzierbar druckt weniger als 10.000 in meiner Umgebung, was zeigt, dass fehlende Updates tatsächlich passieren können, wenn der Lieferant nicht Thread-sicher ist.

Beachten Sie, dass es auch möglich ist, dass der Lieferant nach mehr Elementen gefragt wird, als für die Ergebnisauswertung benötigt werden. Z.B.

LongAdder adder = new LongAdder(); 
System.out.println(
    Stream.generate(new Supplier<Integer>() { 
     int i; @Override public Integer get() { adder.increment(); return i++; } 
    }).parallel() 
     .limit(10_000) 
     .collect(BitSet::new, BitSet::set, BitSet::or) 
     .cardinality() 
); 
System.out.println("queried "+adder+" times"); 

in der Regel berichtet über eine Anzahl von Anfragen von mehr als 10.000, während zugleich die Ergebnisberichte weniger als 10.000 verschiedene Elemente aufgrund der Daten Rennen.

Wenn Sie den Lieferanten-Thread sicher machen, wird das Ergebnis auf die korrekte Anzahl von 10.000 verschiedenen Elementen geändert, aber der Lieferant kann immer noch mehr als 10.000 Mal abgefragt werden. Daher kann das Ergebnis nicht genau die Zahlen von 0 bis 9.999 enthalten , da der über generate erzeugte Strom ungeordnet ist, so können alle 10.000 verschiedenen Nummern vom Lieferanten verwendet werden.

+0

Ich habe den Punkt in keinem der beiden Beispiele erwähnt. Nun, die erste Art von Sinn ist sinnvoll, eine gemeinsame Variable wird aus mehreren Threads aktualisiert, was impliziert, dass der Lieferant threadsicher sein muss, aber die zweite ist über 'limit' perse, oder nur um den ersten Punkt durchzusetzen? – Eugene

+1

@Eugene: Auch wenn Ihr Lieferant threadsicher ist, dürfen Sie nicht davon ausgehen, dass nur die verbrauchten Elemente abgefragt werden. Dies gilt für alle Kurzschlussvorgänge (und unendliche Ströme erfordern einen Kurzschlussvorgang). 'Limit' ist nur ein offensichtliches Beispiel. Und es ist leicht zu übersehen, dass der von 'generate' zurückgegebene Stream * ungeordnet * ist, so dass sich viele Anwendungsfälle, die jemandem beim Betrachten dieser Methode in den Sinn kommen, als unpassend herausstellen, wenn sie erneut darüber nachdenken ... – Holger

+0

@Holger I Ich schätze die obige Klarstellung, das ist ein subtiler und wichtiger Punkt – MyStackRunnethOver

5

Der Lieferant wird von mehrer Threads aufgerufen werden, wie Sie mit einem schnellen Experiment beobachten können:

Stream.generate(() -> Thread.currentThread().getId()) 
    .parallel() 
    .limit(100000) 
    .distinct() 
    .forEach(System.out::println); 
+1

Der Aufruf von mehreren Threads bedeutet nicht notwendigerweise, dass sie gleichzeitig aufgerufen werden. Vergleiche z.B. 'forEachOrdered', dessen Aktion von verschiedenen Threads aufgerufen werden kann, aber nicht gleichzeitig. Dieser Test reicht also nicht aus, um festzustellen, ob der Lieferant fadensicher sein muss. (In diesem Fall wird der Lieferant tatsächlich gleichzeitig bewertet und muss threadsicher sein). – Holger

Verwandte Themen