2011-01-12 18 views
25

Ich habe einen Hörer:Multi-Threading mit .Net Httplistener

listener = new HttpListener(); 
listener.Prefixes.Add(@"http://+:8077/"); 
listener.Start(); 
listenerThread = new Thread(HandleRequests); 
listenerThread.Start(); 

Und ich bin Umgang mit Anfragen:

private void HandleRequests() 
{ 
    while (listener.IsListening) 
    { 
     var context = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener); 
     context.AsyncWaitHandle.WaitOne(); 
    } 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    var listener = ar.AsyncState as HttpListener; 

    var context = listener.EndGetContext(ar); 

    //do some stuff 
} 

Ich möchte void Stop() so schreiben, dass:

  1. Es wird blockiert, bis alle aktuell bearbeiteten Anfragen enden (dh auf alle Threads warten, "um etwas zu tun").
  2. Während auf bereits gestartete Anfragen gewartet wird, werden keine weiteren Anfragen mehr erlaubt (zB Rückkehr zu Beginn von ListenerCallback).
  3. Danach wird es anrufen listener.Stop() (listener.IsListening wurde falsch).

Wie könnte es schreiben?

EDIT: Was denken Sie über diese Lösung? Ist es sicher?

public void Stop() 
{ 
    lock (this) 
    { 
     isStopping = true; 
    } 
    resetEvent.WaitOne(); //initially set to true 
    listener.Stop(); 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    lock (this) 
    { 
     if (isStopping) 
      return; 

     resetEvent.Reset(); 
     numberOfRequests++; 
    } 

    var listener = ar.AsyncState as HttpListener; 

    var context = listener.EndGetContext(ar); 

    //do some stuff 

    lock (this) 
    { 
     if (--numberOfRequests == 0) 
      resetEvent.Set(); 
    } 
} 

Antwort

2

Ich habe meinen Code in EDIT konsultiert Teil meiner Frage und ich habe mich entschieden, es mit einigen Änderungen zu akzeptieren:

public void Stop() 
{ 
    lock (locker) 
    { 
     isStopping = true; 
    } 
    resetEvent.WaitOne(); //initially set to true 
    listener.Stop(); 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    lock (locker) //locking on this is a bad idea, but I forget about it before 
    { 
     if (isStopping) 
      return; 

     resetEvent.Reset(); 
     numberOfRequests++; 
    } 

    try 
    { 
     var listener = ar.AsyncState as HttpListener; 

     var context = listener.EndGetContext(ar); 

     //do some stuff 
    } 
    finally //to make sure that bellow code will be executed 
    { 
     lock (locker) 
     { 
      if (--numberOfRequests == 0) 
       resetEvent.Set(); 
     } 
    } 
} 
0

Rufen Sie einfach listener.Stop() sollte den Trick tun. Dies wird keine Verbindungen beenden, die bereits eingerichtet wurden, aber neue Verbindungen verhindern.

+1

Dies ist nicht wahr. Wenn Sie 'listener.Stop()' während der Ausführung von 'ListenerCallback' aufrufen, erhalten Sie eine Ausnahme, z. beim Aufruf von 'EndGetContext' oder sogar später, wenn Ausgabestrom verwendet wird. Ich kann die Ausnahmen natürlich abfangen, aber das würde ich lieber nicht tun. – prostynick

+0

In meinem Code benutze ich ein Flag und nicht mehr auf den Listener, nachdem ich Stop aufgerufen habe, aber das Schließen des Listeners schließt nicht die bereits akzeptierten Verbindungen, nur den Listener. –

+0

Ich weiß nicht, was meinst du mit "Ich benutze eine Flagge". Das Problem ist, dass ich in "ListenerCallback" Listener verwende und wenn ein anderer Thread es schließt, während ich es benutze, werde ich mit Ausnahmen enden, die ich erwähnt habe. – prostynick

4

Nun gibt es mehrere Möglichkeiten, dies zu lösen ... Dies ist ein einfaches Beispiel, das ein Semaphor verwendet, um laufende Arbeit zu verfolgen, und ein Signal, das ausgelöst wird, wenn alle Arbeiter fertig sind. Dies sollte Ihnen eine Grundidee geben, von der Sie arbeiten können.

Die folgende Lösung ist nicht ideal, idealerweise sollten wir den Semaphor vor dem Aufruf von BeginGetContext erwerben. Das macht das Herunterfahren schwieriger, daher habe ich mich für diesen vereinfachten Ansatz entschieden. Wenn ich das für 'echt' machen würde, würde ich wahrscheinlich mein eigenes Thread-Management schreiben, anstatt mich auf den ThreadPool zu verlassen. Dies würde eine zuverlässigere Abschaltung ermöglichen.

Auf jeden Fall ist hier das komplette Beispiel:

class TestHttp 
{ 
    static void Main() 
    { 
     using (HttpServer srvr = new HttpServer(5)) 
     { 
      srvr.Start(8085); 
      Console.WriteLine("Press [Enter] to quit."); 
      Console.ReadLine(); 
     } 
    } 
} 


