2017-07-06 4 views
0

Problembeschreibung gewaltsam geschlossen:Netty (netty-all-4.1.12.Final.jar) java.io.IOException: Eine bestehende Verbindung wurde von der Remote-Host

Mein Programm erstellt einen Netty Server und Client , dann macht es 2^17 Verbindungen zu diesem Server, irgendwann beginnt der Client diese Ausnahme zu empfangen:

java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta.

Der Englisch entspricht:

java.io.IOException: An existing connection was forcibly closed by the remote host

Offensichtlich ist es nicht erwünscht, dass Server zwang bestehende Verbindungen schließt.

Schritte zum Reproduzieren:

Für die Bequemlichkeit des jemand bereit, dieses Problem, das ich dieses „Single runnable Java-Datei“ Programm, das es wieder erstellt habe zu reproduzieren, ist es nur die netty-all-4.1.12.Final.jar Abhängigkeit benötigt. Es startet netty Server auf einem freien Port, erstellt dann einen Client, führt Anforderungen aus, wartet ein bisschen, um dem Server die Möglichkeit zu geben, die Anforderungen zu verarbeiten, und druckt dann Statistiken darüber, wie viele Verbindungen hergestellt wurden, wie viele Verbindungen der Server verarbeitet hat und wie viele Verbindungen vorhanden waren verloren, wie viele und welche Art von Ausnahmen haben Server aufgetreten, und wie viele und welche Art von Ausnahmen hat Client angetroffen.

package netty.exception.tst; 

import java.io.PrintWriter; 
import java.io.StringWriter; 
import java.net.InetSocketAddress; 
import java.util.Collections; 
import java.util.Map; 
import java.util.Map.Entry; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.stream.Collectors; 

import io.netty.bootstrap.Bootstrap; 
import io.netty.bootstrap.ServerBootstrap; 
import io.netty.buffer.ByteBuf; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelFutureListener; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundHandlerAdapter; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import io.netty.channel.socket.nio.NioSocketChannel; 

public class NettyException { 

    public static void main(String[] args) throws InterruptedException { 
     System.out.println("starting server"); 
     NettyServer server = new NettyServer(0); 
     int port = server.getPort(); 
     System.out.println("server started at port: " + port); 

     System.out.println("staring client"); 
     NettyClient client = new NettyClient(); 
     System.out.println("client started"); 

     int noOfConnectionsToPerform = 1 << 17; 
     System.out.println("performing " + noOfConnectionsToPerform + " connections"); 
     for (int n = 0; n < noOfConnectionsToPerform; n++) { 
      // send a request 
      ChannelFuture f = client.getBootstrap().connect("localhost", port); 
     } 
     System.out.println("client performed " + noOfConnectionsToPerform + " connections"); 

     System.out.println("wait a bit to give a chance for server to finish processing incoming requests"); 
     Thread.currentThread().sleep(80000); 

     System.out.println("shutting down server and client"); 
     server.stop(); 
     client.stop(); 

     System.out.println("stopped, server received: " + server.connectionsCount() + " connections"); 
     int numberOfLostConnections = noOfConnectionsToPerform - server.connectionsCount(); 
     if (numberOfLostConnections > 0) { 
      System.out.println("Where do we lost " + numberOfLostConnections + " connections?"); 
     } 

     System.out.println("srerver exceptions: "); 
     printExceptions(server.getExceptions()); 
     System.out.println("client exceptions: "); 
     printExceptions(client.getExceptions()); 
    } 

    private static void printExceptions(Map<String, Integer> exceptions) { 
     if (exceptions.isEmpty()) { 
      System.out.println("There was no exceptions"); 
     } 
     for (Entry<String, Integer> exception : exceptions.entrySet()) { 
      System.out.println("There was " + exception.getValue() + " times this exception:"); 
      System.out.println(exception.getKey()); 
     } 
    } 

    public static class NettyServer { 
     private ChannelFuture channelFuture; 
     private EventLoopGroup bossGroup; 
     private EventLoopGroup workerGroup; 
     private AtomicInteger connections = new AtomicInteger(0); 
     private ExceptionCounter exceptionCounter = new ExceptionCounter(); 

