深刻学习Netty(5)——Netty是如何解决TCP粘包/拆包问题的?

2021年09月15日 阅读数:4
这篇文章主要向大家介绍深刻学习Netty(5)——Netty是如何解决TCP粘包/拆包问题的?,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

前言

  学习Netty避免不了要去了解TCP粘包/拆包问题,熟悉各个编解码器是如何解决TCP粘包/拆包问题的,同时须要知道TCP粘包/拆包问题是怎么产生的。html

  在此博文前,能够先学习了解前几篇博文:git

  参考资料《Netty In Action》、《Netty权威指南》(有须要的小伙伴能够评论或者私信我)github

  博文中全部的代码都已上传到Github,欢迎Star、Fork算法

 


 

1、TCP粘包/拆包

1.什么是TCP粘包/拆包问题?

引用《Netty权威指南》原话,能够很清楚解释什么是TCP粘包/拆包问题。编程

  TCP是一个“流”协议,是没有界限的一串数据,TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际状况进行包的划分,因此在业务上认为,一个完整的包可能会被TCP拆成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。bootstrap

  一个完整的包可能会被TCP拆分红多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是TCP粘包/拆包。缓存

假设服务端分别发送两个数据包P1和P2给服务端,因为服务端读取一次的字节数目是不肯定的,因此可能会发生五种状况:异步

              

  • 服务端分两次读取到两个独立的数据包;
  • 服务端一次接收到两个数据包,P1和P2粘合在一块儿,被称为TCP粘包;
  • 服务端分两次读取到两个数据包,第一次读取到完整的P1包和P2包的部份内容,第二次读取到P2包的剩余内容,被称之为TCP拆包;
  • 服务端分两次读取到两个数据包,第一次读取到了P1包的部份内容P1_1,第二次读取到了P1包的剩余内容P1_2和P2包的整包
  • 其实还有最后一种可能,就是服务端TCP接收的滑动窗很是小,而数据包P1/P2很是大,颇有可能服务端须要分屡次才能将P1/P2包接收彻底,期间发生屡次拆包

2.TCP粘包/拆包问题发生的缘由

 TCP是以流动的方式传输数据,传输的最小单位为一个报文段(segment)。主要有以下几个指标影响或形成TCP粘包/拆包问题,分别为MSS、MTU、缓冲区,以及Nagle算法的影响。socket

(1)MSS(Maximum Segment Size)指的是链接层每次传输的数据有个最大限制MTU(Maximum Transmission Unit),超过这个量要分红多个报文段ide

(2)MTU限制了一次最多能够发送1500个字节,而TCP协议在发送DATA时,还会加上额外的TCP Header和IP Header,所以刨去这两个部分,就是TCP协议一次能够发送的实际应用数据的最大大小,即MSS长度=MTU长度-IP Header-TCP Header

(3)TCP为提升性能,发送端会将须要发送的数据发送到缓冲区,等待缓冲区满了以后,再将缓冲中的数据发送到接收方。同理,接收方也有缓冲区这样的机制,来接收数据。

因为有上述的缘由,因此会形成拆包/粘包的具体缘由以下:

(1)拆包发生缘由

  • 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包
  • 待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包

(2)粘包发生缘由

  • 要发送的数据小于TCP发送缓冲区的大小,TCP将屡次写入缓冲区的数据一次发送出去,将会发生粘包。
  • 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包

2、TCP粘包/拆包问题解决策略

1.经常使用的解决策略

因为底层TCP是没法理解上层业务数据,因此在底层是没法保证数据包不被拆分和重组的,因此只能经过上层应用协议栈设计来解决

(1)消息定长,例如每一个报文的大小固定长度200字节,不够空位补空格

(2)在包尾增长回车换行符进行分割,例如FTP协议

(3)将消息分为消息头和消息体,消息头中包含表示消息总长度的字段

(4)更复杂的应用层协议

2.TCP粘包异常问题案例

(1)TimeServerHandler

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger log = Logger.getLogger(TimeClientHandler.class.getName());


    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
        // 每收到一条消息计数器就加1, 理论上应该接收到100条
        System.out.println("The time server receive order: " + body + "; the counter is : "+ (++counter));
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
                new Date(System.currentTimeMillis()).toString():"BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warning("Unexpected exception from downstream: " + cause.getMessage());
        ctx.close();
    }
}

