2017-02-15 3 views
0

Ich lerne Netty und Prototyping eine einfache App, die ein Objekt über TCP sendet. Mein Problem ist, dass, wenn ich Channel.write von der Serverseite mit meiner Nachricht anrufe, scheint es nicht die Handler in der Pipeline zu erreichen. Wenn ich eine Nachricht vom Client zum Server sende, funktioniert es wie erwartet.Netty Kanal schreiben nicht erreichen Handler

Hier ist der Code.

Server:

public class Main {  
    private int serverPort; 

    private EventLoopGroup bossGroup; 
    private EventLoopGroup workerGroup; 

    private ServerBootstrap boot; 
    private ChannelFuture future; 

    private SomeDataChannelDuplexHandler duplex; 

    private Channel ch; 

    public Main(int serverPort) { 
     this.serverPort = serverPort; 
    } 

    public void initialise() {  
     boot = new ServerBootstrap();  
     bossGroup = new NioEventLoopGroup(); 
     workerGroup = new NioEventLoopGroup(); 

     boot.group(bossGroup, workerGroup) 
      .channel(NioServerSocketChannel.class) 
      .childHandler(new ChannelInitializer<SocketChannel>() { 
       @Override 
       public void initChannel(SocketChannel ch) throws Exception { 
        ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(0, 0, 2)); 

        // Inbound 
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 0)); 
        ch.pipeline().addLast(new SomeDataDecoder()); 

        // Outbound 
        ch.pipeline().addLast(new LengthFieldPrepender(2)); 
        ch.pipeline().addLast(new SomeDataEncoder()); 

        // In-Out 
        ch.pipeline().addLast(new SomeDataChannelDuplexHandler()); 
       } 
      })  
      .option(ChannelOption.SO_BACKLOG, 128) 
      .childOption(ChannelOption.SO_KEEPALIVE, true); 
    } 

    public void sendMessage() { 
     SomeData fd = new SomeData("hello", "localhost", 1234);  
     ChannelFuture future = ch.writeAndFlush(fd);   
     future.addListener(new ChannelFutureListener() { 
      @Override 
      public void operationComplete(ChannelFuture future) throws Exception { 
       if (!future.isSuccess()) { 
        System.out.println("send error: " + future.cause().toString()); 
       } else { 
        System.out.println("send message ok"); 
       } 
      } 
     }); 
    } 

    public void startServer(){ 
     try { 
      future = boot.bind(serverPort) 
        .sync() 
        .addListener(new ChannelFutureListener() { 
         @Override 
         public void operationComplete(ChannelFuture future) throws Exception { 
          ch = future.channel(); 
         } 
      }); 
     } catch (InterruptedException e) { 
      // log failure 
     } 
    } 

    public void stopServer() { 
     workerGroup.shutdownGracefully() 
      .addListener(e -> System.out.println("workerGroup shutdown")); 

     bossGroup.shutdownGracefully() 
      .addListener(e -> System.out.println("bossGroup shutdown")); 
    } 

    public static void main(String[] args) throws InterruptedException { 

     Main m = new Main(5000); 

     m.initialise(); 
     m.startServer(); 

     final Scanner scanner = new Scanner(System.in); 

     System.out.println("running."); 

     while (true) { 

      final String input = scanner.nextLine(); 

      if ("q".equals(input.trim())) { 
       break; 
      } else { 
       m.sendMessage(); 
      } 
     } 

     scanner.close(); 
     m.stopServer(); 
    } 
} 

Der Duplex-Kanal-Handler:

public class SomeDataChannelDuplexHandler extends ChannelDuplexHandler { 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) { 
     System.out.println("duplex channel active"); 
     ctx.fireChannelActive(); 
    } 

    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
     System.out.println("duplex channelRead"); 
     if (msg instanceof SomeData) { 
      SomeData sd = (SomeData) msg; 
      System.out.println("received: " + sd); 
     } else { 
      System.out.println("some other object"); 
     } 
     ctx.fireChannelRead(msg); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
     cause.printStackTrace(); 
     ctx.close(); 
    } 

    @Override 
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 
     if (evt instanceof IdleStateEvent) { 
      IdleStateEvent event = (IdleStateEvent) evt; 
      if (event.state() == IdleState.ALL_IDLE) { // idle for no read and write 
       System.out.println("idle: " + event.state()); 
      } 
     } 
    } 
} 

Und schließlich der Codierer (der Decodierer ist ähnlich):

public class SomeDataEncoder extends MessageToByteEncoder<SomeData> { 

