Problembeschreibung gewaltsam geschlossen:Netty (netty-all-4.1.12.Final.jar) java.io.IOException: Eine bestehende Verbindung wurde von der Remote-Host
Mein Programm erstellt einen Netty Server und Client , dann macht es 2^17 Verbindungen zu diesem Server, irgendwann beginnt der Client diese Ausnahme zu empfangen:
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta
.
Der Englisch entspricht:
java.io.IOException: An existing connection was forcibly closed by the remote host
Offensichtlich ist es nicht erwünscht, dass Server zwang bestehende Verbindungen schließt.
Schritte zum Reproduzieren:
Für die Bequemlichkeit des jemand bereit, dieses Problem, das ich dieses „Single runnable Java-Datei“ Programm, das es wieder erstellt habe zu reproduzieren, ist es nur die netty-all-4.1.12.Final.jar
Abhängigkeit benötigt. Es startet netty Server auf einem freien Port, erstellt dann einen Client, führt Anforderungen aus, wartet ein bisschen, um dem Server die Möglichkeit zu geben, die Anforderungen zu verarbeiten, und druckt dann Statistiken darüber, wie viele Verbindungen hergestellt wurden, wie viele Verbindungen der Server verarbeitet hat und wie viele Verbindungen vorhanden waren verloren, wie viele und welche Art von Ausnahmen haben Server aufgetreten, und wie viele und welche Art von Ausnahmen hat Client angetroffen.
package netty.exception.tst;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyException {
public static void main(String[] args) throws InterruptedException {
System.out.println("starting server");
NettyServer server = new NettyServer(0);
int port = server.getPort();
System.out.println("server started at port: " + port);
System.out.println("staring client");
NettyClient client = new NettyClient();
System.out.println("client started");
int noOfConnectionsToPerform = 1 << 17;
System.out.println("performing " + noOfConnectionsToPerform + " connections");
for (int n = 0; n < noOfConnectionsToPerform; n++) {
// send a request
ChannelFuture f = client.getBootstrap().connect("localhost", port);
}
System.out.println("client performed " + noOfConnectionsToPerform + " connections");
System.out.println("wait a bit to give a chance for server to finish processing incoming requests");
Thread.currentThread().sleep(80000);
System.out.println("shutting down server and client");
server.stop();
client.stop();
System.out.println("stopped, server received: " + server.connectionsCount() + " connections");
int numberOfLostConnections = noOfConnectionsToPerform - server.connectionsCount();
if (numberOfLostConnections > 0) {
System.out.println("Where do we lost " + numberOfLostConnections + " connections?");
}
System.out.println("srerver exceptions: ");
printExceptions(server.getExceptions());
System.out.println("client exceptions: ");
printExceptions(client.getExceptions());
}
private static void printExceptions(Map<String, Integer> exceptions) {
if (exceptions.isEmpty()) {
System.out.println("There was no exceptions");
}
for (Entry<String, Integer> exception : exceptions.entrySet()) {
System.out.println("There was " + exception.getValue() + " times this exception:");
System.out.println(exception.getKey());
}
}
public static class NettyServer {
private ChannelFuture channelFuture;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private AtomicInteger connections = new AtomicInteger(0);
private ExceptionCounter exceptionCounter = new ExceptionCounter();
public NettyServer(int port) throws InterruptedException {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler() {
@Override
public void channelActive(final ChannelHandlerContext ctx) {
connections.incrementAndGet();
super.channelActive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
exceptionCounter.countException(cause);
super.exceptionCaught(ctx, cause);
}
});
}
}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
channelFuture = serverBootstrap.bind(port).sync();
}
public int getPort() {
return ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
}
public int connectionsCount() {
return connections.get();
}
public Map<String, Integer> getExceptions() {
return exceptionCounter.getExceptions();
}
public void stop() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
try {
bossGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
workerGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static class NettyClient {
private Bootstrap bootstrap;
private EventLoopGroup workerGroup;
private ExceptionCounter exceptionCounter = new ExceptionCounter();
public NettyClient() {
workerGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
exceptionCounter.countException(cause);
super.exceptionCaught(ctx, cause);
}
});
}
});
}
public Bootstrap getBootstrap() {
return bootstrap;
}
public void stop() {
workerGroup.shutdownGracefully();
try {
workerGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Map<String, Integer> getExceptions() {
return exceptionCounter.getExceptions();
}
}
public static class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) {
final ByteBuf time = ctx.alloc().buffer(4);
time.writeInt((int) (System.currentTimeMillis()/1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
public static class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ThreadLocal<ByteBuf> buf = new ThreadLocal<ByteBuf>();
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf.set(ctx.alloc().buffer(4));
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.get().release();
buf.remove();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.get().writeBytes(m);
m.release();
if (buf.get().readableBytes() >= 4) {
long currentTimeMillis = (buf.get().readUnsignedInt() - 2208988800L) * 1000L;
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
public static class ExceptionCounter {
private ConcurrentHashMap<String, AtomicInteger> exceptions = new ConcurrentHashMap<String, AtomicInteger>();
private void countException(Throwable cause) {
StringWriter writer = new StringWriter();
cause.printStackTrace(new PrintWriter(writer));
String stackTrace = writer.toString();
AtomicInteger exceptionCount = exceptions.get(stackTrace);
if (exceptionCount == null) {
exceptionCount = new AtomicInteger(0);
AtomicInteger prevCount = exceptions.putIfAbsent(stackTrace, exceptionCount);
if (prevCount != null) {
exceptionCount = prevCount;
}
}
exceptionCount.incrementAndGet();
}
public Map<String, Integer> getExceptions() {
Map<String, Integer> newMap = exceptions.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
return Collections.unmodifiableMap(newMap);
}
}
}
Die Ausgabe lautet:
starting server
server started at port: 56069
staring client
client started
performing 131072 connections
client performed 131072 connections
wait a bit to give a chance for server to finish processing incoming requests
shutting down server and client
stopped, server received: 34735 connections
Where do we lost 96337 connections?
srerver exceptions:
There was no exceptions
client exceptions:
There was 258 times this exception:
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100)
at io.netty.buffer.WrappedByteBuf.writeBytes(WrappedByteBuf.java:813)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:745)
There was 30312 times this exception:
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:745)
Fragen:
- Warum diese Ausnahme ausgelöst wird?
- Wo sind die verlorenen Verbindungen verschwunden? Warum gibt es keinen Fehler für sie?
- Wie zu vermeiden, was ist der richtige Weg, um diese Art von "Hochdurchsatz" -Anwendung zu programmieren, um solche Probleme nicht zu haben, wie bestehende Verbindungen zu verlieren/zu brechen?
- Kein Bezug zu Thema, aber vielleicht einige Netty Experte wird wissen: Warum, wenn ich in
TimeClientHandler
die Felddeklaration vonprivate ThreadLocal<ByteBuf> buf
ändern, um auch statisch zu sein, habe ich null Zeiger Ausnahme inTimeClientHandler.handlerRemoved
? Das ist sehr seltsam, ist diese Klasse irgendwie repliziert? oder sind die Threads vonNioEventLoopGroup
irgendwie komisch?
Umwelt:
- Netty Version: netty-all-4.1.12.Final.jar
- JVM Version: jdk1.8.0_111 64-Bit-
- Betriebssystemversion: Windows 10 64 Bit
Sicher, das könnte ein Teil der Geschichte sein, aber warum es bestehende Verbindungen beendet und warum es keinen Fehler gibt, das Limit zu überschreiten? Die Sache, die ich versuche zu tun, ist zu unterschreiten, was passiert, wenn wir die Grenzen überschreiten und lernen, wie man solche "High-Traffic" -Anwendungen programmiert. –