2017-05-03 4 views
-1

Ich versuche, dieses Programm zu Generika zu konvertieren. Doch auf der LinieGenerics und Rx

foreach (var observer in observerList) 
{       
    observer.OnNext(observer); 
} 

ich:

Cannot convert from System.IObserver<T> to T 

Programm:

public sealed class EnumerableObservable<T> : IObservable<T>, IDisposable 
{ 
    private readonly IEnumerable<T> enumerable; 

    public EnumerableObservable(IEnumerable<T> enumerable) 
    { 
     this.enumerable = enumerable; 
     this.cancellationSource = new CancellationTokenSource(); 
     this.cancellationToken = cancellationSource.Token; 

     this.workerTask = Task.Factory.StartNew(() => 
     { 
       foreach (var value in this.enumerable) 
       { 
        //if task cancellation triggers, raise the proper exception 
        //to stop task execution 

        cancellationToken.ThrowIfCancellationRequested(); 

        foreach (var observer in observerList) 
        {       
         observer.OnNext(observer); 
        } 
       } 
      }, this.cancellationToken); 
    } 

    //the cancellation token source for starting stopping 
    //inner observable working thread 
    private readonly CancellationTokenSource cancellationSource; 

    //the cancellation flag 
    private readonly CancellationToken cancellationToken; 

    //the running task that runs the inner running thread 
    private readonly Task workerTask; 

    //the observer list 
    private readonly List<IObserver<T>> observerList = new List<IObserver<T>>(); 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     observerList.Add(observer); 
     //subscription lifecycle missing 
     //for readability purpose 

     return null; 
    } 

    public void Dispose() 
    { 
     //trigger task cancellation 
     //and wait for acknoledge 

     if (!cancellationSource.IsCancellationRequested) 
     { 
      cancellationSource.Cancel(); 
      while (!workerTask.IsCanceled) 
       Thread.Sleep(100); 
     } 

     cancellationSource.Dispose(); 
     workerTask.Dispose(); 

     foreach (var observer in observerList) 
      observer.OnCompleted(); 
    } 
} 

public sealed class ConsoleStringObserver<T> : IObserver<T> 
{ 
    public void OnCompleted() 
    { 
     Console.WriteLine("-> END"); 
    } 

    public void OnError(Exception error) 
    { 
     Console.WriteLine("-> {0}", error.Message); 
    } 

    public void OnNext(T value) 
    { 
     Console.WriteLine("-> {0}", value.ToString()); 
    } 
} 

class Program 
{ 
    static void Main(string[] args) 
    { 
     //we create a variable containing the enumerable 
     //this does not trigger item retrieval 
     //so the enumerator does not begin flowing datas 

     var enumerable = EnumerateValuesFromSomewhere(); 
     using (var observable = new EnumerableObservable<string>(enumerable)) 
     using (var observer = observable.Subscribe(new ConsoleStringObserver<string>())) 
     { 
      //wait for 2 seconds than exit 
      Thread.Sleep(2000); 
     } 

     Console.WriteLine("Press RETURN to EXIT"); 
     Console.ReadLine(); 
    } 

    static IEnumerable<string> EnumerateValuesFromSomewhere() 
    { 
     var random = new Random(DateTime.Now.GetHashCode()); 

     while (true) //forever 
     { 
      //returns a random integer number as string 
      yield return random.Next().ToString(); 

      //some throttling time 
      Thread.Sleep(100); 
     } 
    } 
} 

Antwort

1

Sollte sagen observer.OnNext(value);

+0

LOL. Typo. Danke. :-) – Ivan

0

Sie haben ein Feld namens observerList. Dies ist eine Liste von IObserver.

Sie übergeben alle Elemente auf dieser Schleife und versuchen, mit Parameter selbst übergeben.

foreach (var observer in observerList) 
{       
    observer.OnNext(observer); 
} 

observer.OnNext erwartet T und sich nicht.

Wenn zum Beispiel ist IObserver<string> müssen Sie

observer.OnNext("Hello world"); 

ich Ihr Programm kann nicht verstehen, um eine Zeichenfolge zu übergeben, aber wenn Sie mir zeigen kann Ihnen helfen.