    @Override 
    protected void encode(ChannelHandlerContext ctx, SomeData msg, ByteBuf out) throws Exception { 

     System.out.println("in encoder, msg = " + msg); 
     ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
     ObjectOutputStream oos = new ObjectOutputStream(bos); 

     oos.writeObject(msg.getName()); 
     oos.writeObject(msg.getIp()); 
     oos.writeInt(msg.getPort()); 
     oos.close(); 

     byte[] serialized = bos.toByteArray(); 
     int size = serialized.length; 

     ByteBuf encoded = ctx.alloc().buffer(size); 
     encoded.writeBytes(bos.toByteArray()); 

     out.writeBytes(encoded); 
    } 
} 

Die Client-Seite:

public class Client { 

    String host = "10.188.36.66"; 
    int port = 5000; 

    EventLoopGroup workerGroup = new NioEventLoopGroup(); 
    ChannelFuture f; 
    private Channel ch; 

    public Client() { 
    } 

    public void startClient() throws InterruptedException { 
     Bootstrap boot = new Bootstrap(); 
     boot.group(workerGroup); 
     boot.channel(NioSocketChannel.class); 
     boot.option(ChannelOption.SO_KEEPALIVE, true); 
     boot.handler(new ChannelInitializer<SocketChannel>() { 
      @Override 
      public void initChannel(SocketChannel ch) throws Exception {    
       // Inbound 
       ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 0)); 
       ch.pipeline().addLast(new SomeDataDecoder()); 

       // Outbound 
       ch.pipeline().addLast(new LengthFieldPrepender(2)); 
       ch.pipeline().addLast(new SomeDataEncoder()); 

       // Handler 
       ch.pipeline().addLast(new SomeDataHandler()); 
      } 
     }); 

     // Start the client 
     f = boot.connect(host, port).sync(); 
     f.addListener(new ChannelFutureListener() { 
      public void operationComplete(ChannelFuture future) throws Exception { 
       System.out.println("connected to server"); 
       ch = f.channel(); 
      } 
     }); 
    } 

    public void stopClient() {  
     workerGroup.shutdownGracefully(); 
    } 

    private void writeMessage(String input) { 
     SomeData data = new SomeData("client", "localhost", 3333); 
     ChannelFuture fut = ch.writeAndFlush(data); 
     fut.addListener(new ChannelFutureListener() { 
      @Override 
      public void operationComplete(ChannelFuture future) throws Exception { 
       System.out.println("send message"); 
      } 
     }); 
    } 

    public static void main(String[] args) throws InterruptedException { 
     Client client = new Client(); 
     client.startClient();   

     System.out.println("running.\n\n"); 
     final Scanner scanner = new Scanner(System.in); 

     while (true) { 

      final String input = scanner.nextLine(); 

      if ("q".equals(input.trim())) { 
       break; 
      } else { 
       client.writeMessage(input); 
      } 
     } 

     scanner.close(); 
     client.stopClient(); //call this at some point to shutdown the client 
    } 

} 

und der Handler:

public class SomeDataHandler extends SimpleChannelInboundHandler<SomeData> { 

    private ChannelHandlerContext ctx; 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) { 
     System.out.println("connected"); 
     this.ctx = ctx; 
    } 

    @Override 
    protected void channelRead0(ChannelHandlerContext ctx, SomeData msg) throws Exception { 
     System.out.println("got message: " + msg); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
     System.out.println("caught exception: " + cause.getMessage()); 
     ctx.close(); 
    } 
} 

Wenn ich eine Nachricht über die Konsole auf dem Server senden, erhalte ich die Ausgabe:

running. 
duplex channel active 
duplex read 
idle: ALL_IDLE 
idle: ALL_IDLE 

send message ok 

So sieht es aus, als ob die Nachricht nicht gesendet wird, aber wird auf der Clientseite empfangen.

Wenn ich es von der Client-Seite bekomme ich (auf der Server-Konsole):

in decoder, numBytes in message = 31 
duplex channelRead 
received: SomeData [name=client, ip=localhost, port=3333] 

das ist, was ich erwarte.

Also, wo ist das Problem? Hat es etwas mit der Verwendung einer ChannelDuplexHandler auf der Serverseite und einer SimpleChannelInboundHandler auf der Client-Seite zu tun? Gibt es etwas, das ich anrufen muss, um die Nachricht in die Pipeline zu werfen?

UPDATE Ich habe einen Scheck für future.isSuccess() in dem Server nachrichts Verfahren hinzugefügt und ich send error: java.lang.UnsupportedOperationException auf der Konsole.

Antwort

1

(Veröffentlicht im Namen des OP).

Für jeden, der interessiert ist, war das Problem, dass ich versuche, die Nachricht auf dem Server-Kanal zu senden und nicht den normalen Kanal. This post wies mich in die richtige Richtung.

Verwandte Themen