2017-01-09 3 views
2

Ich versuche, einen Proxy zu erstellen und dann alle anderen Traffic-Handler in Netty übergeben. Ich verstehe, dass ich die Verweise auf ByteBuf handhaben sollte, aber ich kann nicht verstehen, wie man es macht. Mein Beispiel und die Ausnahme ist unten.io.netty.util.IllegalReferenceCountException: refCnt: ​​0 in Netty

Initializer:

public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> { 

    private final SslContext sslCtx; 

    private final String remoteHost; 

    private final int remotePort; 

    public HexDumpProxyInitializer(SslContext sslCtx, String remoteHost, int remotePort) { 
     this.remoteHost = remoteHost; 
     this.remotePort = remotePort; 
     this.sslCtx = sslCtx; 
    } 

    public HexDumpProxyInitializer(String remoteHost, int remotePort) { 
     this.remoteHost = remoteHost; 
     this.remotePort = remotePort; 
     this.sslCtx = null; 
    } 

    @Override 
    public void initChannel(SocketChannel ch) { 
     ChannelPipeline p = ch.pipeline(); 
     if (sslCtx != null) { 
      p.addLast(sslCtx.newHandler(ch.alloc())); 
     } 

     p.addLast(new HexDumpProxyFrontendHandler(remoteHost, remotePort)); 
     p.addLast(new InboundPrinterHandler()); 
    } 
} 

HexDumpProxyFrontendHandler

public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter { 

    private final String remoteHost; 
    private final int remotePort; 

    private Channel outboundChannel; 

    public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) { 
     this.remoteHost = remoteHost; 
     this.remotePort = remotePort; 
    } 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) { 
     final Channel inboundChannel = ctx.channel(); 

     // Start the connection attempt. 
     Bootstrap b = new Bootstrap(); 
     b.group(inboundChannel.eventLoop()) 
     .channel(ctx.channel().getClass()) 
     .handler(new HexDumpProxyBackendHandler(inboundChannel)) 
     .option(ChannelOption.AUTO_READ, false); 
     ChannelFuture f = b.connect(remoteHost, remotePort); 
     outboundChannel = f.channel(); 
     f.addListener(new ChannelFutureListener() { 
      @Override 
      public void operationComplete(ChannelFuture future) { 
       if (future.isSuccess()) { 
        // connection complete start to read first data 
        inboundChannel.read(); 
       } else { 
        // Close the connection if the connection attempt has failed. 
        inboundChannel.close(); 
       } 
      } 
     }); 
    } 

    @Override 
    public void channelRead(final ChannelHandlerContext ctx, Object msg) { 
     if (outboundChannel.isActive()) { 
      outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { 
       @Override 
       public void operationComplete(ChannelFuture future) { 
        if (future.isSuccess()) { 
         // was able to flush out data, start to read the next chunk 
         ctx.channel().read(); 
        } else { 
         future.channel().close(); 
        } 
       } 
      }); 
     } 
     ctx.fireChannelRead(msg); 
    } 

    @Override 
    public void channelInactive(ChannelHandlerContext ctx) { 
     if (outboundChannel != null) { 
      closeOnFlush(outboundChannel); 
     } 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
     cause.printStackTrace(); 
     closeOnFlush(ctx.channel()); 
    } 

    static void closeOnFlush(Channel ch) { 
     if (ch.isActive()) { 
      ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); 
     } 
    } 
} 

InboundPrinterHandler

public class InboundPrinterHandler extends ChannelInboundHandlerAdapter { 


    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
     ByteBuf bb = null; 
     bb = (ByteBuf) msg; 
     System.out.println("INBOUND:\n\n"+bb.toString(Charset.defaultCharset())); 
     System.out.println("\n\n\n"); 
    } 


    @Override 
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 

    } 


} 

die Ausnahme

io.netty.util.IllegalReferenceCountException: refCnt: 0 
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1407) 
    at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1353) 
    at io.netty.buffer.PooledUnsafeDirectByteBuf.internalNioBuffer(PooledUnsafeDirectByteBuf.java:331) 
    at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:614) 
    at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1213) 
    at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1208) 
    at com.netas.sv.proxy.InboundPrinterHandler.channelRead(InboundPrinterHandler.java:16) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) 
    at com.netas.sv.proxy.HexDumpProxyFrontendHandler.channelRead(HexDumpProxyFrontendHandler.java:67) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) 
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) 
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) 
    at java.lang.Thread.run(Thread.java:745) 

Antwort

5
if (outboundChannel.isActive()) { 
     outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { 
     // Snip 
    } 
    ctx.fireChannelRead(msg); 

Nachdem Sie den ByteBuf auf den anderen Kanal vergehen, ist es die anderen Kanäle Verantwortung wieder die refcount zu verringern. Da der andere Kanal nun den Refcount dekrementiert hat, ist er jetzt nicht mehr nutzbar.

Der beste Weg, dies zu lösen manuell den Wert erhöht wird, bevor Sie den Verkehr auf den anderen Kanal mit .retain() passieren:

outboundChannel.writeAndFlush(msg.retain()).addListener(new ChannelFutureListener() { 
// Your remaining code