2015-11-21 3 views
6

Ich versuche, eine Methode zu schreiben, die die Indizes eines Objekts in einer Liste von Listen findet und die Parallelität nutzt. Hier ist mein Code.Streams verwenden, um ein Objekt in einer Liste von Listen zu finden

// returns [i, j] where lists.get(i).get(j) equals o, or null if o is not present. 
public static int[] indices(List<? extends List<?>> lists, Object o) { 
    return IntStream.range(0, lists.size()) 
        .boxed() 
        .flatMap(i -> IntStream.range(0, lists.get(i).size()).mapToObj(j -> new int[]{i, j})) 
        .parallel() 
        .filter(a -> { 
         System.out.println(Arrays.toString(a));  // For testing only 
         return Objects.equals(o, lists.get(a[0]).get(a[1])); 
        }) 
        .findAny() 
        .orElse(null); 
} 

Wenn ich den folgenden Code ausführen

List<List<String>> lists = Arrays.asList(
     Arrays.asList("A", "B", "C"), 
     Arrays.asList("D", "E", "F", "G"), 
     Arrays.asList("H", "I"), 
     Collections.nCopies(5, "J") 
); 
System.out.println("Indices are " + Arrays.toString(indices(lists, "J"))); 

der Ausgang ist so etwas wie

[0, 0] 
[0, 1] 
[0, 2] 
[3, 0] 
[3, 1] 
[3, 2] 
[3, 3] 
[2, 0] 
[3, 4] 
[1, 0] 
[1, 1] 
[2, 1] 
[1, 2] 
[1, 3] 
Indices are [3, 0] 

Mit anderen Worten, die Suche wird fortgesetzt, auch nachdem das Objekt gefunden wurde. Ist nicht findAny soll ein Kurzschlussbetrieb sein? Was vermisse ich? Wie kann man die Parallelität am besten ausnutzen, wenn man über eine Liste von Listen oder ein gezacktes Array iteriert?

EDIT

Im Anschluss an die Vorstellung in @ Sotirios Antwort bekam ich eine Ausgabe von

Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 0] 
Thread[main,5,main] [2, 0] 
Thread[main,5,main] [2, 1] 
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 0] 
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 1] 
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 2] 
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 3] 
Thread[main,5,main] [0, 0] 
Thread[main,5,main] [0, 1] 
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 1] 
Thread[main,5,main] [0, 2] 
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 2] 
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 3] 
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 4] 
Indices are [3, 0] 

Beachten Sie, dass

Thread[ForkJoinPool.commonPool-worker-3,5,main] 

weiterhin die Suche auch nach der Antwort gefunden wird.

+0

Verwenden Sie stattdessen findFirst(). –

+0

@TaharBakir Es setzt die Suche fort. –

+1

Außerdem kann Parallelität einige Zeit dauern, bevor ein Thread die anderen benachrichtigen kann, dass sie nicht weitermachen müssen. –

Antwort

7

Kurzschlüsse Operationen garantieren nicht nur so wenige Elemente zu ziehen, wie es ihr Ergebnis zu produzieren nimmt. Sie können tun, aber es ist nicht erforderlich.

Die aktuelle Implementierung von flatMap ist so, dass es immer den gesamten Inhalt des Teilstroms nach unten schieben wird. Selbst wenn Ihr Stream nicht parallel wäre, könnten Sie mehr Elemente durch den Stream fließen sehen, als Sie benötigen, um findAny zu erfüllen.

+0

Es scheint, dass diese Antwort richtig ist, und dass 'flatMap(). Filter(). FindAny()' grundsätzlich nicht kurzgeschlossen ist. Ich weiß nicht, warum es so umgesetzt werden würde. –

+1

"short-cuircuiting" bedeutet lediglich, dass es * beendet * werden darf, bevor der gesamte Stream untersucht wird. Darüber hinausgehende Garantien gibt es nicht. – Misha

1

Es ist nicht so, dass es fortfährt, es ist, dass es bereits alle Arten von Threads geschickt hat, um zu versuchen, das Ergebnis zu finden und wird warten, bis diese abgeschlossen sind, bevor das Ergebnis zurückgegeben wird.

Mit anderen Worten, die findAny Terminal-Operation wird die "Suche" Aufgabe an eine Anzahl von Threads übergeben. Diese Aufgaben wenden einfach die filterPredicate an und kehren zurück, wenn etwas zurückkommt true. findAny wartet vermutlich darauf, dass einer von ihnen einen Wert zurückgibt. Es gibt keine Möglichkeit, etwas zu stornieren, was bereits eingereicht wurde, und es scheint, dass diese Implementierung blockiert wird, bis der gesamte Batch zurückkehrt. Es kann nur aufhören, zukünftige Chargen zu senden.

Sie können dies überprüfen, indem Sie den aktuellen Thread Anmeldung:

System.out.println(Thread.currentThread() + " " + Arrays.toString(a)); // For testing only 
+0

Ich bin halb im Schlaf, also ist das wahrscheinlich eine dumme Frage, aber wenn eine Last von Worker Threads Aufgaben im Voraus gegeben werden, und die ganze Methode nicht zurückkehren kann, bis sie alle fertig sind, was bedeutet Kurzschließen sogar? –

+1

@PaulBoddington Ich glaube nicht, dass es _all_ ist, ich denke, es ist eine Teilmenge. –

+1

@PaulBoddington Zum Beispiel feuere ich 5 Threads ab, um zu suchen. Alle 5 könnten ein Ergebnis liefern. Aber ich muss auf alle 5 warten, bevor ich mich entscheiden kann. (Nun, Sie müssen wirklich nur auf einen warten, aber Sie können die anderen nicht abbrechen. Und diese Implementierung scheint bei all diesen 5 Aufgaben mitmachen zu wollen.) –

2

Wie für "warum es auf diese Weise implementiert wurde". Das Problem liegt tief in der Stream-API-Implementierung. Die flatMap Stelle erstellt oft einen Stream mit einigen Zwischenoperationen (wie .flatMap(list -> list.stream().map(...).filter(...))). Man könnte innerhalb der flatMap Implementierung verwenden und tryAdvance viele Male anrufen, bis die Stornierung angefordert wird. Der spliterator() Aufruf gibt jedoch einen etwas künstlichen Spliterator zurück, wenn der Stream Zwischenoperationen enthält (wenn nicht, gibt er nur den ursprünglichen Stream-Spliterator zurück).Dieser künstliche Spliterator hat keine sehr effiziente tryAdvance() Implementierung, so dass die Verwendung dieser Implementierung als schlechterer Performance-Nachteil im Vergleich zum Verbrauch des gesamten flatMapped Streams angesehen werden kann. In vielen Fällen flachtMap auf einige kurze Streams ab, sodass Sie hier dank der aktuellen Implementierung möglicherweise einen Leistungszuwachs erzielen.

Verwandte Themen