Bearbeiten: Bitte beachten Sie den Kommentar von the_joric, wenn Sie dies verwenden werden. Es ist ein Grenzfall, der nicht behandelt wird, sehe ich nicht einen schnellen Weg, um es zu beheben, und so habe ich keine Zeit, es jetzt zu beheben.
Hier ist eine Lösung in C#, da Sie die system.reactive
Tag haben.
static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b)
{
var source = Observable.Merge(
a.Select(x => Tuple.Create('a', x)),
b.Select(y => Tuple.Create('b', y)));
return source.Publish(o =>
{
var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2);
var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2);
return Observable.Merge(
published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)),
published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x)));
});
}
Die Idee ist wie folgt zusammengefasst.
Wenn a
den Wert emittiert x
wir es verzögern, bis b
Wert y
so dass x <= y
emittiert.
Wenn b
den Wert emittiert y
wir es verzögern, bis a
Wert x
so dass y <= x
emittiert.
Wenn Sie nur heiße Observable hatten, könnten Sie Folgendes tun. Aber das Folgende würde nicht funktionieren, wenn es kalte Observable in der Mischung gäbe. Ich würde empfehlen, immer die Version zu verwenden, die sowohl für heiße als auch kalte Observables funktioniert.
static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b)
{
return Observable.Merge(
a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)),
b.Delay(y => a.FirstOrDefaultAsync(x => y <= x)));
}
Sehr interessante Frage, ich mag es. :) –