2013-05-01 14 views
5

Ich habe ein Problem mit einem multi-threaded Server, den ich als akademische Übung aufbaue, genauer gesagt, um eine Verbindung zum Schließen zu erhalten.Würde einen Thread beenden, der auf eine blockierende Warteschlange wartet

Jede Verbindung wird von einer Session-Klasse verwaltet. Diese Klasse verwaltet 2 Threads für die Verbindung, einen DownstreamThread und einen UpstreamThread.

Der UpstreamThread blockiert den Client-Socket und codiert alle eingehenden Strings in Nachrichten, die an einen anderen Layer übergeben werden. Der DownstreamThread blockiert eine BlockingQueue, in die Nachrichten für den Client eingefügt werden. Wenn sich eine Nachricht in der Warteschlange befindet, nimmt der Downstream-Thread die Nachricht aus der Warteschlange, wandelt sie in eine Zeichenfolge um und sendet sie an den Client. Im letzten System wird eine Anwendungsschicht auf eingehende Nachrichten einwirken und ausgehende Nachrichten an den Server senden, um sie an den entsprechenden Client zu senden. Aber jetzt habe ich nur eine einfache Anwendung, die für eine Sekunde auf einer eingehenden Nachricht ruht, bevor sie zurückgesendet wird als ausgehende Nachricht mit einem Zeitstempel angehängt.

Das Problem, das ich habe, ist, dass die ganze Sache anmutig heruntergefahren wird, wenn der Client die Verbindung trennt. Das erste Problem, mit dem ich zu kämpfen habe, ist eine normale Trennung, bei der der Client dem Server mitteilt, dass er die Verbindung mit einem QUIT-Befehl beendet. Der grundlegende Pseudocode ist:

while (!quitting) { 
    inputString = socket.readLine() // blocks 
    if (inputString != "QUIT") { 
     // forward the message upstream 
     server.acceptMessage (inputString); 
    } else { 
     // Do cleanup 
     quitting = true; 
     socket.close(); 
    } 
} 

Die Hauptschleife des vorgelagerten Threads betrachtet die Eingabezeichenfolge. Wenn es QUIT ist, setzt der Thread ein Flag, um zu sagen, dass der Client die Kommunikation beendet hat und die Schleife beendet. Dies führt dazu, dass der Upstream-Thread gut herunterfährt.

Die Hauptschleife des Downstream-Threads wartet auf Nachrichten in der BlockingQueue, solange das Flag zum Schließen der Verbindung nicht gesetzt ist. Wenn dies der Fall ist, soll auch der Downstream-Thread enden. Allerdings nicht, es sitzt einfach nur da und wartet. Seine psuedocode sieht wie folgt aus:

while (!quitting) { 
    outputMessage = messageQueue.take(); // blocks 
    sendMessageToClient (outputMessage); 
} 

Als ich das getestet habe ich festgestellt, dass, wenn der Client zu beenden, der Upstream-Thread heruntergefahren, aber das Downstream-Thread nicht.

Nach ein bisschen Kopf kratzen, erkannte ich, dass der Downstream-Thread immer noch blockiert auf die BlockingQueue warten auf eine eingehende Nachricht, die nie kommen wird. Der Upstream-Thread leitet die QUIT-Nachricht nicht weiter in der Kette weiter.

Wie kann ich den Downstream Thread anmutig herunterfahren lassen? Die erste Idee, die mir in den Sinn kam, war eine Zeitüberschreitung beim Aufruf von take(). Ich bin jedoch nicht so begeistert von dieser Idee, denn was auch immer Sie für einen Wert auswählen, es wird bestimmt nicht ganz zufriedenstellend sein. Entweder ist es zu lang und ein Zombie-Thread sitzt lange vor dem Herunterfahren, oder es ist zu kurz und Verbindungen, die für ein paar Minuten inaktiv waren, aber immer noch gültig sind, werden getötet. Ich dachte daran, die QUIT-Nachricht über die Kette zu schicken, aber das erfordert, dass sie eine vollständige Runde zum Server macht, dann die Anwendung, dann wieder zurück zum Server und schließlich zur Sitzung. Dies scheint auch keine elegante Lösung zu sein.

Ich habe die Dokumentation für Thread.stop() angeschaut, aber das ist anscheinend veraltet, weil es sowieso nie richtig funktioniert hat, so dass es aussieht, als wäre es auch keine Option. Eine andere Idee, die ich hatte, war, eine Exception im Downstream-Thread irgendwie auszulösen und sie in ihrem letzten Block aufzuräumen, aber das erscheint mir als eine schreckliche und kluge Idee.

