Ich habe TPL DataFlow verwendet, um den Gegendruck zu steuern, und habe es verwendet, um dieses Problem zu lösen.
Der Schlüssel ist ITargetBlock<TInput>.AsObserver()
- source.
// Set a block to handle each element
ITargetBlock<long> targetBlock = new ActionBlock<long>(async p =>
{
Console.WriteLine($"Received {p}");
await Task.Delay(1000);
Console.WriteLine($"Finished handling {p}");
},
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
// Generate an item each second for 10 seconds
var sequence = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
// Subscribe with an observer created from the target block.
sequence.Subscribe(targetBlock.AsObserver());
// Await completion of the block
await targetBlock.Completion;
Der wichtige Teil ist, dass der beschränkte Kapazität des ActionBlock auf 1 gesetzt ist dies den Block aus, die mehr als ein Element zu einem Zeitpunkt und will block OnNext, wenn ein Artikel bereits bearbeitet wird verhindert!
Meine große Überraschung hier war, dass es sicher sein kann, Task.Wait
und Task.Result
in Ihrem Abonnement zu nennen. Offensichtlich, wenn Sie ObserverOnDispatcher()
oder ähnliches aufgerufen haben, werden Sie wahrscheinlich Deadlocks treffen. Achtung!
Mögliches Duplikat von [Reactive Extensions Abonnieren Calling erwarten] (http://stackoverflow.com/questions/24843000/reactive-extensions-subscribe-calling-await) –