netty

2019年12月06日 阅读数:67
这篇文章主要向大家介绍netty,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

NIO网络编程:html

开发出高质量的NIO程序并非一件简单的事情,除去NIO固有的复杂性和BUG不谈,做为一个NIO服务端须要可以处理网络的闪断、客户端的重复接入、客户端的安全认证、消息的编解码、半包读写等等,若是你没有足够的NIO编程经验积累,一个NIO框架的稳定每每须要半年甚至更长的时间。更为糟糕的是一旦在生产环境中发生问题,每每会致使跨节点的服务调用中断,严重的可能会致使整个集群环境都不可用,须要重启服务器,这种非正常停机会带来巨大的损失。java

从可维护性角度看,因为NIO采用了异步非阻塞编程模型,并且是一个IO线程处理多条链路,它的调试和跟踪很是麻烦,特别是生产环境中的问题,咱们没法有效调试和跟踪,每每只能靠一些日志来辅助分析,定位难度很大。编程

JDK原生NIO的问题bootstrap

1)      NIO的类库和API繁杂,使用麻烦,你须要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等;api

2)      须要具有其它的额外技能作铺垫,例如熟悉Java多线程编程,由于NIO编程涉及到Reactor模式,你必须对多线程和网路编程很是熟悉,才能编写出高质量的NIO程序;promise

3)      可靠性能力补齐,工做量和难度都很是大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等,NIO编程的特色是功能开发相对容易,可是可靠性能力补齐工做量和难度都很是大;缓存

4)      JDK NIO的BUG,例如臭名昭著的epoll bug,它会致使Selector空轮询,最终致使CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,可是直到JDK1.7版本该问题仍旧存在,只不过该bug发生几率下降了一些而已,它并无被根本解决。该BUG以及与该BUG相关的问题单以下:安全

http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6403933服务器

http://bugs.java.com/bugdatabase/view_bug.do?bug_id=2147719网络

为何要选择netty:

Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是数一数二的,它已经获得成百上千的商用项目验证,例如Hadoop的RPC框架avro使用Netty做为底层通讯框架。不少其它业界主流的RPC框架,也使用Netty来构建高性能的异步通讯能力。

Netty是一个NIO框架,使用它能够简单快速地开发网络应用程序,好比客户端和服务端的协议。Netty大大简化了网络程序的开发过程好比TCP和UDP的 Socket的开发。

“快速和简单”并不意味着应用程序会有难维护和性能低的问题,Netty是一个精心设计的框架,它从许多协议的实现中吸取了不少的经验好比FTP、SMTP、HTTP、许多二进制和基于文本的传统协议,Netty在不下降开发效率、性能、稳定性、灵活性状况下,成功地找到了解决方案。

有一些用户可能已经发现其余的一些网络框架也声称本身有一样的优点,因此你可能会问是Netty和它们的不一样之处。答案就是Netty的哲学设计理念。Netty从第一天开始就为用户提供了用户体验最好的API以及实现设计。正是由于Netty的设计理念,才让咱们得以轻松地阅读本指南并使用Netty。

经过对Netty的分析,咱们将它的优势总结以下:

1)      API使用简单,开发门槛低;

2)      功能强大,预置了多种编解码功能,支持多种主流协议;

3)      定制能力强,能够经过ChannelHandler对通讯框架进行灵活的扩展;

4)      性能高,经过与其它业界主流的NIO框架对比,Netty的综合性能最优;

5)      成熟、稳定,Netty修复了已经发现的全部JDK NIO BUG,业务开发人员不须要再为NIO的BUG而烦恼;

6)      社区活跃,版本迭代周期短,发现的BUG能够被及时修复,同时,更多的新功能会被加入;

7)      经历了大规模的商业应用考验,质量已经获得验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业获得成功商用,证实了它能够彻底知足不一样行业的商业应用。

正是由于这些优势,Netty逐渐成为Java NIO编程的首选框架。

开始以前

运行本章节中的两个例子最低要求是:Netty的最新版本(Netty5)和JDK1.6及以上。最新的Netty版本在项目下载页面能够找到。为了下载到正确的JDK版本,请到你喜欢的网站下载。