     public NettyServer(int port) throws InterruptedException { 
      bossGroup = new NioEventLoopGroup(); 
      workerGroup = new NioEventLoopGroup(); 
      ServerBootstrap serverBootstrap = new ServerBootstrap(); 
      serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) 
        .childHandler(new ChannelInitializer<SocketChannel>() { 
         @Override 
         public void initChannel(SocketChannel ch) throws Exception { 
          ch.pipeline().addLast(new TimeServerHandler() { 

           @Override 
           public void channelActive(final ChannelHandlerContext ctx) { 
            connections.incrementAndGet(); 
            super.channelActive(ctx); 
           } 

           @Override 
           public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
            exceptionCounter.countException(cause); 
            super.exceptionCaught(ctx, cause); 
           } 

          }); 
         } 
        }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true); 
      channelFuture = serverBootstrap.bind(port).sync(); 
     } 

     public int getPort() { 
      return ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); 
     } 

     public int connectionsCount() { 
      return connections.get(); 
     } 

     public Map<String, Integer> getExceptions() { 
      return exceptionCounter.getExceptions(); 
     } 

     public void stop() { 
      bossGroup.shutdownGracefully(); 
      workerGroup.shutdownGracefully(); 
      try { 
       bossGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
       workerGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    public static class NettyClient { 
     private Bootstrap bootstrap; 
     private EventLoopGroup workerGroup; 
     private ExceptionCounter exceptionCounter = new ExceptionCounter(); 

     public NettyClient() { 
      workerGroup = new NioEventLoopGroup(); 

      bootstrap = new Bootstrap(); 
      bootstrap.group(workerGroup); 
      bootstrap.channel(NioSocketChannel.class); 
      bootstrap.option(ChannelOption.SO_KEEPALIVE, true); 
      bootstrap.handler(new ChannelInitializer<SocketChannel>() { 
       @Override 
       public void initChannel(SocketChannel ch) throws Exception { 
        ch.pipeline().addLast(new TimeClientHandler() { 
         @Override 
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
          exceptionCounter.countException(cause); 
          super.exceptionCaught(ctx, cause); 
         } 
        }); 
       } 
      }); 
     } 

     public Bootstrap getBootstrap() { 
      return bootstrap; 
     } 

     public void stop() { 
      workerGroup.shutdownGracefully(); 
      try { 
       workerGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 

     public Map<String, Integer> getExceptions() { 
      return exceptionCounter.getExceptions(); 
     } 
    } 

    public static class TimeServerHandler extends ChannelInboundHandlerAdapter { 

     @Override 
     public void channelActive(final ChannelHandlerContext ctx) { 
      final ByteBuf time = ctx.alloc().buffer(4); 
      time.writeInt((int) (System.currentTimeMillis()/1000L + 2208988800L)); 

      final ChannelFuture f = ctx.writeAndFlush(time); 
      f.addListener(new ChannelFutureListener() { 
       @Override 
       public void operationComplete(ChannelFuture future) { 
        assert f == future; 
        ctx.close(); 
       } 
      }); 
     } 

     @Override 
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
      ctx.close(); 
     } 
    } 

    public static class TimeClientHandler extends ChannelInboundHandlerAdapter { 
     private ThreadLocal<ByteBuf> buf = new ThreadLocal<ByteBuf>(); 

     @Override 
     public void handlerAdded(ChannelHandlerContext ctx) { 
      buf.set(ctx.alloc().buffer(4)); 
     } 

     @Override 
     public void handlerRemoved(ChannelHandlerContext ctx) { 
      buf.get().release(); 
      buf.remove(); 
     } 

     @Override 
     public void channelRead(ChannelHandlerContext ctx, Object msg) { 
      ByteBuf m = (ByteBuf) msg; 
      buf.get().writeBytes(m); 
      m.release(); 

      if (buf.get().readableBytes() >= 4) { 
       long currentTimeMillis = (buf.get().readUnsignedInt() - 2208988800L) * 1000L; 
       ctx.close(); 
      } 
     } 

     @Override 
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
      ctx.close(); 
     } 
    } 

    public static class ExceptionCounter { 
     private ConcurrentHashMap<String, AtomicInteger> exceptions = new ConcurrentHashMap<String, AtomicInteger>(); 

     private void countException(Throwable cause) { 

      StringWriter writer = new StringWriter(); 
      cause.printStackTrace(new PrintWriter(writer)); 
      String stackTrace = writer.toString(); 

      AtomicInteger exceptionCount = exceptions.get(stackTrace); 
      if (exceptionCount == null) { 
       exceptionCount = new AtomicInteger(0); 
       AtomicInteger prevCount = exceptions.putIfAbsent(stackTrace, exceptionCount); 
       if (prevCount != null) { 
        exceptionCount = prevCount; 
       } 
      } 
      exceptionCount.incrementAndGet(); 
     } 

     public Map<String, Integer> getExceptions() { 
      Map<String, Integer> newMap = exceptions.entrySet().stream() 
        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get())); 
      return Collections.unmodifiableMap(newMap); 
     } 
    } 
} 

