Ich habe ein Gerät, das über den seriellen Anschluss mit dem Computer kommuniziert. Nach dem Senden eines "START" -Befehls antwortet das Gerät mit einer Bestätigung und beginnt mit der Überwachung einiger externer Aktivitäten. Es überträgt dann asynchron einige Nachrichten auf der seriellen Schnittstelle abhängig von dieser externen Aktivität. Wenn das Gerät einen "STOP" -Befehl empfängt, antwortet es mit einer Bestätigung und hört dann auf, weitere Nachrichten zu senden (die die externe Aktivität darstellen).Rx.Net: Führen Sie einen asynchronen Nebeneffekt aus, wenn das Abonnement entsorgt wird
Ich habe die Start/Stop-Befehle mit kalten Observablen implementiert, die Nebeneffekte (Send-Befehl an der seriellen Schnittstelle) ausführen und eine einzelne Unit.Default
ausgeben, wenn ein Empfang am SerialPort empfangen wird. Ich möchte ein IObservable
konstruieren, das Nachrichten ausgibt, die der externen Aktivität entsprechen, und den Nebeneffekt "START" ausführen, wenn es abonniert ist, und den Nebeneffekt "STOP", wenn das Abonnement entsorgt wird. "START" ist einfach, ich muss nur ein `SelectMany 'machen, aber ich weiß nicht, wie man" STOP "durchführt.
class MonitoringDevice
{
private SerialPort _sp;
private IObservable<byte> _receivedBytes;
public IObservable<ExternalActivity> ActivityStream { get; }
public MonitoringDevice()
{
_sp = new SerialPort("COM1");
_receivedBytes = Observable
.FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
h =>
{
_sp.DiscardInBuffer();
_sp.DataReceived += h;
},
h =>
{
_sp.DataReceived -= h;
})
.SelectMany(x =>
{
byte[] buffer = new byte[1024];
var ret = new List<byte>();
int bytesRead = 0;
do
{
bytesRead = _sp.Read(buffer, 0, buffer.Length);
ret.AddRange(buffer.Take(bytesRead));
} while ((bytesRead >= buffer.Length));
return ret;
})
.Publish()
.RefCount();
ActivityStream = StartMonitoringAsync()
.SelectMany(_receivedBytes.ToActivity());
// we need to execute StopMonitoringAsync
// when a subscription to ActivityStream is disposed
_sp.Open();
}
private IObservable<Unit> StartMonitoringAsync()
{
return Observable
.Create<Unit>(
obs =>
{
_sp.Write("START");
return _receivedBytes
.ToAcknowledge()
.FirstAsync()
.Timeout(TimeSpan.FromMilliseconds(1000))
.Subscribe(obs);
});
}
private IObservable<Unit> StopMonitoringAsync()
{
return Observable
.Create<Unit>(
obs =>
{
_sp.Write("STOP");
return _receivedBytes
.ToAcknowledge()
.FirstAsync()
.Timeout(TimeSpan.FromMilliseconds(1000))
.Subscribe(obs);
});
}
}
ExternalActivity
ist nur ein POCO.
ToAcknowledge
ist eine Erweiterungsmethode, die IObservable
zurückgibt, die Unit.Default
ausgibt, wenn das Gerät eine Bestätigung sendet. - das funktioniert wie erwartet;
ToActivity
ist eine Erweiterungsmethode, die einen IObservable
zurückgibt, der die eingehenden seriellen Daten analysiert und ExternalActivity
Objekte ausgibt. - das funktioniert wie erwartet;
Edit: Hinzugefügt Implementierung für ToAcknowledge
und ToActivity
Erweiterungsmethoden.
public static IObservable<Unit> ToAcknowledge(this IObservable<byte> source)
{
return source.Buffer(3, 1)
.Where(bfr => bfr.SequenceEqual(new byte[] { 65, 67, 75 })) // ACK
.Select(x => Unit.Default);
}
public static IObservable<ExternalActivity> ToActivity(this IObservable<byte> source)
{
return source
.Publish(ps => ps.Buffer(ps.Where(x => x == 1), // SOH
bo => ps.Where(x => x == 4))) // EOT
.Select(bfr => bfr.Take(bfr.Count - 1).Skip(1))
.Where(bfr => bfr.Count() == 12)
.Select(bfr =>
{
var timestamp = BitConverter.ToInt64(bfr.Take(8).ToArray(), 0);
var id = BitConverter.ToInt32(bfr.ToArray(), 8);
return new ExternalActivity(timestamp, id);
});
}
Können Sie bitte den Code für 'ToAcknowledge' &' ToActivity' anzeigen? Ich kann dir keine Antwort ohne sie geben. – Enigmativity
@Enigmatismus - Siehe meine bearbeitete Frage. – francezu13k50
Hmm Ich denke nicht, in diesem Fall sollten STOP und Dispose mit dem gleichen zusammengeführt werden. Ich denke, Sie möchten Stop als Befehl modellieren. Und als solche akzeptieren, dass es gelingen kann, scheitern, Timeout etc. Entsorgung ist dann nur rein, trennen und das Abonnement abwerfen. –