阅读本章节过程当中,你可能会对相关类有疑惑,关于这些类的详细的信息请请参考API说明文档。为了方便,全部文档中涉及到的类名字都会被关联到一个在线的API说明。固然若是有任何错误信息、语法错误或者你有任何好的建议来改进文档说明,那么请联系Netty社区

DISCARD服务(丢弃服务,指的是会忽略全部接收的数据的一种协议)

世界上最简单的协议不是”Hello,World!”,是DISCARD,他是一种丢弃了全部接受到的数据,并不作有任何的响应的协议。

为了实现DISCARD协议,你惟一须要作的就是忽略全部收到的数据。让咱们从处理器的实现开始,处理器是由Netty生成用来处理I/O事件的。

01 package io.netty.example.discard;
02  
03 import io.netty.buffer.ByteBuf;
04  
05 import io.netty.channel.ChannelHandlerContext;
06 import io.netty.channel.ChannelHandlerAdapter;
07  
08 /**
09  * Handles a server-side channel.
10  */
11 public class DiscardServerHandler extends ChannelHandlerAdapter { // (1)
12  
13     @Override
14     public void channelRead(ChannelHandlerContext ctx, Object msg) {// (2)
15         // Discard the received data silently.
16         ((ByteBuf) msg).release(); // (3)
17     }
18  
19     @Override
20     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
21         // Close the connection when an exception is raised.
22         cause.printStackTrace();
23         ctx.close();
24     }
25 }
  1. DisCardServerHandler 继承自 ChannelHandlerAdapter,这个类实现了ChannelHandler接口,ChannelHandler提供了许多事件处理的接口方法,而后你能够覆盖这些方法。如今仅仅只须要继承ChannelHandlerAdapter类而不是你本身去实现接口方法。
  2. 这里咱们覆盖了chanelRead()事件处理方法。每当从客户端收到新的数据时,这个方法会在收到消息时被调用,这个例子中,收到的消息的类型是ByteBuf
  3. 为了实现DISCARD协议,处理器不得不忽略全部接受到的消息。ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放。请记住处理器的职责是释放全部传递处处理器的引用计数对象。一般,channelRead()方法的实现就像下面的这段代码:
    1 @Override
    2 public void channelRead(ChannelHandlerContext ctx, Object msg) {
    3     try {
    4         // Do something with msg
    5     finally {
    6         ReferenceCountUtil.release(msg);
    7     }
    8 }
  4. exceptionCaught()事件处理方法是当出现Throwable对象才会被调用,即当Netty因为IO错误或者处理器在处理事件时抛出的异常时。在大部分状况下,捕获的异常应该被记录下来而且把关联的channel给关闭掉。然而这个方法的处理方式会在遇到不一样异常的状况下有不一样的实现,好比你可能想在关闭链接以前发送一个错误码的响应消息。

到目前为止一切都还比较顺利,咱们已经实现了DISCARD服务的一半功能,剩下的须要编写一个main()方法来启动服务端的DiscardServerHandler。

01 package io.netty.example.discard;
02  
03 import io.netty.bootstrap.ServerBootstrap;
04  
05 import io.netty.channel.ChannelFuture;
06 import io.netty.channel.ChannelInitializer;
07 import io.netty.channel.ChannelOption;
08 import io.netty.channel.EventLoopGroup;
09 import io.netty.channel.nio.NioEventLoopGroup;
10 import io.netty.channel.socket.SocketChannel;
11 import io.netty.channel.socket.nio.NioServerSocketChannel;
12  
13 /**
14  * Discards any incoming data.
15  */
16 public class DiscardServer {
17  
18     private int port;
19  
20     public DiscardServer(int port) {
21         this.port = port;
22     }
23  
24     public void run() throws Exception {
25         EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
26         EventLoopGroup workerGroup = new NioEventLoopGroup();
27         try {
28             ServerBootstrap b = new ServerBootstrap(); // (2)
29             b.group(bossGroup, workerGroup)
30              .channel(NioServerSocketChannel.class// (3)
31              .childHandler(newChannelInitializer<SocketChannel>() { // (4)
32                  @Override
33                  public void initChannel(SocketChannel ch) throwsException {
34                      ch.pipeline().addLast(newDiscardServerHandler());
35                  }
36              })
37              .option(ChannelOption.SO_BACKLOG, 128)          // (5)
38              .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
39  
40             // Bind and start to accept incoming connections.
41             ChannelFuture f = b.bind(port).sync(); // (7)
42  
43             // Wait until the server socket is closed.
44             // In this example, this does not happen, but you can do that to gracefully
45             // shut down your server.
46             f.channel().closeFuture().sync();
47         finally {
48             workerGroup.shutdownGracefully();
49             bossGroup.shutdownGracefully();
50         }
51     }
52  
53     public static void main(String[] args) throws Exception {
54         int port;
55         if (args.length > 0) {
56             port = Integer.parseInt(args[0]);
57         else {
58             port = 8080;
59         }
60         new DiscardServer(port).run();
61     }
62 }
  1. NioEventLoopGroup 是用来处理I/O操做的多线程事件循环器,Netty提供了许多不一样的EventLoopGroup的实现用来处理不一样传输协议。在这个例子中咱们实现了一个服务端的应用,所以会有2个NioEventLoopGroup会被使用。第一个常常被叫作‘boss’,用来接收进来的链接。第二个常常被叫作‘worker’,用来处理已经被接收的链接,一旦‘boss’接收到链接,就会把链接信息注册到‘worker’上。如何知道多少个线程已经被使用,如何映射到已经建立的Channels上都须要依赖于EventLoopGroup的实现,而且能够经过构造函数来配置他们的关系。
  2. ServerBootstrap 是一个启动NIO服务的辅助启动类。你能够在这个服务中直接使用Channel,可是这会是一个复杂的处理过程,在不少状况下你并不须要这样作。
  3. 这里咱们指定使用NioServerSocketChannel类来举例说明一个新的Channel如何接收进来的链接。
  4. 这里的事件处理类常常会被用来处理一个最近的已经接收的ChannelChannelInitializer是一个特殊的处理类,他的目的是帮助使用者配置一个新的Channel。也许你想经过增长一些处理类好比DiscardServerHandle来配置一个新的Channel或者其对应的ChannelPipeline来实现你的网络程序。当你的程序变的复杂时,可能你会增长更多的处理类到pipline上,而后提取这些匿名类到最顶层的类上。
  5. 你能够设置这里指定的通道实现的配置参数。咱们正在写一个TCP/IP的服务端,所以咱们被容许设置socket的参数选项好比tcpNoDelay和keepAlive。请参考ChannelOption和详细的ChannelConfig实现的接口文档以此能够对ChannelOptions的有一个大概的认识。
  6. 你关注过option()和childOption()吗?option()是提供给NioServerSocketChannel用来接收进来的链接。childOption()是提供给由父管道ServerChannel接收到的链接,在这个例子中也是NioServerSocketChannel
  7. 咱们继续,剩下的就是绑定端口而后启动服务。这里咱们在机器上绑定了机器全部网卡上的8080端口。固然如今你能够屡次调用bind()方法(基于不一样绑定地址)。

恭喜!你已经完成熟练地完成了第一个基于Netty的服务端程序。

观察接收到的数据

如今咱们已经编写出咱们第一个服务端,咱们须要测试一下他是否真的能够运行。最简单的测试方法是用telnet 命令。例如,你能够在命令行上输入telnet localhost 8080或者其余类型参数。

然而咱们能说这个服务端是正常运行了吗?事实上咱们也不知道由于他是一个discard服务,你根本不可能获得任何的响应。为了证实他仍然是在工做的,让咱们修改服务端的程序来打印出他到底接收到了什么。

咱们已经知道channelRead()方法是在数据被接收的时候调用。让咱们放一些代码到DiscardServerHandler类的channelRead()方法。

01 @Override
02 public void channelRead(ChannelHandlerContext ctx, Object msg) {
03     ByteBuf in = (ByteBuf) msg;
04     try {
05         while (in.isReadable()) { // (1)
06             System.out.print((char) in.readByte());
07             System.out.flush();
08         }
09     finally {
10         ReferenceCountUtil.release(msg); // (2)
11     }
12 }
  1. 这个低效的循环事实上能够简化为:System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  2. 或者,你能够在这里调用in.release()。

若是你再次运行telnet命令,你将会看到服务端打印出了他所接收到的消息。
完整的discard server代码放在了io.netty.example.discard包下面。

ECHO服务(响应式协议)

到目前为止,咱们虽然接收到了数据,但没有作任何的响应。然而一个服务端一般会对一个请求做出响应。让咱们学习怎样在ECHO协议的实现下编写一个响应消息给客户端,这个协议针对任何接收的数据都会返回一个响应。

和discard server惟一不一样的是把在此以前咱们实现的channelRead()方法,返回全部的数据替代打印接收数据到控制台上的逻辑。所以,须要把channelRead()方法修改以下:

1 @Override
2 public void channelRead(ChannelHandlerContext ctx, Object msg) {
3     ctx.write(msg); // (1)
4     ctx.flush(); // (2)
5 }

1. ChannelHandlerContext对象提供了许多操做,使你可以触发各类各样的I/O事件和操做。这里咱们调用了write(Object)方法来逐字地把接受到的消息写入。请注意不一样于DISCARD的例子咱们并无释放接受到的消息,这是由于当写入的时候Netty已经帮咱们释放了。
2. ctx.write(Object)方法不会使消息写入到通道上,他被缓冲在了内部,你须要调用ctx.flush()方法来把缓冲区中数据强行输出。或者你能够用更简洁的cxt.writeAndFlush(msg)以达到一样的目的。

若是你再一次运行telnet命令,你会看到服务端会发回一个你已经发送的消息。
完整的echo服务的代码放在了io.netty.example.echo包下面。

TIME服务(时间协议的服务)

在这个部分被实现的协议是TIME协议。和以前的例子不一样的是在不接受任何请求时他会发送一个含32位的整数的消息,而且一旦消息发送就会当即关闭链接。在这个例子中,你会学习到如何构建和发送一个消息,而后在完成时主动关闭链接。

由于咱们将会忽略任何接收到的数据,而只是在链接被建立发送一个消息,因此此次咱们不能使用channelRead()方法了,代替他的是,咱们须要覆盖channelActive()方法,下面的就是实现的内容:

01 package io.netty.example.time;
02  
03 public class TimeServerHandler extends ChannelHandlerAdapter {
04  
05     @Override
06     public void channelActive(final ChannelHandlerContext ctx) { // (1)
07         final ByteBuf time = ctx.alloc().buffer(4); // (2)
08         time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
09  
10         final ChannelFuture f = ctx.writeAndFlush(time); // (3)
11         f.addListener(new ChannelFutureListener() {
12             @Override
13             public void operationComplete(ChannelFuture future) {
14                 assert f == future;
15                 ctx.close();
16             }
17         }); // (4)
18     }
19  
20     @Override
21     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
22         cause.printStackTrace();
23         ctx.close();
24     }
25 }
  1. channelActive()方法将会在链接被创建而且准备进行通讯时被调用。所以让咱们在这个方法里完成一个表明当前时间的32位整数消息的构建工做。
  2. 为了发送一个新的消息,咱们须要分配一个包含这个消息的新的缓冲。由于咱们须要写入一个32位的整数,所以咱们须要一个至少有4个字节的ByteBuf。经过ChannelHandlerContext.alloc()获得一个当前的ByteBufAllocator,而后分配一个新的缓冲。
  3. 和往常同样咱们须要编写一个构建好的消息。可是等一等,flip在哪?难道咱们使用NIO发送消息时不是调用java.nio.ByteBuffer.flip()吗?ByteBuf之因此没有这个方法由于有两个指针,一个对应读操做一个对应写操做。当你向ByteBuf里写入数据的时候写指针的索引就会增长,同时读指针的索引没有变化。读指针索引和写指针索引分别表明了消息的开始和结束。比较起来,NIO缓冲并无提供一种简洁的方式来计算出消息内容的开始和结尾,除非你调用flip方法。当你忘记调用flip方法而引发没有数据或者错误数据被发送时,你会陷入困境。这样的一个错误不会发生在Netty上,由于咱们对于不一样的操做类型有不一样的指针。你会发现这样的使用方法会让你过程变得更加的容易,由于你已经习惯一种没有使用flip的方式。另一个点须要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法会返回一个ChannelFuture对象,一个ChannelFuture表明了一个尚未发生的I/O操做。这意味着任何一个请求操做都不会立刻被执行,由于在Netty里全部的操做都是异步的。举个例子下面的代码中在消息被发送以前可能会先关闭链接。
    1 Channel ch = ...;
    2 ch.writeAndFlush(message);
    3 ch.close();

    所以你须要在write()方法返回的ChannelFuture完成后调用close()方法,而后当他的写操做已经完成他会通知他的监听者。请注意,close()方法也可能不会立马关闭,他也会返回一个ChannelFuture

  4. 当一个写请求已经完成是如何通知到咱们?这个只须要简单地在返回的ChannelFuture上增长一个ChannelFutureListener。这里咱们构建了一个匿名的ChannelFutureListener类用来在操做完成时关闭Channel。或者,你可使用简单的预约义监听器代码:
    1 f.addListener(ChannelFutureListener.CLOSE);

为了测试咱们的time服务如咱们指望的同样工做,你可使用UNIX的rdate命令

1 $ rdate -o <port> -p <host>

Port是你在main()函数中指定的端口,host使用locahost就能够了。

Time客户端

不像DISCARD和ECHO的服务端,对于TIME协议咱们须要一个客户端由于人们不能把一个32位的二进制数据翻译成一个日期或者日历。在这一部分,咱们将会讨论如何确保服务端是正常工做的,而且学习怎样用Netty编写一个客户端。

在Netty中,编写服务端和客户端最大的而且惟一不一样的使用了不一样的BootStrapChannel的实现。请看一下下面的代码:

01 package io.netty.example.time;
02  
03 public class TimeClient {
04     public static void main(String[] args) throws Exception {
05         String host = args[0];
06         int port = Integer.parseInt(args[1]);
07         EventLoopGroup workerGroup = new NioEventLoopGroup();
08  
09         try {
10             Bootstrap b = new Bootstrap(); // (1)
11             b.group(workerGroup); // (2)
12             b.channel(NioSocketChannel.class); // (3)
13             b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
14             b.handler(new ChannelInitializer&lt;SocketChannel&gt;() {
15                 @Override
16                 public void initChannel(SocketChannel ch) throwsException {
17                     ch.pipeline().addLast(new TimeClientHandler());
18                 }
19             });
20  
21             // Start the client.
22             ChannelFuture f = b.connect(host, port).sync(); // (5)
23  
24             // Wait until the connection is closed.
25             f.channel().closeFuture().sync();
26         finally {
27             workerGroup.shutdownGracefully();
28         }
29     }
30 }
  1. BootStrapServerBootstrap相似,不过他是对非服务端的channel而言,好比客户端或者无链接传输模式的channel。
  2. 若是你只指定了一个EventLoopGroup,那他就会即做为一个‘boss’线程,也会做为一个‘workder’线程,尽管客户端不须要使用到‘boss’线程。
  3. 代替NioServerSocketChannel的是NioSocketChannel,这个类在客户端channel被建立时使用。
  4. 不像在使用ServerBootstrap时须要用childOption()方法,由于客户端的SocketChannel没有父channel的概念。
  5. 咱们用connect()方法代替了bind()方法。

正如你看到的,他和服务端的代码是不同的。ChannelHandler是如何实现的?他应该从服务端接受一个32位的整数消息,把他翻译成人们能读懂的格式,并打印翻译好的时间,最后关闭链接:

01 package io.netty.example.time;
02  
03 import java.util.Date;
04  
05 public class TimeClientHandler extends ChannelHandlerAdapter {
06     @Override
07     public void channelRead(ChannelHandlerContext ctx, Object msg) {
08         ByteBuf m = (ByteBuf) msg; // (1)
09         try {
10             long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
11             System.out.println(new Date(currentTimeMillis));
12