(3)TimeServer

public class TimeServer {

    public static final Logger log = LoggerFactory.getLogger(TimeServer.class);

    public static void main(String[] args) throws Exception {
        new TimeServer().bind();
    }


    public void bind() throws Exception {
        // NIO 线程组
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            socketChannel.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            // 绑定端口,同步等待成功
            ChannelFuture f = bootstrap.bind(NettyConstant.REMOTE_IP, NettyConstant.REMOTE_PORT).sync();
            log.info("Time server[{}] start success", NettyConstant.REMOTE_IP + ": " + NettyConstant.REMOTE_PORT);
            // 等待全部服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}

(3)TimeClientHandler

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = Logger.getLogger(TimeClientHandler.class.getName());

    private int counter;
    private byte[] req;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator"))
                .getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        // 循环发送100条消息,每发送一条刷新一次,服务端理论上接收到100条查询时间指令的请求
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        // 客户端每接收到服务端一条应答消息以后,计数器就加1,理论上应该有100条服务端日志
        System.out.println("Now is: " + body + "; the current is "+ (++counter));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warning("Unexpected exception from downstream: " + cause.getMessage());
        ctx.close();
    }
}

(4)TimeClient

public class TimeClient {

    public static final Logger log = LoggerFactory.getLogger(TimeClient.class);



    public static void main(String[] args) throws Exception {
        new TimeClient().connect(NettyConstant.REMOTE_IP, NettyConstant.REMOTE_PORT);
    }

    public void connect(final String host, final int port) throws Exception {
        // NIO 线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            // 发起异步链接操做
            ChannelFuture f = bootstrap.connect(host, port).sync();
            // 等待全部服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            group.shutdownGracefully();
        }

    }

}

(5)运行测试结果

运行服务端与客户端,观察服务端与客户端

服务端:

The time server receive order: QUERY TIME ORDER
QUERY TIME ORDER
... // 此处忽略96个QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER; the counter is : 1

客户端:

Now is: BAD ORDER
; the current is 1

从结果上来看,客户端向服务端发送的100个“QUERY TIME ORDER”命令,都粘成一个包(counter=1),服务端也只返回一个命令“BAD ORDER”,能够尝试运行客户端屡次,每次运行的结果都是不同的,可是大部分都是粘包,计数器都小于了100。

 

3、Netty解决TCP粘包/拆包

1.按行文本解码器LineBasedFramedDecoder和StringDecoder

  LineBasedFramedDecoder:依次遍历ByeBuf中可读字节,判断是否有“\n”,“\r\n”,若是有,就当前位置为结束位置,从可读索引到结束位置区间的字节就组装成一行,以换行符为结束标志的解码器,同识支持最大长度。

  StringDecoder:将接收对象转换成字符串,而后继续调用后面的handler

  LineBasedFramedDecoder和StringDecoder就是按行切换的文本解码器,被设计用来支持TCP粘包与拆包

(1)改造TimeServer

  增长解码器LineBasedFramedDecoder和StringDecoder 

public class TimeServer {

    public static final Logger log = LoggerFactory.getLogger(TimeServer.class);

    public static void main(String[] args) throws Exception {
        new TimeServer().bind();
    }


    public void bind() throws Exception {
        // NIO 线程组
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            // 绑定端口,同步等待成功
            ChannelFuture f = bootstrap.bind(NettyConstant.LOCAL_IP, NettyConstant.LOCAL_PORT).sync();
            log.info("Time server[{}] start success", NettyConstant.LOCAL_IP + ": " + NettyConstant.LOCAL_PORT);
            // 等待全部服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}

(2)改造TimeServerHandler

  不须要对消息进行解码,直接String读取便可

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger log = Logger.getLogger(TimeClientHandler.class.getName());


    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 不须要对消息进行编解码,直接String读取
        String body = (String) msg;
        // 每收到一条消息计数器就加1, 理论上应该接收到100条
        System.out.println("The time server receive order: " + body + "; the counter is : "+ (++counter));
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
                new Date(System.currentTimeMillis()).toString():"BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warning("Unexpected exception from downstream: " + cause.getMessage());
        ctx.close();
    }
}

(3)改造TimeClient

  一样增长解码器LineBasedFramedDecoder和StringDecoder

public class TimeClient {