Ich glaube, dass beide Threads in der Lage sein sollten, selbstständig herunterzufahren, aber ich vermute auch, dass wenn ein Thread endet, muss auch der andere Thread proaktiver signalisieren, als einfach einen Flag für den anderen zu setzen Thread zu überprüfen.Da ich mit Java noch nicht sehr erfahren bin, bin ich an dieser Stelle eher von Ideen ausgeschlossen. Wenn jemand einen Rat hat, würde es sehr geschätzt werden.

Der Vollständigkeit halber habe ich den richtigen Code für die Session-Klasse unten eingefügt, obwohl ich glaube, dass die Pseudocode-Snippets die relevanten Teile des Problems abdecken. Die gesamte Klasse umfasst etwa 250 Zeilen.

import java.io.*; 
import java.net.*; 
import java.util.concurrent.*; 
import java.util.logging.*; 

/** 
* Session class 
* 
* A session manages the individual connection between a client and the server. 
* It accepts input from the client and sends output to the client over the 
* provided socket. 
* 
*/ 
public class Session { 
    private Socket    clientSocket = null; 
    private Server    server   = null; 
    private Integer    sessionId  = 0; 
    private DownstreamThread downstream  = null; 
    private UpstreamThread  upstream  = null; 
    private boolean    sessionEnding = false; 

    /** 
    * This thread handles waiting for messages from the server and sending 
    * them to the client 
    */ 
    private class DownstreamThread implements Runnable { 
     private BlockingQueue<DownstreamMessage> incomingMessages = null; 
     private OutputStreamWriter     streamWriter  = null; 
     private Session        outer    = null; 

     @Override 
     public void run() { 
      DownstreamMessage message; 
      Thread.currentThread().setName ("DownstreamThread_" + outer.getId()); 

      try { 
       // Send connect message 
       this.sendMessageToClient ("Hello, you are client " + outer.getId()); 

       while (!outer.sessionEnding) { 
        message = this.incomingMessages.take(); 
        this.sendMessageToClient (message.getPayload()); 
       } 

       // Send disconnect message 
       this.sendMessageToClient ("Goodbye, client " + getId()); 

      } catch (InterruptedException | IOException ex) { 
       Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex); 
      } finally { 
       this.terminate(); 
      } 
     } 

     /** 
     * Add a message to the downstream queue 
     * 
     * @param message 
     * @return 
     * @throws InterruptedException 
     */ 
     public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException { 
      if (!outer.sessionEnding) { 
       this.incomingMessages.put (message); 
      } 

      return this; 
     } 

     /** 
     * Send the given message to the client 
     * 
     * @param message 
     * @throws IOException 
     */ 
     private DownstreamThread sendMessageToClient (CharSequence message) throws IOException { 
      OutputStreamWriter osw; 
      // Output to client 
      if (null != (osw = this.getStreamWriter())) { 
       osw.write ((String) message); 
       osw.write ("\r\n"); 
       osw.flush(); 
      } 

      return this; 
     } 

     /** 
     * Perform session cleanup 
     * 
     * @return 
     */ 
     private DownstreamThread terminate() { 
      try { 
       this.streamWriter.close(); 
      } catch (IOException ex) { 
       Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex); 
      } 
      this.streamWriter = null; 

