2010-05-06 12 views
12

Ich habe ein Stream-Objekt, das gelegentlich Daten in unvorhersehbaren Intervallen erhält. Nachrichten, die im Stream angezeigt werden, sind gut definiert und deklarieren die Größe ihrer Nutzlast im Voraus (die Größe ist eine 16-Bit-Ganzzahl, die in den ersten zwei Bytes jeder Nachricht enthalten ist).Kontinuierlich aus einem Stream lesen?

Ich hätte gerne eine StreamWatcher-Klasse, die erkennt, wenn der Stream Daten enthält. Sobald dies der Fall ist, möchte ich, dass ein Ereignis ausgelöst wird, damit eine abonnierte StreamProcessor-Instanz die neue Nachricht verarbeiten kann.

Kann dies mit C# -Ereignissen geschehen, ohne Threads direkt zu verwenden? Es scheint, als sollte es einfach sein, aber ich komme nicht richtig dazu, den richtigen Weg zu finden.

Antwort

0

Ja, das kann gemacht werden. Verwenden Sie die nicht blockierende Methode Stream.BeginRead mit einer AsyncCallback. Der Rückruf wird asynchron aufgerufen, wenn Daten verfügbar werden. Rufen Sie im Rückruf Stream.EndRead an, um die Daten abzurufen, und rufen Sie erneut Stream.BeginRead auf, um den nächsten Datenblock zu erhalten. Buffer eingehende Daten in einem Byte-Array, das groß genug ist, um die Nachricht zu halten. Sobald das Byte-Array voll ist (möglicherweise sind mehrere Callback-Aufrufe erforderlich), wird das Ereignis ausgelöst. Lesen Sie dann die nächste Nachrichtengröße, erstellen Sie einen neuen Puffer, wiederholen Sie, fertig.

+0

Er sagte ohne Verwendung von Threads !!! =) – Nayan

+0

@Nayan: Asynchrony * ist * Multithreading. Dieses Problem ist inhärent eines von Multithreading. Ich vermute, was das OP bedeutet, dass er die Threads nicht explizit selbst erstellen will. –

+0

@Nayan: Ich nehme an, das OP sucht nach einer Lösung, die nicht explizit einen neuen Thread erstellt und die blockierende Read-Methode verwendet, sondern für eine nicht blockierende asynchrone Lösung. Sie können keine Asynchronität mit keinen Threads haben. – dtb

1

Der normale Ansatz ist die Verwendung der .NET asynchronous programming pattern ausgesetzt von Stream. Im Wesentlichen beginnen Sie asynchron zu lesen, indem Sie Stream.BeginRead aufrufen, einen byte[] Puffer übergeben und eine Rückrufmethode aufrufen, die aufgerufen wird, wenn Daten aus dem Stream gelesen wurden. In der Callback-Methode rufen Sie Stream.EndRead auf und übergeben es das IAsncResult Argument, das Ihrem Rückruf übergeben wurde. Der Rückgabewert von EndRead sagt Ihnen, wie viele Bytes in den Puffer gelesen wurden.

Sobald Sie die ersten paar Bytes auf diese Weise erhalten haben, können Sie dann auf den Rest der Nachricht warten (wenn Sie beim ersten Mal nicht genügend Daten erhalten haben), indem Sie erneut BeginRead aufrufen. Sobald Sie die gesamte Nachricht erhalten haben, können Sie das Ereignis auslösen.

11

Wenn Sie sagen, nicht Threads verwenden direkt, ich nehme an, Sie sie noch verwenden möchten indirekt über Asynchron-Anrufe, sonst wäre dies nicht sehr nützlich sein.

Sie müssen nur die asynchronen Methoden der Stream umbrechen und das Ergebnis in einem Puffer speichern. Lassen Sie uns zunächst das Ereignis Teil der Spezifikation definieren:

public delegate void MessageAvailableEventHandler(object sender, 
    MessageAvailableEventArgs e); 

public class MessageAvailableEventArgs : EventArgs 
{ 
    public MessageAvailableEventArgs(int messageSize) : base() 
    { 
     this.MessageSize = messageSize; 
    } 

    public int MessageSize { get; private set; } 
} 

Jetzt, ein 16-Bit-Integer aus dem Stream lesen asynchron und berichten, wenn es fertig ist:

public class StreamWatcher 
{ 
    private readonly Stream stream; 

    private byte[] sizeBuffer = new byte[2]; 

    public StreamWatcher(Stream stream) 
    { 
     if (stream == null) 
      throw new ArgumentNullException("stream"); 
     this.stream = stream; 
     WatchNext(); 
    } 

    protected void OnMessageAvailable(MessageAvailableEventArgs e) 
    { 
     var handler = MessageAvailable; 
     if (handler != null) 
      handler(this, e); 
    } 

    protected void WatchNext() 
    { 
     stream.BeginRead(sizeBuffer, 0, 2, new AsyncCallback(ReadCallback), 
      null); 
    } 

    private void ReadCallback(IAsyncResult ar) 
    { 
     int bytesRead = stream.EndRead(ar); 
     if (bytesRead != 2) 
      throw new InvalidOperationException("Invalid message header."); 
     int messageSize = sizeBuffer[1] << 8 + sizeBuffer[0]; 
     OnMessageAvailable(new MessageAvailableEventArgs(messageSize)); 
     WatchNext(); 
    } 

    public event MessageAvailableEventHandler MessageAvailable; 
} 

Ich denke, dass es sich handelt. Dies setzt voraus, dass die Klasse, die die Nachricht verarbeitet, auch Zugriff auf Stream hat und bereit ist, sie synchron oder asynchron basierend auf der Nachrichtengröße im Ereignis zu lesen. Wenn Sie möchten, dass die Watcher-Klasse tatsächlich die gesamte Nachricht liest, müssen Sie etwas mehr Code hinzufügen.

+0

+1 für die Implementierung, die meine Theorie von BeginRead beweist, ist nicht die richtige Lösung. Ihre Implementierung hat diesen Zweifel beseitigt. – Nayan

+0

Glorreich! Das sieht so aus, als würde es den Trick machen, und ich werde es morgen ausprobieren. –

+0

+1. Nitpicking: AFAIK a Stream.Read darf weniger als Count Bytes zurückgeben, solange es mindestens ein Byte zurückgibt (wenn das Ende nicht erreicht wird). Wenn also '(bytesRead! = 2)', sollten Sie keine Ausnahme auslösen, sondern BeginRead erneut, bis zwei Bytes gelesen wurden. – dtb

1

Verwendet Stream.BeginRead() nicht wie die synchrone Stream.Read() Methode in einem separaten Thread?