    public static final Logger log = LoggerFactory.getLogger(TimeClient.class);



    public static void main(String[] args) throws Exception {
        new TimeClient().connect(NettyConstant.LOCAL_IP, NettyConstant.LOCAL_PORT);
    }

    public void connect(final String host, final int port) throws Exception {
        // NIO 线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            // 发起异步链接操做
            ChannelFuture f = bootstrap.connect(host, port).sync();
            // 等待全部服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            group.shutdownGracefully();
        }

    }

}

(4)改造TimeClientHandler

  一样地,不须要编解码了,直接返回了字符串的应答消息

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = Logger.getLogger(TimeClientHandler.class.getName());

    private int counter;
    private byte[] req;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator"))
                .getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        // 循环发送100条消息,每发送一条刷新一次,服务端理论上接收到100条查询时间指令的请求
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 不须要编解码了,直接返回了字符串的应答消息
        String body = (String) msg;
        // 客户端每接收到服务端一条应答消息以后,计数器就加1,理论上应该有100条服务端日志
        System.out.println("Now is: " + body + "; the current is "+ (++counter));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warning("Unexpected exception from downstream: " + cause.getMessage());
        ctx.close();
    }
}

(5)运行测试结果

服务端:

The time server receive order: QUERY TIME ORDER; the counter is : 1
The time server receive order: QUERY TIME ORDER; the counter is : 2
...
The time server receive order: QUERY TIME ORDER; the counter is : 99
The time server receive order: QUERY TIME ORDER; the counter is : 100

客户端:

Now is: Mon Jul 26 22:18:51 CST 2021; the current is 1
Now is: Mon Jul 26 22:18:51 CST 2021; the current is 2
...
Now is: Mon Jul 26 22:18:51 CST 2021; the current is 99
Now is: Mon Jul 26 22:18:51 CST 2021; the current is 100

根据结果可知,每条消息都对计数器加1,并无发生粘包现象。

2.按分隔符文本解码器DelimiterBasedFrameDecoder

  DelimiterBasedFrameDecoder是以分隔符做为码流结束标识的消息解码,改造代码,以“$_”做为分隔符

(1)改造TimeServer

  增长以“$_”为分隔符的DelimiterBasedFrameDecoder解码器,DelimiterBasedFrameDecoder构造器其中第一个参数长度表示当达到该长度后仍然没有查找到分隔符,就会抛出TooLongFrameException。这是防止异常码流缺失分隔符致使内存溢出

public class TimeServer {

    public static final Logger log = LoggerFactory.getLogger(TimeServer.class);

    public static void main(String[] args) throws Exception {
        new TimeServer().bind();
    }


    public void bind() throws Exception {
        // NIO 线程组
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            // 以“$_”为分隔符
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            // 绑定端口,同步等待成功
            ChannelFuture f = bootstrap.bind(NettyConstant.LOCAL_IP, NettyConstant.LOCAL_PORT).sync();
            log.info("Time server[{}] start success", NettyConstant.LOCAL_IP + ": " + NettyConstant.LOCAL_PORT);
            // 等待全部服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}

(2)改造TimeServerHandler

  对返回客户端的消息增长分隔符“$_”

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger log = Logger.getLogger(TimeClientHandler.class.getName());


    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 不须要对消息进行编解码,直接String读取
        String body = (String) msg;
        // 每收到一条消息计数器就加1, 理论上应该接收到100条
        System.out.println("The time server receive order: " + body + "; the counter is : "+ (++counter));
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
                new Date(System.currentTimeMillis()).toString():"BAD ORDER";
        // 返回客户端须要追加分隔符
        currentTime = currentTime + "$_";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warning("Unexpected exception from downstream: " + cause.getMessage());
        ctx.close();
    }
}

(3)改造TimeClient

  增长以“$_”为分隔符的DelimiterBasedFrameDecoder解码器

public class TimeClient {

    public static final Logger log = LoggerFactory.getLogger(TimeClient.class);

    public static void main(String[] args) throws Exception {
        new TimeClient().connect(NettyConstant.LOCAL_IP, NettyConstant.LOCAL_PORT);
    }

    public void connect(final String host, final int port) throws Exception {
        // NIO 线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 以“$_”为分隔符
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            // 发起异步链接操做
            ChannelFuture f = bootstrap.connect(host, port).sync();
            // 等待全部服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            group.shutdownGracefully();
        }

    }

}

