2017-07-19 4 views
0

Ich muss eine JAVA Nio Server Anwendung in JBoss erstellen, um Daten von einer 10-200 Sensor Box zu lesen. Sie öffnen einen Stream und senden mir ständig Daten. Die Kommunikation ist bidirektional. Nun kann es manchmal vorkommen, dass diese Boxen (oder der Server) einen internen Fehler haben. Um diese Art von Problemen zu erkennen, prüft ein Observer-Thread alle 5 Sekunden, ob ein Datenblock seit der letzten Überprüfung eingetroffen ist. Wenn bis dahin keine meiner Boxen Daten gesendet hat, ist etwas Schlimmes passiert und ich möchte die gesamte Socket-Kommunikation neu starten.JAVA NIO Server: Wie alle Verbindungen zurückgesetzt werden

Jetzt ist es gut dokumentiert, wie man eine Socket-Verbindung mit NIO aufbauen kann, aber es ist schwieriger, komplexe Beispiele zu finden, wie man sie zurücksetzt. Und hier ist mein Problem: Wenn mein Watchdog erkennt, dass in den letzten 5 Sekunden keine Daten gekommen sind, ruft er close() und dann startEngine() auf. Aber danach kommen noch keine Daten an. Etwas scheint blockiert zu sein, einige Ressourcen sind noch assoziiert oder ähnlich. Wenn ich meinen JBoss neu starte, kommen die Daten erneut an. Kann mir jemand einen Hinweis geben?

ich danke Ihnen für Ihre Zeit! Stefan

public class TestServer 
{ 
    private NIOServer server; 
    private HashMap<String, SocketChannel> clientsList = new HashMap<String, SocketChannel>(); 

    class NIOServer extends Thread 
    { 
     class MessageBuffer 
     { 
       int [] msgAsByte = new int[msgSize]; 
       int pos = 0; 
       int lastSign = 0;          
       int bytesRead = 0; 
     } 
     private ByteBuffer readBuffer = ByteBuffer.allocate(256); 
     private Selector selector; 
     private boolean stop = false; 
     private int[] ports; 
     private int msgSize = 48; 
     private HashMap<String,MessageBuffer> buffer = new HashMap<String, MessageBuffer>(); 

     private List<ServerSocketChannel> channels; 
     // Maps a SocketChannel to a list of ByteBuffer instances 
     private Map<SocketChannel, List<ByteBuffer>> pendingDataToWrite = new HashMap<SocketChannel, List<ByteBuffer>>(); 

     public NIOServer(int[] ports) { 
       this.ports = ports; 
     } 

     private void stopAll() 
     { 
       stop = true; 

       try 
       { 
        server.interrupt(); 
        server.join(3000); 
       } 
       catch (InterruptedException e) { 
        Thread.currentThread().interrupt(); 
       } 
       closeConnections(); 
     } 

     public void sendData(SocketChannel socket, byte[] data) 
     { 
       // And queue the data we want written 
       synchronized (this.pendingDataToWrite) { 
        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingDataToWrite.get(socket); 
        if (queue == null) { 
          queue = new ArrayList<ByteBuffer>(); 
          this.pendingDataToWrite.put(socket, queue); 
        } 
        queue.add(ByteBuffer.wrap(data)); 
       } 

       SelectionKey key = socket.keyFor(this.selector); 
       if(key != null) 
        key.interestOps(SelectionKey.OP_WRITE); 
       // Finally, wake up our selecting thread so it can make the required changes 
       this.selector.wakeup(); 
     } 

     public void run() 
     { 
       try 
       { 
        stop = false; 
        selector = Selector.open(); 
        channels = new ArrayList<ServerSocketChannel>(); 
        ServerSocketChannel serverchannel; 
        for (int port : ports) 
        { 
          try 
          { 
           serverchannel = ServerSocketChannel.open(); 
           serverchannel.configureBlocking(false); 
           try 
           { 
             serverchannel.socket().setReuseAddress(true); 
           } 
           catch(SocketException se) 
           { 
             // 
           } 
           serverchannel.socket().bind(new InetSocketAddress(port)); 
           serverchannel.register(selector, SelectionKey.OP_ACCEPT); 
           channels.add(serverchannel); 
          } 
          catch(Exception e) 
          { 
           // 
          } 
        } 
        while (!stop) 
        { 

          SelectionKey key = null; 
          try 
          { 
           selector.select(); 
           Iterator<SelectionKey> keysIterator = selector.selectedKeys() 
              .iterator(); 
           while (keysIterator.hasNext()) 
           { 
             key = keysIterator.next(); 

             if(key.isValid()) 
             { 
              if (key.isAcceptable()) 
              { 
                accept(key); 
              } 
              else if (key.isReadable()) 
              { 
                readData(key); 
              } 
              else if (key.isWritable()) 
              { 
                writeData(key); 
              } 
             } 
             else 
             { 
              SocketChannel sc = (SocketChannel) key.channel(); 
             } 
             keysIterator.remove(); 
           } 
          } 
          catch (Exception e) 
          { 
           if(e instanceof IOException || e instanceof ClosedSelectorException) 
           { 
             try 
             { 
              ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); 
              channels.remove(ssc); 
              ssc.close(); 
              key.cancel(); 
             } 
             catch(Exception ex) 
             { 
              // 
             } 

           } 
           else 
           { 
             // 
           } 
          } 
        } 
       } 
       catch(Exception e1) 
       { 
        // 
       } 

