2015-06-13 12 views
7

den folgenden Code:groupBy Operator, Elemente aus verschiedenen Gruppen verschachtelt

Observable 
      .just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) 
      .doOnNext(item -> System.out.println("source emitting " + item)) 
      .groupBy(item -> { 
       System.out.println("groupBy called for " + item); 
       return item % 3; 
      }) 
      .subscribe(observable -> { 
       System.out.println("got observable " + observable + " for key " + observable.getKey()); 
       observable.subscribe(item -> { 
        System.out.println("key " + observable.getKey() + ", item " + item); 
       }); 
      }); 

lässt mich verwirrt. Der Ausgang ich erhalte, ist:

source emitting 0 
    groupBy called for 0 
    got observable [email protected] for key 0 
    key 0, item 0 
    source emitting 1 
    groupBy called for 1 
    got observable [email protected] for key 1 
    key 1, item 1 
    source emitting 2 
    groupBy called for 2 
    got observable [email protected] for key 2 
    key 2, item 2 
    source emitting 3 
    groupBy called for 3 
    key 0, item 3 
    source emitting 4 
    groupBy called for 4 
    key 1, item 4 
    source emitting 5 
    groupBy called for 5 
    key 2, item 5 
    source emitting 6 
    groupBy called for 6 
    key 0, item 6 
    source emitting 7 
    groupBy called for 7 
    key 1, item 7 
    source emitting 8 
    groupBy called for 8 
    key 2, item 8 
    source emitting 9 
    groupBy called for 9 
    key 0, item 9 

also in der obersten Ebene abonnieren Methode, bekomme ich 3 Observablen aus dem GroupedObservable, wie erwartet. Dann, eins nach dem anderen, abonniere ich die gruppierten Observablen - und hier das, was ich nicht verstehe:

Warum werden die ursprünglichen Elemente immer noch in der ursprünglichen Reihenfolge (dh 0, 1, 2, 3, .. .) und nicht 0, 3, 6, 9 ... für Schlüssel 0, gefolgt von 1, 4, 7 für Schlüssel 1, gefolgt von 2, 5, 8 für Schlüssel 2?

Ich glaube, ich verstehe, wie die Gruppen erstellt:

1. 0 is emitted, the key function is called and it gets 0 
2. it is checked if an observable for 0 exists, it doesn't, so a new one is created and emitted, and then it emits 0 
3. the same happens for source items 1 and 2 as they both create new groups, and observables with key 1 and 2 are emitted, and they emit 1 and 2 correspondingly 
4. source item 3 is emitted, the key function is called and it gets 0 
5. it is checked if an observable for 0 exists, it does -> no new grouped observable is created nor emitted, but 3 is emitted by the already existing observable 
6. etc. until the source sequence is drained 

Es scheint, dass, obwohl ich die gruppierten Observablen eins nach dem anderen bekommen, sind ihre Emissionen irgendwie verschachtelt. Wie kommt es dazu?

Antwort

4

Warum werden die ursprünglichen Elemente immer noch in der ursprünglichen Reihenfolge ausgegeben (z. B. 0, 1, 2, 3, ...) und nicht 0, 3, 6, 9 ... für Taste 0, gefolgt von 1 , 4, 7 für Taste 1, gefolgt von 2, 5, 8 für Taste 2?

Sie haben Ihre eigene Frage beantwortet. Sie arbeiten mit einem Strom von Elementen in der Reihenfolge, in der sie ausgegeben werden. Wenn also jeder ausgegeben wird, wird er über die Operatorkette geleitet und Sie sehen die Ausgabe, die Sie hier gezeigt haben.

Die alternative Ausgabe, die Sie dort erwarten, erfordert, dass die Kette wartet, bis die Quelle die Ausgabe von Elementen für Gruppen beendet hat. Angenommen, Sie hatten Observable.just(0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 0). Dann würden Sie (0, 3, 0), (1, 4, 4, 4, 4, 4, 4), (2) als Ihre Ausgabegruppen erwarten. Was wäre, wenn Sie einen unendlichen Strom von Vierern hätten? Dein Abonnent würde niemals 0, 3 .. von der ersten Gruppe erhalten.

Sie können das von Ihnen gesuchte Verhalten erstellen. Der toList Operator Ausgabe zwischengespeichert werden, bis die Quelle abgeschlossen ist, und dann ein List<R> an die Teilnehmer übergeben:

.subscribe(observable -> { 
    System.out.println("got observable " + observable + " for key " + observable.getKey()); 
    observable.toList().subscribe(items -> { 
     // items is a List<Integer> 
     System.out.println("key " + observable.getKey() + ", items " + items); 
    }); 
}); 
+0

ich über den ToList wissen() betrüge. Ich weiß auch, dass dieses Verhalten Sinn macht, und Ihr Beispiel zeigt sehr gut, warum. Ich bin ziemlich ratlos darüber, wie es so umgesetzt wird, wie es funktioniert. Ich bekomme 3 Observables, und ich abonniere sie, wie sie von der Gruppierung ausgesendet werden - aber wie kann es verschachtelt werden? Es ist so, als würden die 3 Abonnements sofort passieren und die Emissionen wären später irgendwie asynchron. Könnten Sie etwas Licht dazu bringen? – wujek

+1

Die 3 Subskriptionen für die verschiedenen Gruppen werden ausgeführt, wenn eine neue Gruppe auf Anforderung erstellt wird. Die Emissionen laufen tatsächlich _synchron_ - da dies alles auf dem gleichen Thread ist, muss jeder fertig bearbeitet werden, bevor der nächste behandelt werden kann. Ich denke, das ist wahrscheinlich der Schlüssel, den du vermisst? –

+0

"alle im selben Thread, jeder muss fertig bearbeitet werden, bevor der nächste bearbeitet werden kann" - genau das bekomme ich nicht. Die Ausgabe zeigt, dass alle zur selben Zeit bearbeitet werden; Es ist nicht so, dass die erste Gruppe zuerst konsumiert wird und dann kommt die nächste ins Bild - ihre Emissionen sind miteinander verzahnt. – wujek

Verwandte Themen