class HttpServer : IDisposable 
{ 
    private readonly int _maxThreads; 
    private readonly HttpListener _listener; 
    private readonly Thread _listenerThread; 
    private readonly ManualResetEvent _stop, _idle; 
    private readonly Semaphore _busy; 

    public HttpServer(int maxThreads) 
    { 
     _maxThreads = maxThreads; 
     _stop = new ManualResetEvent(false); 
     _idle = new ManualResetEvent(false); 
     _busy = new Semaphore(maxThreads, maxThreads); 
     _listener = new HttpListener(); 
     _listenerThread = new Thread(HandleRequests); 
    } 

    public void Start(int port) 
    { 
     _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port)); 
     _listener.Start(); 
     _listenerThread.Start(); 
    } 

    public void Dispose() 
    { Stop(); } 

    public void Stop() 
    { 
     _stop.Set(); 
     _listenerThread.Join(); 
     _idle.Reset(); 

     //aquire and release the semaphore to see if anyone is running, wait for idle if they are. 
     _busy.WaitOne(); 
     if(_maxThreads != 1 + _busy.Release()) 
      _idle.WaitOne(); 

     _listener.Stop(); 
    } 

    private void HandleRequests() 
    { 
     while (_listener.IsListening) 
     { 
      var context = _listener.BeginGetContext(ListenerCallback, null); 

      if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) 
       return; 
     } 
    } 

    private void ListenerCallback(IAsyncResult ar) 
    { 
     _busy.WaitOne(); 
     try 
     { 
      HttpListenerContext context; 
      try 
      { context = _listener.EndGetContext(ar); } 
      catch (HttpListenerException) 
      { return; } 

      if (_stop.WaitOne(0, false)) 
       return; 

      Console.WriteLine("{0} {1}", context.Request.HttpMethod, context.Request.RawUrl); 
      context.Response.SendChunked = true; 
      using (TextWriter tw = new StreamWriter(context.Response.OutputStream)) 
      { 
       tw.WriteLine("<html><body><h1>Hello World</h1>"); 
       for (int i = 0; i < 5; i++) 
       { 
        tw.WriteLine("<p>{0} @ {1}</p>", i, DateTime.Now); 
        tw.Flush(); 
        Thread.Sleep(1000); 
       } 
       tw.WriteLine("</body></html>"); 
      } 
     } 
     finally 
     { 
      if (_maxThreads == 1 + _busy.Release()) 
       _idle.Set(); 
     } 
    } 
} 
56

Für Vollständigkeit, hier ist, wie es aussehen würde, wenn Sie Ihre eigenen Worker-Threads zu verwalten:

class HttpServer : IDisposable 
{ 
    private readonly HttpListener _listener; 
    private readonly Thread _listenerThread; 
    private readonly Thread[] _workers; 
    private readonly ManualResetEvent _stop, _ready; 
    private Queue<HttpListenerContext> _queue; 

    public HttpServer(int maxThreads) 
    { 
     _workers = new Thread[maxThreads]; 
     _queue = new Queue<HttpListenerContext>(); 
     _stop = new ManualResetEvent(false); 
     _ready = new ManualResetEvent(false); 
     _listener = new HttpListener(); 
     _listenerThread = new Thread(HandleRequests); 
    } 

    public void Start(int port) 
    { 
     _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port)); 
     _listener.Start(); 
     _listenerThread.Start(); 

     for (int i = 0; i < _workers.Length; i++) 
     { 
      _workers[i] = new Thread(Worker); 
      _workers[i].Start(); 
     } 
    } 

    public void Dispose() 
    { Stop(); } 

    public void Stop() 
    { 
     _stop.Set(); 
     _listenerThread.Join(); 
     foreach (Thread worker in _workers) 
      worker.Join(); 
     _listener.Stop(); 
    } 

    private void HandleRequests() 
    { 
     while (_listener.IsListening) 
     { 
      var context = _listener.BeginGetContext(ContextReady, null); 

      if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) 
       return; 
     } 
    } 

    private void ContextReady(IAsyncResult ar) 
    { 
     try 
     { 
      lock (_queue) 
      { 
       _queue.Enqueue(_listener.EndGetContext(ar)); 
       _ready.Set(); 
      } 
     } 
     catch { return; } 
    } 

    private void Worker() 
    { 
     WaitHandle[] wait = new[] { _ready, _stop }; 
     while (0 == WaitHandle.WaitAny(wait)) 
     { 
      HttpListenerContext context; 
      lock (_queue) 
      { 
       if (_queue.Count > 0) 
        context = _queue.Dequeue(); 
       else 
       { 
        _ready.Reset(); 
        continue; 
       } 
      } 

      try { ProcessRequest(context); } 
      catch (Exception e) { Console.Error.WriteLine(e); } 
     } 
    } 

    public event Action<HttpListenerContext> ProcessRequest; 
} 
+0

