2016-08-09 11 views
0

Ich habe einen Input-Stream von einem RS-232-Port und eine Warteschlange von Befehlen an den seriellen Port mit Rx-Streams. Ich habe meinen Code vereinfacht wie folgt:Anfrage/Antwort mit Streams

void Init() 
{ 
    SerialPort srl; 

    ... // open serial port 

    IObservable<string> obInput = 
     Observable.FromEventPattern< 
     SerialDataReceivedEventHandler, 
     SerialDataReceivedEventArgs> 
     (
      handler => srl.DataReceived += handler, 
      handler => srl.DataReceived -= handler 
     ).SelectMany(_ => 
     { 
     List<string> ret; 
     ... //extract messages 
     return ret; 
     }).Publish().Refcount(); 

    obCommandOk = 
     obInput 
     .Where(msg => msg == "OK" || msg == "KO"); 

    var sAction = new Subject<string>(); 
    var sCommandOk = new Subject<Tuple<string,bool>>(); 

    sAction 
     .Do(srl.WriteLine) 
     .Zip(obCommandOk, (cmd, result) => 
     { 
     if (result == "OK") 
      sCommandOk.OnNext(Tuple.Create(cmd, true)) 
     else 
      sCommandOk.OnNext(Tuple.Create(cmd, false)) 
     }); 
} 

async bool Command(string cmd) 
{ 
    sAction.OnNext(cmd); 

    return 
     await sCommandOk 
     .Where(t => t.Item1 == cmd) 
     .Select(t => t.Item2) 
     .FirstAsync(); 
} 

kommt vor, dass nach OnNext das Ergebnis bereits nach sCommandOk gedrückt wurde und so habe ich es zu verlieren.

Können Sie mir einen besseren Ansatz vorschlagen, um den Verlust von Antworten zu vermeiden?

+1

Können wir bieten wollen http://stackoverflow.com/hilfe/mcve bitte? Sicher, ich bekomme es ist Pseudo-Code, aber es ist etwa 5 Änderungen weg von echten C#, die kompiliert. –

+0

@LeeCampbell es ist ziemlich schwierig zu extrapolieren, was Sie von meinem Code anfordern, aber ich habe etwas geschrieben, das hier kompiliert https://gist.github.com/zpul/b7a59e559e523a8a3b1077b1ec8013ef (zu groß für SO) – zpul

+0

Sie sollten wirklich nicht verwenden ein externer veränderbarer Puffer von 'SelectMany', das ist wirklich eine wirklich schlechte Idee. Sie können absolut nichts über den Inhalt garantieren. – TheInnerLight

Antwort

0

Sie haben eine Wettlaufbedingung in Ihrer Command Methode. Sie drücken die Aktion, dann abonnieren Sie das Ergebnis. Wenn das Ergebnis schnell ist, dann verlieren Sie es.

eine kleine Änderung hier kann man erkennen, mildern:

async Task<bool> Command(string cmd) 
{ 
    var result = sCommandOk 
     .Where(t => t.Item1 == cmd) 
     .Select(t => t.Item2) 
     .FirstAsync(); 

    sAction.OnNext(cmd); 

    return await result; 
} 

Ich glaube, Sie weiter durch das Entfernen des sCommandOk Thema optimieren können alle zusammen

async Task<bool> Command(string cmd) 
{ 
    var result = obInput 
     .Where(t => t.Item1 == cmd) 
     .Select(t => t.Item2) 
     .FirstAsync(); 

    sAction.OnNext(cmd); 

    return await result; 
} 
+0

Leider habe ich diesen Ansatz bereits versucht und das Problem besteht weiter. Die einzige Lösung, die ich zu diesem Zeitpunkt gefunden habe, ist, zusammen mit dem Befehl eine Ergebnisvariable zu übergeben, die, wenn ich nach dem OnNext gesetzt wurde, bereits das Ergebnis habe, ansonsten warte ich. Es ist sehr schlecht, also mag ich es nicht. – zpul