2017-03-07 2 views
1

Nehmen wir an, ich habe eine Datenservice-Klasse, die die Daten Batch für Batch und ihre Chunks an die Abonnenten abruft.Wie vermeidet man, onComplete aufzurufen, bevor die nächste fertig ist?

public class DataService { 
    public IObservable<IList<T>> QuerySegmentedObservable<T>(string tableName) where T : TableEntity, new(){ 

     return Observable.Create<IList<T>>(async (observer, token) =>{ 
      TableContinuationToken continuationToken = null; 
      do{ 
       var currentSegment = CallData(); 
       observer.OnNext(currentSegment.Results); 

       continuationToken = currentSegment.ContinuationToken; 
      } while (continuationToken != null); 
      observer.OnCompleted(); 
     } 
    } 
} 

Ich abonniere dieses Observable wie unten.

public async Task<bool> MyMethod() 
     { 
      var tcs = new TaskCompletionSource<bool>(); 
      var observable = _dataService.QuerySegmentedObservable<TSource>(_sourceTableName); 

      var dataCount = 0; 

      _databaseService.OpenConnection(); 

      observable.Subscribe(async data => 
      {     
       await _databaseService.DoSomething(data); 

       dataCount += data.Count; 

       Console.WriteLine($"Processing - {dataCount}"); 
      }, 
      err => 
      { 
       Console.WriteLine($"Error - {err.Message}"); 
       tcs.SetResult(false); 
      }, 
      () => 
      {  
       _databaseService.CloseConnection(); 
       Console.WriteLine($"Finished"); 
       tcs.SetResult(true); 
      } 
      ); 

      return await tcs.Task; 
     } 

Das Problem ist, dass OnComplete() vor dem letzten OnNext() aufgerufen wird beendet. Also habe ich die Verbindung beendet, bevor ich die Aufgabe, die ich in Subscribe() mache, beende;

Gibt es eine Möglichkeit, es zu beheben? Vielen Dank.

+3

Versuchen Sie, asynchrone Operationen aus dem 'onNext'-Block zu entfernen. – redent84

+0

Was ist der Typ von '_databaseService'? – Enigmativity

+0

@En Es ist nur eine Klasse, die die Daten in die Datenbank einfügt oder etwas tut, das ein paar Sekunden bis zu 40 Sekunden dauern kann. –

Antwort

0

Rx unterstützt async/await innerhalb von Operatoren. Sie verwenden es jedoch innerhalb eines Abonnements. So (hoffentlich) Sie können Ihren Code so etwas wie dies ändern:

public async Task<bool> MyMethod() 
{ 
    var tcs = new TaskCompletionSource<bool>(); 
    _databaseService.OpenConnection(); 
    var dataCount = 0; 
    _dataService.QuerySegmentedObservable<TSource>(_sourceTableName) 
     .SelectMany(async data => 
     { 
      await _databaseService.DoSomething(data); 
      return data; 
     }) 
     //.Finally(() => _databaseService.CloseConnection()) //This would be called on OnComplete and OnError, just like try-finally 
     .Subscribe(data => 
      { 
       dataCount += data.Count; 

       Console.WriteLine($"Processing - {dataCount}"); 
      }, 
      err => 
      { 
       Console.WriteLine($"Error - {err.Message}"); 
       tcs.SetResult(false); 
      }, 
      () => 
      { 
       _databaseService.CloseConnection(); //Maybe move this to a Finally call? 
       Console.WriteLine($"Finished"); 
       tcs.SetResult(true); 
      } 
     ); 

    return await tcs.Task; 
} 

ich nicht wirklich testen kann, so hoffe ich, dass Sets Sie auf dem richtigen Weg. Wenn Sie mehr Hilfe benötigen, dann schreiben Sie bitte eine bessere MCVE.

Verwandte Themen