Nehmen wir an, ich habe eine Datenservice-Klasse, die die Daten Batch für Batch und ihre Chunks an die Abonnenten abruft.Wie vermeidet man, onComplete aufzurufen, bevor die nächste fertig ist?
public class DataService {
public IObservable<IList<T>> QuerySegmentedObservable<T>(string tableName) where T : TableEntity, new(){
return Observable.Create<IList<T>>(async (observer, token) =>{
TableContinuationToken continuationToken = null;
do{
var currentSegment = CallData();
observer.OnNext(currentSegment.Results);
continuationToken = currentSegment.ContinuationToken;
} while (continuationToken != null);
observer.OnCompleted();
}
}
}
Ich abonniere dieses Observable wie unten.
public async Task<bool> MyMethod()
{
var tcs = new TaskCompletionSource<bool>();
var observable = _dataService.QuerySegmentedObservable<TSource>(_sourceTableName);
var dataCount = 0;
_databaseService.OpenConnection();
observable.Subscribe(async data =>
{
await _databaseService.DoSomething(data);
dataCount += data.Count;
Console.WriteLine($"Processing - {dataCount}");
},
err =>
{
Console.WriteLine($"Error - {err.Message}");
tcs.SetResult(false);
},
() =>
{
_databaseService.CloseConnection();
Console.WriteLine($"Finished");
tcs.SetResult(true);
}
);
return await tcs.Task;
}
Das Problem ist, dass OnComplete() vor dem letzten OnNext() aufgerufen wird beendet. Also habe ich die Verbindung beendet, bevor ich die Aufgabe, die ich in Subscribe() mache, beende;
Gibt es eine Möglichkeit, es zu beheben? Vielen Dank.
Versuchen Sie, asynchrone Operationen aus dem 'onNext'-Block zu entfernen. – redent84
Was ist der Typ von '_databaseService'? – Enigmativity
@En Es ist nur eine Klasse, die die Daten in die Datenbank einfügt oder etwas tut, das ein paar Sekunden bis zu 40 Sekunden dauern kann. –