2017-12-19 3 views
1

Ich habe eine Situation, in der ich eine IObservable<T> von einer Methode zurückgeben muss. Es wird ein neuer Wert des Objekts T zurückgegeben, wann immer es sich geändert hat.Rx.net und der Schauspieler Model

Der Auslöser, dass das Objekt geändert wurde, kommt vom Akteursystem, in meinem Beispiel verwenden wir Proto.Actor.

Ich habe ein Beispiel zu „arbeiten“ bekommen, aber ich fühle mich nicht richtig für die richtige IObservable und wird nicht zulassen, Abmelde usw.

Meine Lösung implementiert:

public IObservable<T> Watch<T>(long id) 
    { 
     var subscriber = _actorFactory.GetActor<ObjectChangedActor>(); 
     var request = new ObjectWatchRequest(id); 

     return Observable.Create<T>(async (observer, cancellationToken) => 
     { 
      await Task.Run(async() => 
      { 
       while (true) 
       { 
        var response = await subscriber.RequestAsync<ObjectUpdatedResponse>(request, cancellationToken); 
        // once it gets past the above line, we have gotten notice that the object has been updated 
        var updatedObj = await Get<T>(id); 
        observer.OnNext(updatedObj); 
       } 
      }, cancellationToken); 

      // will this ever be called? 
      return Disposable.Create(() => 
      { 
       Console.WriteLine("Disposal called"); 
      }); 
     }); 
    } 

Ich mag das Gefühl, Trotz der Tatsache, dass es "funktioniert", dass alle Updates an den Abonnenten weitergegeben werden, wird es nie in der Lage sein, genau zu disponieren, und ich habe nicht das Gefühl, while (true) sollte jemals in Code geschrieben werden, auch wenn es in einem separaten Thread ist .

Wie würde ich am besten mit einer Liste von Nachrichten umgehen, die vom Actor-System zurückkommen und diese in eine IObservable verwandeln, über die der Benutzer verfügen kann (abbestellen)?

Antwort

1

Sie haben kein voll funktionsfähiges Beispiel Ihres Codes angegeben, was es schwierig gemacht hat, Ihnen eine funktionierende Lösung zu geben, die Sie einfach kopieren und einfügen können. Allerdings habe ich eine Cut-Down-Version gemacht, von der Sie wahrscheinlich arbeiten können.

Sie haben Recht, dass Ihre Lösung keine gute Implementierung ist.

Hier ist eine Basisversion des Codes, der in etwa richtig funktionieren sollte, wenn Sie Ihre Objekte in setzen:

public IObservable<T> Watch<T>(long id) 
{ 
    return 
     Observable 
      .Defer(() => 
       from x in Observable.FromAsync(() => RequestAsync()) 
       from y in Observable.FromAsync(() => Get<T>(id)) 
       select y) 
      .Repeat(); 
} 

Lassen Sie mich wissen, ob dies das Zeichen für dich trifft.

+0

Ja, tatsächlich trifft es mich. Ich hatte bereits '.Repeat()' in einer aktualisierten Lösung verwendet, aber ich benutzte 'Observable.Defer' nicht. Immer noch nicht 100% davon, wie sich das von "Observable.Create" unterscheidet, aber ich werde etwas weiterlesen. – DenverCoder9

Verwandte Themen