Das ist genial - es dient als ein guter Kandidat, um HttpListener Durchsatz gegen zu testen. – Jonno

+0

Vielen Dank für dieses Stück Code! Es gibt zwei kleine Probleme: 1. ProcessRequest könnte null sein 2. HttpListenerContext ist nicht threadsafe, es sei denn, es ist statisch –

+0

@MartinMeeser danke für den Kommentar. für 1. anstatt es in try catch block zu wickeln, könnten wir diese 'ProcessRequest? .Invoke (context);'. Für 2. wenn statische ist keine Option, was empfehlen Sie? – JohnTube

0

Dies verwendet die Warteschlange mit blockierender Sammlung, um Anforderungen zu bearbeiten. Es ist nutzbar wie es ist. Sie sollten eine Klasse von dieser ableiten und Response überschreiben.

using System; 
using System.Collections.Concurrent; 
using System.Net; 
using System.Text; 
using System.Threading; 

namespace Service 
{ 
    class HttpServer : IDisposable 
    { 
     private HttpListener httpListener; 
     private Thread listenerLoop; 
     private Thread[] requestProcessors; 
     private BlockingCollection<HttpListenerContext> messages; 

     public HttpServer(int threadCount) 
     { 
      requestProcessors = new Thread[threadCount]; 
      messages = new BlockingCollection<HttpListenerContext>(); 
      httpListener = new HttpListener(); 
     } 

     public virtual int Port { get; set; } = 80; 

     public virtual string[] Prefixes 
     { 
      get { return new string[] {string.Format(@"http://+:{0}/", Port)}; } 
     } 

     public void Start(int port) 
     { 
      listenerLoop = new Thread(HandleRequests); 

      foreach(string prefix in Prefixes) httpListener.Prefixes.Add(prefix); 

      listenerLoop.Start(); 

      for (int i = 0; i < requestProcessors.Length; i++) 
      { 
       requestProcessors[i] = StartProcessor(i, messages); 
      } 
     } 

     public void Dispose() { Stop(); } 

     public void Stop() 
     { 
      messages.CompleteAdding(); 

      foreach (Thread worker in requestProcessors) worker.Join(); 

      httpListener.Stop(); 
      listenerLoop.Join(); 
     } 

     private void HandleRequests() 
     { 
      httpListener.Start(); 
      try 
      { 
       while (httpListener.IsListening) 
       { 
        Console.WriteLine("The Linstener Is Listening!"); 
        HttpListenerContext context = httpListener.GetContext(); 

        messages.Add(context); 
        Console.WriteLine("The Linstener has added a message!"); 
       } 
      } 
      catch(Exception e) 
      { 
       Console.WriteLine (e.Message); 
      } 
     } 

     private Thread StartProcessor(int number, BlockingCollection<HttpListenerContext> messages) 
     { 
      Thread thread = new Thread(() => Processor(number, messages)); 
      thread.Start(); 
      return thread; 
     } 

     private void Processor(int number, BlockingCollection<HttpListenerContext> messages) 
     { 
      Console.WriteLine ("Processor {0} started.", number); 
      try 
      { 
       for (;;) 
       { 
        Console.WriteLine ("Processor {0} awoken.", number); 
        HttpListenerContext context = messages.Take(); 
        Console.WriteLine ("Processor {0} dequeued message.", number); 
        Response (context); 
       } 
      } catch { } 

      Console.WriteLine ("Processor {0} terminated.", number); 
     } 

     public virtual void Response(HttpListenerContext context) 
     { 
      SendReply(context, new StringBuilder("<html><head><title>NULL</title></head><body>This site not yet implementd.</body></html>")); 
     } 

     public static void SendReply(HttpListenerContext context, StringBuilder responseString) 
     { 
      byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString.ToString()); 
      context.Response.ContentLength64 = buffer.Length; 
      System.IO.Stream output = context.Response.OutputStream; 
      output.Write(buffer, 0, buffer.Length); 
      output.Close(); 
     } 
    } 
} 

Dies ist ein Beispiel für die Verwendung. Keine Notwendigkeit, Ereignisse oder Sperrblöcke zu verwenden. Die BlockingCollection löst all diese Probleme.

using System; 
using System.Collections.Concurrent; 
using System.IO; 
using System.Net; 
using System.Text; 
using System.Threading; 

namespace Service 
{ 
    class Server 
    { 
    public static void Main (string[] args) 
    { 
     HttpServer Service = new QuizzServer (8); 
     Service.Start (80); 
     for (bool coninute = true; coninute ;) 
     { 
      string input = Console.ReadLine().ToLower(); 
      switch (input) 
      { 
       case "stop": 
        Console.WriteLine ("Stop command accepted."); 
        Service.Stop(); 
        coninute = false; 
        break; 
       default: 
        Console.WriteLine ("Unknown Command: '{0}'.",input); 
        break; 
      } 
     } 
    } 
    } 
}