2015-09-07 5 views
5

Ich habe einen Thread Anforderungen an NetworkStream schreiben. Ein anderer Thread liest Antworten aus diesem Stream.C# Wie wird das Lesen und Schreiben von Threads beim erneuten Verbinden von NetworkStream koordiniert?

Ich möchte es fehlertolerant machen. Im Falle eines Netzwerkausfalls möchte ich, dass NetworkStream durch einen neuen ersetzt wird.

Ich habe die zwei Threads Handle IO/Socket-Ausnahmen. Jeder von ihnen wird versuchen, die Verbindung wiederherzustellen. Ich habe Mühe, diese beiden Fäden zu koordinieren. Ich musste Sperrabschnitte platzieren, die den Code ziemlich kompliziert und fehleranfällig machen.

Gibt es eine empfohlene Möglichkeit, dies zu implementieren? Vielleicht mit einem einzigen Thread, aber Lese- oder Schreib-async?

Antwort

1

Ich finde es am einfachsten, einen einzigen Thread zu haben, der die Koordination und das Schreiben behandelt und dann das Lesen mit Async behandelt. Durch Einschließen eines CancellationTokenSource in einem AsyncState Objekt kann der Lesercode dem sendenden Thread signalisieren, die Verbindung neu zu starten, wenn der Empfänger einen Fehler feststellt (Aufruf von EndRead oder wenn der Strom vollständig ist). Der Koordinator/Schreiber sitzt in einer Schleife, die die Verbindung erstellt und dann eine Schleife von BlockingCollection<T> von zu sendenden Elementen ausführt. Unter Verwendung von BlockingCollection.GetConsumingEnumerable(token) kann der Absender abgebrochen werden, wenn der Leser einen Fehler feststellt.

private class AsyncState 
    { 
     public byte[] Buffer { get; set; } 
     public NetworkStream NetworkStream { get; set; } 
     public CancellationTokenSource CancellationTokenSource { get; set; } 
    } 

Nachdem Sie die Verbindung erstellt haben Sie den Asynchron-Lesevorgang starten kann (die sich so lange Aufruf hält wie alles funktioniert). Vorbei an den Puffer, den Strom und die CancellationTokenSource im Statusobjekt:

var buffer = new byte[1]; 
stream.BeginRead(buffer, 0, 1, Callback, 
    new AsyncState 
    { 
     Buffer = buffer, 
     NetworkStream = stream, 
     CancellationTokenSource = cts2 
    }); 

Danach Sie anfangen aus Ihrer Ausgangswarteschlange zu lesen und in den Stream zu schreiben, bis sie abgebrochen oder ein Fehler passiert:

using (var writer = new StreamWriter(stream, Encoding.ASCII, 80, true)) 
{ 
    foreach (var item in this.sendQueue.GetConsumingEnumerable(cancellationToken)) 
    { 
     ... 

. .. und im Callback können Sie nach Fehlern suchen und, falls notwendig, die CancellationTokenSource drücken, um dem Writer-Thread zu signalisieren, dass die Verbindung neu gestartet werden soll.

private void Callback(IAsyncResult ar) 
{ 
    var state = (AsyncState)ar.AsyncState; 

    if (ar.IsCompleted) 
    { 
    try 
    { 
     int bytesRead = state.NetworkStream.EndRead(ar); 
     LogState("Post read ", state.NetworkStream); 
    } 
    catch (Exception ex) 
    { 
     Log.Warn("Exception during EndRead", ex); 
     state.CancellationTokenSource.Cancel(); 
     return; 
    } 

    // Deal with the character received 
    char c = (char)state.Buffer[0]; 

    if (c < 0) 
    { 
     Log.Warn("c < 0, stream closing"); 
     state.CancellationTokenSource.Cancel(); 
     return; 
    } 

    ... deal with the character here, building up a buffer and 
    ... handing it out to the application when completed 
    ... perhaps using Rx Subject<T> to make it easy to subscribe 

    ... and finally ask for the next byte with the same Callback 

    // Launch the next reader 
    var buffer2 = new byte[1]; 
    var state2 = state.WithNewBuffer(buffer2); 
    state.NetworkStream.BeginRead(buffer2, 0, 1, Callback, state2); 
+0

Das ist die Linie, die ich gerne erkunden würde. Ich konnte mir den Code für meinen Anwendungsfall jedoch nicht vorstellen. Ich muss zwei Byte-Header lesen, der die Länge des Körpers enthält und dann den Körper lesen. Wie mache ich das? Ich habe zwar den ReadHeader, der ReadRody mit BeingRead aufruft, klingt aber so, als müsste ich den Rest des Körpers synchron in der ReadBody-Methode lesen. Wie sollte ich CancellationTokenSource verwenden? – Gatis

+0

Ich habe Code hinzugefügt, um zu zeigen, wie man mit dem Lesen von jeweils einem Byte umgeht und wie das Löschungs-Token übergeben wird und verwendet wird, um den Schreib-Thread zu signalisieren, wenn ein Lesefehler auftritt. Sie könnten damit beginnen, zwei Bytes für die anfängliche asynchrone Leseoperation zu lesen und dann den Rest in einem einzelnen Aufruf zu lesen. –

+0

Großartig. Brauche ich wirklich state2? Warum kann ich den Zustand nicht wiederverwenden? – Gatis

Verwandte Themen