       closeConnections(); 

     } 

     private void closeConnections() 
     { 
       //if thread is stopped, close all 
       try 
       { 
        try 
        { 
          if(this.selector == null || this.selector.keys() == null) 
          { 
           log.debug("No selectors or keys found to close"); 
          } 
          else 
          { 
           Iterator<SelectionKey> keys = this.selector.keys().iterator(); 
           while(keys.hasNext()) 
           { 
             SelectionKey key = keys.next(); 
             key.cancel(); 
           } 
          } 
        } 
        catch(Exception ex) { 
          // 
        } 
        if(selector != null) 
          selector.close(); 
        if(channels != null) 
        { 
          for(ServerSocketChannel channel:channels) 
          { 
           channel.socket().close(); 
           channel.close(); 
          } 
        } 

        if(clientsList != null) 
        { 
          Iterator<Map.Entry<String, SocketChannel>> hfm = clientsList.entrySet().iterator(); 
          while(hfm.hasNext()) 
          { 
           Map.Entry<String, SocketChannel> s = hfm.next(); 
           s.getValue().close(); 
          } 
        } 
        clientsList=null; 

        selector = null; 
        channels = null; 
        pendingDataToWrite = null; 
       } 
       catch(Exception e) 
       { 
        // 
       } 

     } 

     private void accept(SelectionKey key) throws IOException 
     { 

       ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); 
       SocketChannel sc = ssc.accept(); 
       sc.configureBlocking(false); 
       sc.register(selector, SelectionKey.OP_READ); 

       String ip = sc.socket().getRemoteSocketAddress().toString(); 
       if(!buffer.containsKey(ip)) 
        buffer.put(ip, new MessageBuffer()); 
     } 

     private void readData(SelectionKey key) throws Exception 
     { 

       SocketChannel sc = (SocketChannel) key.channel();  

       MessageBuffer buf = buffer.get(sc.socket().getRemoteSocketAddress().toString()); 
       try 
       { 
        buf.bytesRead = sc.read(readBuffer); //read into buffer. 
       } 
       catch(Exception e2) 
       { 
        sc.close(); 
        buffer.remove(sc); 
       } 

       //close connection 
       if (buf.bytesRead == -1) 
       { 
        sc.close(); 
        key.cancel(); 
        return; 
       } 

       readBuffer.flip();  //make buffer ready for read 

       while(readBuffer.hasRemaining()) 
       { 
        //Read the data and forward it to another Process... 
       } 

       readBuffer.compact(); //make buffer ready for writing 

     } 

     private void writeData(SelectionKey key) throws Exception 
     { 
       SocketChannel socketChannel = (SocketChannel) key.channel(); 
       synchronized (this.pendingDataToWrite) { 
        List queue = (List) this.pendingDataToWrite.get(socketChannel); 

        // Write until there's not more data ... 
        while (!queue.isEmpty()) { 
          ByteBuffer buf = (ByteBuffer) queue.get(0); 
          try 
          { 
           socketChannel.write(buf); 
          } 
          catch(Exception e) 
          { 
           // 
          } 
          finally 
          { 
           queue.remove(0); 
          } 
          if (buf.remaining() > 0) { 
           // ... or the socket's buffer fills up 
           break; 
          } 
        } 

        key.interestOps(SelectionKey.OP_READ); 
       } 
     } 
    } 



    public void close() { 

     if (server != null && server.isAlive()) 
     {  
        server.stopAll(); 
     } 
     if(clientsList != null) 
     { 
       clientsList.clear(); 
     } 
     server = null; 

    } 

    public void startEngine(int[] ports) { 
     if (ports != null) { 
       for (int port : ports) 
        log.info("Listening on port " + port); 
       server= new NIOServer(ports); 
       server.start(); 
     } 
    } 

} 

Antwort

1

Verwenden eines select() Timeout.

Wenn die Zeitüberschreitung eintritt, schließen Sie alle registrierten SocketChannels.

Wenn Sie detailliertere Informationen erhalten möchten, behalten Sie die letzte E/A-Zeit auf jedem Kanal im Auge und schließen Sie diejenigen, die am Ende jeder select()-Schleife abgelaufen sind.

NB Ihre Technik für OP_WRITE ist nicht korrekt. Es gibt viele Antworten hier, die zeigen, wie man es richtig benutzt.

+0

Vielen Dank für Ihre Eingabe, ich nehme an, dass Sie Ihre Meinung zu Themen wie diese https://stackoverflow.com/questions/17556901/java-high-load-nio-tcp-server bedeuten? Das heißt, dass ich schreiben sollte, wenn ich schreiben möchte und nur wenn diese Operation Null zurückgibt, das OP_WRITE registrieren. Richtig? Die select + timeout hilft mir zu erkennen, wenn keine neuen Daten ankommen, aber es hat mein Problem nicht behoben, dass nach dem Aufruf von close() und restartEngine() ich noch keine neuen Daten bekomme ... – user3354754

+0

Sie nicht Holen Sie alles, bis die Clients wieder verbinden. – EJP

+0

gut, wenn ich meinen jboss neu starte sie verbinden und alles funktioniert gut. Aber wenn ich nur die Steckdosen schließe, tue es nicht. Gibt es eine Möglichkeit, sie zu zwingen, wieder zu verbinden – user3354754