2017-01-23 2 views
1

Ich mache eine Anfrage und warte auf eine Antwort auf diese Anfrage in einem Antwort-Stream. Während des Wartens kann es Updates im Updatestream geben.Reaktive Erweiterungen: Puffer bis zum Abschluss eines anderen Datenstroms

Ich möchte diese Updates haben, wenn der Antwort-Stream abgeschlossen ist. Wie folgt:

Response (cold) |  x y z | 
Updates (hot)  | 1 2 3  4 | 

Result   |  x y z 123 4 | 

Ich kann nicht scheinen, es richtig zu machen. Gibt es eine clevere Mischung von Betreibern, die mir geben können, was ich brauche?

+0

Nur laut denken. Es sieht tatsächlich ein bisschen so aus: http://stackoverflow.com/questions/31362031/with-reactive-extensions-rx-is-it-possible-to-add-a-pause-command, aber mit einem Check für stream complete statt boolean. Und einmal pausiert, kann es nie wieder pausiert werden. So etwas wie hot.StartWithAndBufferUntilComplete (kalt). – asgerhallas

+0

Könnte es mehrere 'Response's geben? – Shlomo

+0

@Shlomo Es könnte mehrere Ereignisse geben (wie x y z oben) auf der Antwort-Stream, aber sobald es abgeschlossen ist, gibt es keine Antworten mehr, wenn das ist, was du meinst? – asgerhallas

Antwort

2

aus dem Diagramm Frage :

Puffern Sie alle Aktualisierungen, bis die Antwort abgeschlossen ist, und zeigen Sie dann den Rest der Aktualisierungen so an, wie er angezeigt wird. Antworten erscheinen sofort.

updates.TakeUntil(response.LastAsync()) 
     .ToArray() 
     .SelectMany(x => x) 
     .Concat(updates) 
     .Merge(response); 
+0

Entschuldigung für die verspätete Antwort! Das scheint genau so zu funktionieren, wie ich es brauche, und nach ein wenig Nachdenken verstehe ich es auch :) Danke! – asgerhallas

1

Dies funktioniert, obwohl die Tatsache, dass Sie es nicht als mich stört Einzeiler tun können:

ist
var updatesReplayed = updates.Replay(); 
updatesReplayed.Connect(); 

var results = response.Concat(updatesReplayed); 

Tests hier die Funktionalität demonstriert:

// time    | 1 2 3 4 5 6 7 8 9 
// Response(cold)  |  x y z | 
// Updates(hot)  | 1 2 3  4 | 
// Result    |  x y z 123 4 | 

var scheduler = new TestScheduler(); 
    var updates = scheduler.CreateHotObservable(
     ReactiveTest.OnNext(100.ToMsTicks(), "1"), 
     ReactiveTest.OnNext(200.ToMsTicks(), "2"), 
     ReactiveTest.OnNext(400.ToMsTicks(), "3"), 
     ReactiveTest.OnNext(800.ToMsTicks(), "4"), 
     ReactiveTest.OnCompleted<string>(900.ToMsTicks()) 
    ); 

    var response = scheduler.CreateColdObservable(
     ReactiveTest.OnNext(300.ToMsTicks(), "x"), 
     ReactiveTest.OnNext(400.ToMsTicks(), "y"), 
     ReactiveTest.OnNext(500.ToMsTicks(), "z"), 
     ReactiveTest.OnCompleted<string>(600.ToMsTicks()) 
    ); 

    var expectedResults = scheduler.CreateHotObservable(
     ReactiveTest.OnNext(300.ToMsTicks(), "x"), 
     ReactiveTest.OnNext(400.ToMsTicks(), "y"), 
     ReactiveTest.OnNext(500.ToMsTicks(), "z"), 
     ReactiveTest.OnNext(600.ToMsTicks(), "1"), 
     ReactiveTest.OnNext(600.ToMsTicks(), "2"), 
     ReactiveTest.OnNext(600.ToMsTicks(), "3"), 
     ReactiveTest.OnNext(800.ToMsTicks(), "4"), 
     ReactiveTest.OnCompleted<string>(900.ToMsTicks()) 
    ); 

    var replayed = updates.Replay(); 
    replayed.Connect(); 

    var results = response.Concat(replayed); 
    var observer = scheduler.CreateObserver<string>(); 
    results.Subscribe(observer); 

    scheduler.Start(); 
    ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages); 
+0

Das ist großartig, danke! Ich habe Replay() versucht, aber nur in Kombination mit StartWith, was im Nachhinein keinen Sinn ergibt. Das macht viel mehr Sinn :) – asgerhallas

+0

Eine Frage aber ... hört der replizierte Stream auf, neue Nachrichten zu puffern, wenn er abonniert ist? Ich weiß, dass dies kein Teil der ursprünglichen Frage ist, aber es ist sehr wichtig, da die Abfrage sehr lange läuft. – asgerhallas

+0

Ja. Die Testergebnisse zeigen, dass die "4" sofort herausspringt. – Shlomo

Verwandte Themen