Die Ausgabe lautet:

starting server 
server started at port: 56069 
staring client 
client started 
performing 131072 connections 
client performed 131072 connections 
wait a bit to give a chance for server to finish processing incoming requests 
shutting down server and client 
stopped, server received: 34735 connections 
Where do we lost 96337 connections? 
srerver exceptions: 
There was no exceptions 
client exceptions: 
There was 258 times this exception: 
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta 
    at sun.nio.ch.SocketDispatcher.read0(Native Method) 
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) 
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) 
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) 
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) 
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100) 
    at io.netty.buffer.WrappedByteBuf.writeBytes(WrappedByteBuf.java:813) 
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) 
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) 
    at java.lang.Thread.run(Thread.java:745) 

There was 30312 times this exception: 
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta 
    at sun.nio.ch.SocketDispatcher.read0(Native Method) 
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) 
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) 
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) 
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) 
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100) 
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) 
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) 
    at java.lang.Thread.run(Thread.java:745) 

Fragen:

  • Warum diese Ausnahme ausgelöst wird?
  • Wo sind die verlorenen Verbindungen verschwunden? Warum gibt es keinen Fehler für sie?
  • Wie zu vermeiden, was ist der richtige Weg, um diese Art von "Hochdurchsatz" -Anwendung zu programmieren, um solche Probleme nicht zu haben, wie bestehende Verbindungen zu verlieren/zu brechen?
  • Kein Bezug zu Thema, aber vielleicht einige Netty Experte wird wissen: Warum, wenn ich in TimeClientHandler die Felddeklaration von private ThreadLocal<ByteBuf> buf ändern, um auch statisch zu sein, habe ich null Zeiger Ausnahme in TimeClientHandler.handlerRemoved? Das ist sehr seltsam, ist diese Klasse irgendwie repliziert? oder sind die Threads von NioEventLoopGroup irgendwie komisch?

Umwelt:

  • Netty Version: netty-all-4.1.12.Final.jar
  • JVM Version: jdk1.8.0_111 64-Bit-
  • Betriebssystemversion: Windows 10 64 Bit

Antwort

1

Es gibt eine Grenze von 64k Ports pro IP-Adresse, so dass Sie 2^17 Ports nicht öffnen können.Da jeder Socket ein Datei-Handle verwendet, können Sie das Limit von maximal geöffneten Dateien pro Prozess erreichen. Siehe "Max open files" for working process.

+0

Sicher, das könnte ein Teil der Geschichte sein, aber warum es bestehende Verbindungen beendet und warum es keinen Fehler gibt, das Limit zu überschreiten? Die Sache, die ich versuche zu tun, ist zu unterschreiten, was passiert, wenn wir die Grenzen überschreiten und lernen, wie man solche "High-Traffic" -Anwendungen programmiert. –

Verwandte Themen