Ich finde es am einfachsten, einen einzigen Thread zu haben, der die Koordination und das Schreiben behandelt und dann das Lesen mit Async behandelt. Durch Einschließen eines CancellationTokenSource
in einem AsyncState
Objekt kann der Lesercode dem sendenden Thread signalisieren, die Verbindung neu zu starten, wenn der Empfänger einen Fehler feststellt (Aufruf von EndRead
oder wenn der Strom vollständig ist). Der Koordinator/Schreiber sitzt in einer Schleife, die die Verbindung erstellt und dann eine Schleife von BlockingCollection<T>
von zu sendenden Elementen ausführt. Unter Verwendung von BlockingCollection.GetConsumingEnumerable(token)
kann der Absender abgebrochen werden, wenn der Leser einen Fehler feststellt.
private class AsyncState
{
public byte[] Buffer { get; set; }
public NetworkStream NetworkStream { get; set; }
public CancellationTokenSource CancellationTokenSource { get; set; }
}
Nachdem Sie die Verbindung erstellt haben Sie den Asynchron-Lesevorgang starten kann (die sich so lange Aufruf hält wie alles funktioniert). Vorbei an den Puffer, den Strom und die CancellationTokenSource
im Statusobjekt:
var buffer = new byte[1];
stream.BeginRead(buffer, 0, 1, Callback,
new AsyncState
{
Buffer = buffer,
NetworkStream = stream,
CancellationTokenSource = cts2
});
Danach Sie anfangen aus Ihrer Ausgangswarteschlange zu lesen und in den Stream zu schreiben, bis sie abgebrochen oder ein Fehler passiert:
using (var writer = new StreamWriter(stream, Encoding.ASCII, 80, true))
{
foreach (var item in this.sendQueue.GetConsumingEnumerable(cancellationToken))
{
...
. .. und im Callback können Sie nach Fehlern suchen und, falls notwendig, die CancellationTokenSource drücken, um dem Writer-Thread zu signalisieren, dass die Verbindung neu gestartet werden soll.
private void Callback(IAsyncResult ar)
{
var state = (AsyncState)ar.AsyncState;
if (ar.IsCompleted)
{
try
{
int bytesRead = state.NetworkStream.EndRead(ar);
LogState("Post read ", state.NetworkStream);
}
catch (Exception ex)
{
Log.Warn("Exception during EndRead", ex);
state.CancellationTokenSource.Cancel();
return;
}
// Deal with the character received
char c = (char)state.Buffer[0];
if (c < 0)
{
Log.Warn("c < 0, stream closing");
state.CancellationTokenSource.Cancel();
return;
}
... deal with the character here, building up a buffer and
... handing it out to the application when completed
... perhaps using Rx Subject<T> to make it easy to subscribe
... and finally ask for the next byte with the same Callback
// Launch the next reader
var buffer2 = new byte[1];
var state2 = state.WithNewBuffer(buffer2);
state.NetworkStream.BeginRead(buffer2, 0, 1, Callback, state2);
Das ist die Linie, die ich gerne erkunden würde. Ich konnte mir den Code für meinen Anwendungsfall jedoch nicht vorstellen. Ich muss zwei Byte-Header lesen, der die Länge des Körpers enthält und dann den Körper lesen. Wie mache ich das? Ich habe zwar den ReadHeader, der ReadRody mit BeingRead aufruft, klingt aber so, als müsste ich den Rest des Körpers synchron in der ReadBody-Methode lesen. Wie sollte ich CancellationTokenSource verwenden? – Gatis
Ich habe Code hinzugefügt, um zu zeigen, wie man mit dem Lesen von jeweils einem Byte umgeht und wie das Löschungs-Token übergeben wird und verwendet wird, um den Schreib-Thread zu signalisieren, wenn ein Lesefehler auftritt. Sie könnten damit beginnen, zwei Bytes für die anfängliche asynchrone Leseoperation zu lesen und dann den Rest in einem einzelnen Aufruf zu lesen. –
Großartig. Brauche ich wirklich state2? Warum kann ich den Zustand nicht wiederverwenden? – Gatis