2016-10-04 3 views
0

Ich habe ein Gerät, das über den seriellen Anschluss mit dem Computer kommuniziert. Nach dem Senden eines "START" -Befehls antwortet das Gerät mit einer Bestätigung und beginnt mit der Überwachung einiger externer Aktivitäten. Es überträgt dann asynchron einige Nachrichten auf der seriellen Schnittstelle abhängig von dieser externen Aktivität. Wenn das Gerät einen "STOP" -Befehl empfängt, antwortet es mit einer Bestätigung und hört dann auf, weitere Nachrichten zu senden (die die externe Aktivität darstellen).Rx.Net: Führen Sie einen asynchronen Nebeneffekt aus, wenn das Abonnement entsorgt wird

Ich habe die Start/Stop-Befehle mit kalten Observablen implementiert, die Nebeneffekte (Send-Befehl an der seriellen Schnittstelle) ausführen und eine einzelne Unit.Default ausgeben, wenn ein Empfang am SerialPort empfangen wird. Ich möchte ein IObservable konstruieren, das Nachrichten ausgibt, die der externen Aktivität entsprechen, und den Nebeneffekt "START" ausführen, wenn es abonniert ist, und den Nebeneffekt "STOP", wenn das Abonnement entsorgt wird. "START" ist einfach, ich muss nur ein `SelectMany 'machen, aber ich weiß nicht, wie man" STOP "durchführt.

class MonitoringDevice 
{ 
    private SerialPort _sp; 
    private IObservable<byte> _receivedBytes; 

    public IObservable<ExternalActivity> ActivityStream { get; } 

    public MonitoringDevice() 
    { 
     _sp = new SerialPort("COM1"); 
     _receivedBytes = Observable 
         .FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
          h => 
          { 
           _sp.DiscardInBuffer(); 
           _sp.DataReceived += h; 
          }, 
          h => 
          { 
           _sp.DataReceived -= h; 
          }) 
         .SelectMany(x => 
         { 

          byte[] buffer = new byte[1024]; 
          var ret = new List<byte>(); 
          int bytesRead = 0; 
          do 
          { 
           bytesRead = _sp.Read(buffer, 0, buffer.Length); 
           ret.AddRange(buffer.Take(bytesRead)); 
          } while ((bytesRead >= buffer.Length)); 
          return ret; 

         }) 
         .Publish() 
         .RefCount(); 


     ActivityStream = StartMonitoringAsync() 
         .SelectMany(_receivedBytes.ToActivity()); 
         // we need to execute StopMonitoringAsync 
         // when a subscription to ActivityStream is disposed 

     _sp.Open(); 
    } 



    private IObservable<Unit> StartMonitoringAsync() 
    { 
     return Observable 
       .Create<Unit>(
       obs => 
       { 
        _sp.Write("START"); 
        return _receivedBytes 
          .ToAcknowledge() 
          .FirstAsync() 
          .Timeout(TimeSpan.FromMilliseconds(1000)) 
          .Subscribe(obs); 
       }); 
    } 


    private IObservable<Unit> StopMonitoringAsync() 
    { 
     return Observable 
       .Create<Unit>(
       obs => 
       { 
        _sp.Write("STOP"); 
        return _receivedBytes 
          .ToAcknowledge() 
          .FirstAsync() 
          .Timeout(TimeSpan.FromMilliseconds(1000)) 
          .Subscribe(obs); 
       }); 
    } 


} 

ExternalActivity ist nur ein POCO.

ToAcknowledge ist eine Erweiterungsmethode, die IObservable zurückgibt, die Unit.Default ausgibt, wenn das Gerät eine Bestätigung sendet. - das funktioniert wie erwartet;

ToActivity ist eine Erweiterungsmethode, die einen IObservable zurückgibt, der die eingehenden seriellen Daten analysiert und ExternalActivity Objekte ausgibt. - das funktioniert wie erwartet;


Edit: Hinzugefügt Implementierung für ToAcknowledge und ToActivity Erweiterungsmethoden.

public static IObservable<Unit> ToAcknowledge(this IObservable<byte> source) 
    { 
     return source.Buffer(3, 1) 
       .Where(bfr => bfr.SequenceEqual(new byte[] { 65, 67, 75 })) // ACK 
       .Select(x => Unit.Default); 

    } 

    public static IObservable<ExternalActivity> ToActivity(this IObservable<byte> source) 
    { 
     return source 
       .Publish(ps => ps.Buffer(ps.Where(x => x == 1),    // SOH 
              bo => ps.Where(x => x == 4))) // EOT 
       .Select(bfr => bfr.Take(bfr.Count - 1).Skip(1)) 
       .Where(bfr => bfr.Count() == 12) 
       .Select(bfr => 
       { 
        var timestamp = BitConverter.ToInt64(bfr.Take(8).ToArray(), 0); 
        var id = BitConverter.ToInt32(bfr.ToArray(), 8); 
        return new ExternalActivity(timestamp, id); 
       });   
    } 
+0

Können Sie bitte den Code für 'ToAcknowledge' &' ToActivity' anzeigen? Ich kann dir keine Antwort ohne sie geben. – Enigmativity

+0

@Enigmatismus - Siehe meine bearbeitete Frage. – francezu13k50

+1

Hmm Ich denke nicht, in diesem Fall sollten STOP und Dispose mit dem gleichen zusammengeführt werden. Ich denke, Sie möchten Stop als Befehl modellieren. Und als solche akzeptieren, dass es gelingen kann, scheitern, Timeout etc. Entsorgung ist dann nur rein, trennen und das Abonnement abwerfen. –

Antwort

0

Sie können Ihre StartAsync zu sein, dies ändern:

private IObservable<Unit> StartAsync(Action unsubscribe) 
{ 
    return 
     Observable 
      .Create<Unit>(o => 
      { 
       var subscription = 
        Observable 
         .Timer(TimeSpan.FromSeconds(1)) 
         .Select(_=> Unit.Default) 
         .Subscribe(o); 
       return new CompositeDisposable(
        subscription, 
        Disposable.Create(unsubscribe)); 
      });; 
} 

Dann können Sie in jedem Action unsubscribe Sie injizieren möchten.

Versuchen Tests mit diesem Code:

var subscription = 
    StartAsync(() => Console.WriteLine("Done")) 
    .Subscribe(); 

Thread.Sleep(3000); 

subscription.Dispose(); 

Sie werden sehen, "Fertig", nach 3 Sekunden auf die Konsole geschrieben.

+0

Danke. Die Aktion, die bei der Abonnementverfügung ausgeführt werden soll, ist eine asynchrone Aktion. Es wird als kalt beobachtbares Objekt implementiert, das einen Nebeneffekt (Sendebefehl über SerialPort) ausführt und bei der Ankunft der Antwort einen einzelnen Wert ausgibt. In meinem Codebeispiel wird es mit einem 'Observable.Timer' emuliert. Ich muss diese beobachtbare Hauptabonnementverfügung abwarten. – francezu13k50

+0

@ francezu13k50 - Könnten Sie nicht ein Observable innerhalb der "Action" setzen? Es ist jedoch nie eine gute Idee, sich auf einen Timer zu verlassen, damit eine asynchrone Aktion abgeschlossen werden kann. Rx ist sehr gut im Umgang mit diesen Situationen. Kannst du deinen echten Code zeigen? – Enigmativity

+0

Ich weiß nicht, wie man innerhalb der "Action" ein Observable platziert. Der "Observable.Timer" dient nur dazu, die asynchrone Reaktion des Geräts zu simulieren, im Effekt auf einen "STOP" -Befehl; Ich bin nicht auf einen Timer für die Vollständigkeit der Aktion angewiesen, es ist tatsächlich die Bestätigung des Geräts, die den Operationserfolg anzeigt. – francezu13k50

Verwandte Themen