2015-06-03 11 views

Antwort

4

Bearbeiten: Bitte beachten Sie den Kommentar von the_joric, wenn Sie dies verwenden werden. Es ist ein Grenzfall, der nicht behandelt wird, sehe ich nicht einen schnellen Weg, um es zu beheben, und so habe ich keine Zeit, es jetzt zu beheben.

Hier ist eine Lösung in C#, da Sie die system.reactive Tag haben.

static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b) 
{ 
    var source = Observable.Merge(
     a.Select(x => Tuple.Create('a', x)), 
     b.Select(y => Tuple.Create('b', y))); 
    return source.Publish(o => 
    { 
     var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2); 
     var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2); 
     return Observable.Merge(
      published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)), 
      published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x))); 
    }); 
} 

Die Idee ist wie folgt zusammengefasst.

  • Wenn a den Wert emittiert x wir es verzögern, bis b Wert y so dass x <= y emittiert.

  • Wenn b den Wert emittiert y wir es verzögern, bis a Wert x so dass y <= x emittiert.

Wenn Sie nur heiße Observable hatten, könnten Sie Folgendes tun. Aber das Folgende würde nicht funktionieren, wenn es kalte Observable in der Mischung gäbe. Ich würde empfehlen, immer die Version zu verwenden, die sowohl für heiße als auch kalte Observables funktioniert.

static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b) 
{ 
    return Observable.Merge(
     a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)), 
     b.Delay(y => a.FirstOrDefaultAsync(x => y <= x))); 
} 
+1

Es scheint nicht zu funktionieren, wenn eines der Observables mit 'Observable.Create' erstellt wird. Ich habe ein Beispiel auf den Punkt gebracht: https://gist.github.com/skalinets/89f21662a619f685bd6a –

+1

@the_joric Ja, es sieht so aus, als ob es nicht funktioniert, wenn eine beobachtbare beendet wird, bevor die andere sogar abonniert ist. Ich habe jetzt keine Zeit, das jetzt zu beheben, also werde ich nur auf deinen Kommentar hinweisen. –

+0

Ich wusste nichts von dieser "Verzögerung" Überladung, danke! – ionoy

3

Sie können, sortieren verschmelzen und die Sequenzen abflachen, aber es wird einen erheblichen Aufwand haben:

o1.mergeWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...) 

oder

o1.concatWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...) 

Andernfalls müssen Sie eine ziemlich komplizierte Betreiber schreiben .

bearbeiten 2015.04.06:

Here is ein Operator, der diese nicht effizienter sortiert-fusionieren.

1

Der vor einiger Zeit auf den RxJava mailing list diskutiert wurde, werden Sie einige Links zu möglichen Lösungen in diesem Thread finden.

3

suchte ich auch für eine Art Lösung verschmelzen, die sie nicht finden einen Gegendruck auf und konnte unterstützt. Also entschied ich mich, es basierend auf dem vorhandenen Zip-Operator auf eigene Faust zu implementieren.

Ähnlich zip, sammelt die sortierte merge Operator ein Element aus jeder Quelle beobachtbaren ersten, aber dann stellt sie in eine Prioritätswarteschlange, aus der sie nacheinander entsprechend ihrer natürlichen Reihenfolge oder den angegebenen Komparator emittiert.

Sie können es von GitHub als bereit greifen Bibliothek verwenden oder einfach kopieren/fügen Sie den Code:

https://github.com/ybayk/rxjava-recipes

Siehe Unit-Tests für den Einsatz.

+0

In Ihrer Readme-Datei heißt es "Sie können sehr große oder unendlich sortierte Sequenzen zusammenführen". Ich denke, das ist der Hauptvorteil gegenüber den anderen Lösungen. Sie sollten das wahrscheinlich hervorheben. – Luciano

0

Was ist nur zusammenführen und sortieren?

@Test 
public void testMergeChains() { 
    Observable.merge(Observable.from(Arrays.asList(1, 2, 13, 11, 5)), Observable.from(Arrays.asList(10, 4, 12, 3, 14, 15))) 
       .collect(ArrayList<Integer>::new, ArrayList::add) 
      .doOnNext(Collections::sort) 
      .subscribe(System.out::println); 

} 

können Sie weitere Beispiele hier sehen

https://github.com/politrons/reactive

+0

Diese Lösung funktioniert, aber es ist leider im Falle einer großen Menge von Daten ineffizient: Es erfordert, dass alle Streams aufgrund der sort() konsumiert und gespeichert werden. – Christophe