(4)改造TimeClientHandler

  对发送命令增长“$_”分隔符

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = Logger.getLogger(TimeClientHandler.class.getName());

    private int counter;
    private byte[] req;

    public TimeClientHandler() {
        // 以$_为分隔符,发送命令
        req = ("QUERY TIME ORDER$_").getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        // 循环发送100条消息,每发送一条刷新一次,服务端理论上接收到100条查询时间指令的请求
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 不须要编解码了,直接返回了字符串的应答消息
        String body = (String) msg;
        // 客户端每接收到服务端一条应答消息以后,计数器就加1,理论上应该有100条服务端日志
        System.out.println("Now is: " + body + "; the current is "+ (++counter));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warning("Unexpected exception from downstream: " + cause.getMessage());
        ctx.close();
    }
}

(5)运行测试结果

服务端:

The time server receive order: QUERY TIME ORDER; the counter is : 1
The time server receive order: QUERY TIME ORDER; the counter is : 2
...
The time server receive order: QUERY TIME ORDER; the counter is : 99
The time server receive order: QUERY TIME ORDER; the counter is : 100

客户端:

Now is: Mon Jul 26 22:18:51 CST 2021; the current is 1
Now is: Mon Jul 26 22:18:51 CST 2021; the current is 2
...
Now is: Mon Jul 26 22:18:51 CST 2021; the current is 99
Now is: Mon Jul 26 22:18:51 CST 2021; the current is 100

根据结果可知,每条消息都对计数器加1,并无发生粘包现象。

3.固定长度解码器FixedLengthFrameDecoder

  FixedLengthFrameDecoder是固定长度解码器,可以对固定长度的消息进行自动解码,利用FixedLengthFrameDecoder,不管多少数据,都会按照构造函数中设置的固定长度进行解码,若是是半包消息,FixedLengthFrameDecoder会缓存半包消息并等待下一个包到达后进行拼包,直到读取到一个完整的包

  在服务端ChannelPipeline中新增FixedLengthFrameDecoder,长度为10。而后增长EchoServerHannel处理器,输出服务端接收到的命令

(1)EchoServer

  增长长度为10的FixedLengthFrameDecoder解码器,同时再增长StringDecoder解码器

public class EchoServer {

    public static final Logger log = LoggerFactory.getLogger(EchoServer.class);

    public static void main(String[] args) throws Exception {
        new EchoServer().bind();
    }


    public void bind() throws Exception {
        // NIO 线程组
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            // 增长固定长度解码器
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(10));
                            // 增长字符解码器,将msg直接转为string
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            // 绑定端口,同步等待成功
            ChannelFuture f = bootstrap.bind(NettyConstant.LOCAL_IP, NettyConstant.LOCAL_PORT).sync();
            log.info("Time server[{}] start success", NettyConstant.LOCAL_IP + ": " + NettyConstant.LOCAL_PORT);
            // 等待全部服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}

(2)EchoServerHandler

  输出客户端发送的命令,直接输出msg便可,由于服务端已经增长了StringDecoder解码器,直接转为String

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = Logger.getLogger(EchoServerHandler.class.getName());

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("The time server receive order: " + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warning("Unexpected exception from downstream: " + cause.getMessage());
        ctx.close();
    }
}

(3)Telnet命令测试结果

CMD窗口Telnet窗口链接 telnet 127.0.0.1 8888

回显输入消息welcome Lijian

查看服务端console

2021-07-26 23:25:21,921  INFO [nioEventLoopGroup-2-1] - [id: 0xe4d49ee6, L:/127.0.0.1:8888] READ: [id: 0x928b38a4, L:/127.0.0.1:8888 - R:/127.0.0.1:62315]
2021-07-26 23:25:21,922  INFO [nioEventLoopGroup-2-1] - [id: 0xe4d49ee6, L:/127.0.0.1:8888] READ COMPLETE
The time server receive order: welcome Li

根据结果可知,服务端只接收到客户端发送的“welcome Lijian”的前10个字符,及说明FixedLengthFrameDecoder是有效的

 

本篇博文是Netty的基础篇,主要介绍Netty针解决TCP粘包/拆包而产生的解码器,Netty基础篇还涉及到序列化的问题,后面将会继续介绍。