      return this; 
     } 

     /** 
     * Get an output stream writer, initialize it if it's not active 
     * 
     * @return A configured OutputStreamWriter object 
     * @throws IOException 
     */ 
     private OutputStreamWriter getStreamWriter() throws IOException { 
      if ((null == this.streamWriter) 
      && (!outer.sessionEnding)) { 
       BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream()); 
       this.streamWriter  = new OutputStreamWriter (os, "UTF8"); 
      } 

      return this.streamWriter; 
     } 

     /** 
     * 
     * @param outer 
     */ 
     public DownstreamThread (Session outer) { 
      this.outer    = outer; 
      this.incomingMessages = new LinkedBlockingQueue(); 
      System.out.println ("Class " + this.getClass() + " created"); 
     } 
    } 

    /** 
    * This thread handles waiting for client input and sending it upstream 
    */ 
    private class UpstreamThread implements Runnable { 
     private Session outer = null; 

     @Override 
     public void run() { 
      StringBuffer inputBuffer = new StringBuffer(); 
      BufferedReader inReader; 

      Thread.currentThread().setName ("UpstreamThread_" + outer.getId()); 

      try { 
       inReader = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream(), "UTF8")); 

       while (!outer.sessionEnding) { 
        // Read whatever was in the input buffer 
        inputBuffer.delete (0, inputBuffer.length()); 
        inputBuffer.append (inReader.readLine()); 
        System.out.println ("Input message was: " + inputBuffer); 

        if (!inputBuffer.toString().equals ("QUIT")) { 
         // Forward the message up the chain to the Server 
         outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString())); 
        } else { 
         // End the session 
         outer.sessionEnding = true; 
        } 
       } 

      } catch (IOException | InterruptedException e) { 
       Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e); 
      } finally { 
       outer.terminate(); 
       outer.server.deleteSession (outer.getId()); 
      } 
     } 

     /** 
     * Class constructor 
     * 
     * The Core Java volume 1 book said that a constructor such as this 
     * should be implicitly created, but that doesn't seem to be the case! 
     * 
     * @param outer 
     */ 
     public UpstreamThread (Session outer) { 
      this.outer = outer; 
      System.out.println ("Class " + this.getClass() + " created"); 
     } 
    } 

    /** 
    * Start the session threads 
    */ 
    public void run() //throws InterruptedException 
    { 
     Thread upThread  = new Thread (this.upstream); 
     Thread downThread = new Thread (this.downstream); 

     upThread.start(); 
     downThread.start(); 
    } 

    /** 
    * Accept a message to send to the client 
    * 
    * @param message 
    * @return 
    * @throws InterruptedException 
    */ 
    public Session acceptMessage (DownstreamMessage message) throws InterruptedException { 
     this.downstream.acceptMessage (message); 
     return this; 
    } 

    /** 
    * Accept a message to send to the client 
    * 
    * @param message 
    * @return 
    * @throws InterruptedException 
    */ 
    public Session acceptMessage (String message) throws InterruptedException { 
     return this.acceptMessage (new DownstreamMessage (this.getId(), message)); 
    } 

    /** 
    * Terminate the client connection 
    */ 
    private void terminate() { 
     try { 
      this.clientSocket.close(); 
     } catch (IOException e) { 
      Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e); 
     } 
    } 

    /** 
    * Get this Session's ID 
    * 
    * @return The ID of this session 
    */ 
    public Integer getId() { 
     return this.sessionId; 
    } 

    /** 
    * Session constructor 
    * 
    * @param owner The Server object that owns this session 
    * @param sessionId The unique ID this session will be given 
    * @throws IOException 
    */ 
    public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException { 

     this.server   = owner; 
     this.clientSocket = clientSocket; 
     this.sessionId  = sessionId; 
     this.upstream  = new UpstreamThread (this); 
     this.downstream  = new DownstreamThread (this); 

     System.out.println ("Class " + this.getClass() + " created"); 
     System.out.println ("Session ID is " + this.sessionId); 
    } 
} 

Antwort

3

Statt Thread.stop Verwendung Thread.interrupt aufzurufen. Das wird die Methode take verursachen, eine InterruptedException zu werfen, die Sie verwenden können, um zu wissen, dass Sie herunterfahren sollten.

+1

ich daran dachte, aber zwingt nicht etwas eine Ausnahme als Teil der normalen Abschaltung ein bisschen eine Flickschusterei zu werfen? – GordonM

+0

Nein, so funktioniert es. Andernfalls müsste jede Methode, die unterbrochen werden könnte, einen speziellen unterbrochenen Wert zurückgeben. Weitere Informationen finden Sie unter http://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html. – Brigham

+0

Ich weiß nicht viel über Java, aber was passiert, wenn Sie 'Thread.interrupt' aufrufen und wir' 'take' 'Methode nicht blockieren? Ich denke, Unterbrechung wird nicht ausgelöst, und wir müssen überprüfen "Thread.isInterrupted" ich denke. – Zuljin

0

Können Sie einfach "Fake" beenden Nachricht anstelle von outer.sessionEnding auf True, wenn "QUIT" erscheint. Wenn Sie diese gefälschte Nachricht in die Warteschlange stellen, wird der DownstreamThread aktiviert und Sie können ihn beenden. In diesem Fall können Sie diese sessionEnding-Variable sogar eliminieren.

In Pseudo-Code könnte dies wie folgt aussehen:

while (true) { 
    outputMessage = messageQueue.take(); // blocks 
    if (QUIT == outputMessage) 
     break 
    sendMessageToClient (outputMessage); 
} 
Verwandte Themen