2013-06-11 16 views
6

Ich versuche, die Reactive Extensions (Rx) zu verwenden, um eine Aufzählung von Aufgaben als Puffer zu puffern. Weiß jemand, ob es eine saubere integrierte Methode gibt, dies zu tun? Die ToObservable Erweiterungsmethode wird nur eine IObservable<Task<T>> machen, was ich nicht will, ich möchte eine IObservable<T>, die ich dann Buffer verwenden kann.Konvertieren IEnumerable <Task<T>> zu IObservable <T>

konstruiertes Beispiel:

//Method designed to be awaitable 
public static Task<int> makeInt() 
{ 
    return Task.Run(() => 5); 
} 

//In practice, however, I don't want to await each individual task 
//I want to await chunks of them at a time, which *should* be easy with Observable.Buffer 
public static void Main() 
{ 
    //Make a bunch of tasks 
    IEnumerable<Task<int>> futureInts = Enumerable.Range(1, 100).Select(t => makeInt()); 

    //Is there a built in way to turn this into an Observable that I can then buffer? 
    IObservable<int> buffered = futureInts.TasksToObservable().Buffer(15); //???? 

    buffered.Subscribe(ints => { 
     Console.WriteLine(ints.Count()); //Should be 15 
    }); 
} 
+0

http://stackoverflow.com/questions/13500456/how-to-convert-an-ienumerabletaskt-to-iobservablet –

Antwort

7

Sie können die Tatsache nutzen, dass Task kann auf beobachtbare mit another overload of ToObservable() umgewandelt werden.

Wenn Sie eine Sammlung von (einzelnen) Observablen haben, können Sie eine einzelne Observable erstellen, die die Elemente enthält, wenn sie mit Merge() abgeschlossen werden.

So könnte Ihr Code wie folgt aussehen:

futureInts.Select(t => t.ToObservable()) 
      .Merge() 
      .Buffer(15) 
      .Subscribe(ints => Console.WriteLine(ints.Count)); 
Verwandte Themen