4

In einer C# -Konsolenanwendung, die System.Reactive.Linq verwendet, versuche ich eine Observable zu erstellen, wobei jedes Element das String-Ergebnis einer Verarbeitung durch eine andere Observable ist. Ich habe eine einfache Repro mit Strings und Zeichen erstellt. Warnung, dieses Beispiel ist vollständig CONTRIVED, und der Punkt ist, dass die verschachtelte .Wait() hängt.Nested Observable hängt an Wait()

class Program 
{ 
    static void Main(string[] args) 
    { 
     string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" }; 
     IObservable<string> files = fileNames.ToObservable(); 
     string[] extensions = files.Select(fn => 
     { 
      var extension = fn.ToObservable() 
      .TakeLast(4) 
      .ToArray() 
      .Wait(); // <<<<<<<<<<<<< HANG HERE 
      return new string(extension); 
     }) 
     .ToArray() 
     .Wait(); 
    } 
} 

Wieder, das ist nicht, wie ich das Suffix vieler Dateinamen finden würde. Die Frage ist, wie ich ein Observable von Strings erzeugen kann, wobei die Strings aus einer abgeschlossenen Observablen berechnet werden.

Wenn ich diesen Code herausziehe und es alleine leite, ist es in Ordnung.

Es gibt etwas über die verschachtelten Wait() auf den async Methoden, die ich nicht verstehe.

Wie kann ich die verschachtelten asynchronen Observablen codieren, damit ich ein einfaches Array von Strings erstellen kann?

Dank

-Johannes

+2

Ihre Methoden sind ** nicht ** asynchron: Wait() ist eine blockierende Operation ... Siehe https://stackoverflow.com/questions/37801699/what-does-the-wait-operator-in-rx -net-do –

+0

Können Sie erklären, warum Sie 'Wait()' in erster Linie verwenden? –

Antwort

4

Der Grund, warum Ihr Code blockiert ist, weil Sie ToObservable() verwenden, ohne einen Scheduler angeben. In diesem Fall wird die CurrentThreadScheduler verwendet.

So die files beobachtbare Probleme, es ist zuerst OnNext() [A] (Senden "file1.doxc") mit dem aktuellen Thread. Es kann nicht weiter iterieren, bis das OnNext() zurückgibt. Allerdings ist die innere fn beobachtbaren auch verwendet ToObservable() und die Wait() blockiert, bis fn abgeschlossen ist - es wird die erste OnNext() Warteschlange (Senden "f") auf den aktuellen Thread-Scheduler, aber es wird nie, es zu versenden, weil jetzt die erste OnNext() [A ] wird nie zurückkehren.

Zwei einfache Korrekturen:

Entweder die files beobachtbar wie folgt ändern:

IObservable<string> files = fileNames.ToObservable(NewThreadScheduler.Default); 

Oder vermeiden Sie die Verwendung des inneren Wait() mit einem SelectMany (was auf jeden Fall mehr idiomatische Rx):

string[] extensions = files.SelectMany(fn => 
{ 
    return fn.ToObservable() 
      .TakeLast(4) 
      .ToArray() 
      .Select(x => new string(x)); 
}) 
.ToArray() 
.Wait(); 

// display results etc. 

Jeder Ansatz wird eine andere Ausführungssemantik haben - der erste wird ähnlich wie a laufen Nested Loop, wobei jede innere Observable vor der nächsten äußeren Iteration abgeschlossen wird. Die zweite wird viel mehr verschachtelt sein, da das Blockierungsverhalten des Wait() beseitigt ist. Wenn Sie die Spy Methode verwenden, die ich schrieb und nach beiden ToObservable()-Aufrufe anbringe, werden Sie dieses Verhalten ziemlich deutlich sehen.

+2

Danke James, tolle Antwort, auf den Punkt und informativ. Ich hatte keinen guten Grund, SelectMany nicht zu verwenden, tatsächlich ist das meine ultimative Lösung. Ich habe versucht, den NewThreadScheduler.Default nur für einen Test hinzuzufügen, und das hat natürlich auch funktioniert. Ich bekomme das Konzept, dass die Observables auf dem OnNext deadlocked sind, aber ich denke, dass ich etwas mit Spy experimentieren werde und versuche, das für mich klarer zu machen. Danke noch einmal! – JohnKoz

1

Wait eine blockierende Aufruf, die mit Rx nicht gut mischen. Ich bin mir nicht sicher, warum die Verschachtelte fehlschlägt.

eine Asynchron-Funktion Unter der Annahme, dies funktioniert:

IObservable<string> files = fileNames.ToObservable(); 
string[] extensions = await files.SelectMany(async fn => 
{ 
    var extension = await fn.ToObservable() 
    .TakeLast(4) 
    .ToArray(); 
     return new string(extension); 
}) 
.ToArray(); 
1

James' hat die Frage genagelt, aber ich würde vorschlagen, dass der Code dies nur zu tun getan Furunkel:

string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" }; 
    string[] extensions = 
    (
     from fn in fileNames.ToObservable() 
     from extension in fn.ToObservable().TakeLast(4).ToArray() 
     select new string(extension) 
    ) 
     .ToArray() 
     .Wait(); 

Nun, das hat noch ein .Wait() drin. Im Idealfall würden Sie so etwas tun:

IDisposable subscription = 
    (
     from fn in fileNames.ToObservable() 
     from extension in fn.ToObservable().TakeLast(4).ToArray() 
     select new string(extension) 
    ) 
     .ToArray() 
     .Subscribe(extensions => 
     { 
      /* Do something with the `extensions` */ 
     }); 

Sie sollten alle Wartezeiten vermeiden.