2016-05-24 5 views
2

ich folgenden Code haben:Wie mit der async-Methode in Rx abonnieren?

IObservable<Data> _source; 

... 

_source.Subscribe(StoreToDatabase); 

private async Task StoreToDatabase(Data data) { 
    await dbstuff(data); 
} 

Dies ist jedoch nicht kompiliert. Gibt es eine Möglichkeit, Daten asynchron zu beobachten? Ich habe versucht async void, es funktioniert, aber ich fühle, dass die gegebene Lösung nicht durchführbar ist.

Ich habe auch Reactive Extensions Subscribe calling await, aber es gibt keine Antwort auf meine Frage (ich über das SelectMany Ergebnis nicht.)

+0

Mögliches Duplikat von [Reactive Extensions Abonnieren Calling erwarten] (http://stackoverflow.com/questions/24843000/reactive-extensions-subscribe-calling-await) –

Antwort

3

Sie müssen über das SelectMany Ergebnis egal. Die Antwort ist immer noch die gleiche ... obwohl Sie Ihre Aufgabe benötigen, um einen Rückgabetyp zu haben (d. H. Task<T>, nicht Task).

Unit ist im wesentlichen äquivalent zu void, so können Sie das verwenden:

_source.SelectMany(StoreToDatabase).Subscribe(); 

private async Task<Unit> StoreToDatabase(Data data) 
{ 
    await dbstuff(data); 
    return Unit.Default; 
} 

Diese SelectMany Überlastung eine Func<TSource, Task<TResult> nimmt die resultierende Sequenz bedeutet, wird erst abgeschlossen, wenn die Aufgabe abgeschlossen ist.

+3

Aber zumindest einen 'OnError' Handler in Ihrem Abonnement Methode. Wenn Sie nicht, und entweder "_source" Fehler oder "StoreToDatabase" wirft Sie haben Ihre Anwendung umfallen, oder landen in einem unbekannten Zustand. –

0

Ich habe TPL DataFlow verwendet, um den Gegendruck zu steuern, und habe es verwendet, um dieses Problem zu lösen.

Der Schlüssel ist ITargetBlock<TInput>.AsObserver() - source.

// Set a block to handle each element 
ITargetBlock<long> targetBlock = new ActionBlock<long>(async p => 
{ 
    Console.WriteLine($"Received {p}"); 
    await Task.Delay(1000); 
    Console.WriteLine($"Finished handling {p}"); 
}, 
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); 

// Generate an item each second for 10 seconds 
var sequence = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10); 

// Subscribe with an observer created from the target block. 
sequence.Subscribe(targetBlock.AsObserver()); 

// Await completion of the block 
await targetBlock.Completion; 

Der wichtige Teil ist, dass der beschränkte Kapazität des ActionBlock auf 1 gesetzt ist dies den Block aus, die mehr als ein Element zu einem Zeitpunkt und will block OnNext, wenn ein Artikel bereits bearbeitet wird verhindert!

Meine große Überraschung hier war, dass es sicher sein kann, Task.Wait und Task.Result in Ihrem Abonnement zu nennen. Offensichtlich, wenn Sie ObserverOnDispatcher() oder ähnliches aufgerufen haben, werden Sie wahrscheinlich Deadlocks treffen. Achtung!