2016-10-24 4 views
1

Also lerne ich RxPy nachdem ich RxJava und RxKotlin seit zwei Jahren gemacht habe. Eine Sache, die ich bemerke, ist, dass bestimmte Operatoren verrücktes Interleaving verursachen, das in RxJava nicht passiert.RxPy - Warum verschmelzen Emissionen mit verschmelzenden Operatoren?

Zum Beispiel wird flat_map() Emissionen zu verschachteln außer Reihenfolge für eine einfache Observable Quelle verursachen.

items = Observable.from_(("Alpha","Beta","Gamma","Delta","Epsilon")) 

items.flat_map(lambda s: Observable.from_(list(s))).subscribe(print) 

OUTPUT:

A 
l 
B 
p 
e 
G 
h 
t 
a 
D 
a 
a 
m 
e 
E 
m 
l 
p 
a 
t 
s 
a 
i 
l 
o 
n 

jedoch mit RxJava oder RxKotlin, bleibt alles sequentielle und in Ordnung.

fun main(args: Array<String>) { 
    Observable.just("Alpha","Beta","Gamma","Delta","Epsilon") 
     .flatMap { 
      Observable.from(it.toCharArray().asIterable()) 
     }.subscribe(::println) 
} 

OUTPUT:

A 
l 
p 
h 
a 
B 
e 
t 
a 
G 
a 
m 
m 
a 
D 
e 
l 
t 
a 
E 
p 
s 
i 
l 
o 
n 

ich bestätigte, dass alles auf den MainThread läuft und es gibt kein seltsames asynchrones Scheduling geht (glaube ich).

Warum verhält sich RxPy so? Ich merke, dass dies fast mit jedem Operator passiert, der mehrere Quellen zusammenführt. Was genau macht der Standardplaner?

Warum gibt es auch keinen concat_map() in RxPy? Ich habe den Eindruck, dass dies irgendwie nicht möglich ist, wie die Planung funktioniert ...

+2

Die Tatsache erhalten, dass 'flatMap' um in RxJava zu respektieren geschieht, ist eine Implementierung Detail, und Sie sollten nicht sich darauf verlassen. Verwenden Sie 'concatMap', wenn Sie eine Bestellung aufbewahren möchten. –

+0

Nun, ich denke, das stimmt. Ich denke, das führt zu meiner Frage, warum 'concat_map() 'nicht in RxPy implementiert ist? Aktualisiert die Frage, um dies zu berücksichtigen ... – tmn

Antwort

3

Wie bereits erwähnt wurde, flatMap keine Garantie für die Reihenfolge. RxPy nicht concat_map als eigenständige Betreiber implementieren, aber Sie können den gleichen Effekt mit den map und concat_all Betreiber

Observable.from_(("Alpha","Beta","Gamma","Delta","Epsilon"))\ 
      .map(lambda s: Observable.from_(list(s)))\ 
      .concat_all()\ 
      .subscribe(print)