Netty的体系结构及使用

2021年09月07日 阅读数:564
这篇文章主要向大家介绍Netty的体系结构及使用,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。
《Netty权威指南》
 
 
0
 
 
 
 
 
 
1、异步和事件驱动
 
 
1.Java网络编程
 
  1. 阻塞I/O -- socket
  2. 非阻塞I/O -- NIO
 
 
2.Netty简介
代码清单 1-3 展现了一个 Netty所作的是事情和很好的例子。 这里,connect()方法将会直接返回,而不会阻塞,该调用将会在后台完成。这究竟何时会发生 则取决于若干的因素,但这个关注点已经从代码中抽象出来了。由于线程不用阻塞以等待对应的 操做完成,因此它能够同时作其余的工做,从而更加有效地利用资源。
代码清单 1-3 异步地创建链接
Channel channel = ...; // Does not block ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1", 25));
 
1. 导读
在这一章中,咱们介绍了 Netty 框架的背景知识,包括 Java 网络编程 API 的演变过程,阻塞 和非阻塞网络操做之间的区别,以及异步 I/O 在高容量、高性能的网络编程中的优点。
而后,咱们概述了 Netty 的特性、设计和优势,其中包括 Netty 异步模型的底层机制,包括 回调、Future 以及它们的结合使用。咱们还谈到了事件是如何产生的以及如何拦截和处理它们。
 
 
3. Netty 的核心组件
    1. Channel;
  • Channel 是 Java NIO 的一个基本构造。
  • 它表明一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个可以执行一个或者多个不一样的I/O操做的程序组件)的开放链接,如读操做和写操做 1。
  • 目前,能够把 Channel 看做是传入(入站)或者传出(出站)数据的载体。所以,它能够被打开或者被关闭,链接或者断开链接。
 
 
    1. 回调;
    2. Future;
  • Future 提供了另外一种在操做完成时通知应用程序的方式。这个对象能够看做是一个异步操 做的结果的占位符;它将在将来的某个时刻完成,并提供对其结果的访问。
 
  • JDK 预置了 interface java.util.concurrent.Future,可是其所提供的实现,只 容许手动检查对应的操做是否已经完成,或者一直阻塞直到它完成。这是很是繁琐的,因此 Netty 提供了它本身的实现——ChannelFuture,用于在执行异步操做的时候使用。
 
  • ChannelFuture提供了几种额外的方法,这些方法使得咱们可以注册一个或者多个 ChannelFutureListener实例。监听器的回调方法operationComplete(),将会在对应的 操做完成时被调用 1。而后监听器能够判断该操做是成功地完成了仍是出错了。若是是后者,我 们能够检索产生的Throwable。简而 言之 ,由ChannelFutureListener提供的通知机制消除 了手动检查对应的操做是否完成的必要。
 
  • 每一个 Netty 的出站 I/O 操做都将返回一个 ChannelFuture;也就是说,它们都不会阻塞。 正如咱们前面所提到过的同样,Netty 彻底是异步和事件驱动的。
 
  • 若是你把 ChannelFutureListener 看做是回调的一个更加精细的版本,那么你是对的。 事实上,回调和 Future 是相互补充的机制;它们相互结合,构成了 Netty 自己的关键构件块之一。
 
 
    1. 事件和 ChannelHandler。
  • Netty 使用不一样的事件来通知咱们状态的改变或者是操做的状态。这使得咱们可以基于已经 发生的事件来触发适当的动做。这些动做多是:
    • 记录日志;
    • 数据转换;
    • 流控制;
    • 应用程序逻辑。
  • Netty 是一个网络编程框架,因此事件是按照它们与入站或出站数据流的相关性进行分类的。
可能由入站数据或者相关的状态更改而触发的事件包括:
    • 链接已被激活或者链接失活;
    • 数据读取;
    • 用户事件;
    • 错误事件。
  • 出站事件是将来将会触发的某个动做的操做结果,这些动做包括:
    • 打开或者关闭到远程节点的链接;
    • 将数据写到或者冲刷到套接字。
 
  • 每一个事件均可以被分发给 ChannelHandler 类中的某个用户实现的方法。这是一个很好的 将事件驱动范式直接转换为应用程序构件块的例子。图 1-3 展现了一个事件是如何被一个这样的 ChannelHandler 链处理的。
 
0
 
  • Netty 的 ChannelHandler 为处理器提供了基本的抽象,如图 1-3 所示的那些。咱们会 在适当的时候对 ChannelHandler 进行更多的说明,可是目前你能够认为每一个 Channel- Handler 的实例都相似于一种为了响应特定事件而被执行的回调。
  • Netty 提供了大量预约义的能够开箱即用的 ChannelHandler 实现,包括用于各类协议 (如 HTTP 和 SSL/TLS)的 ChannelHandler。在内部,ChannelHandler 本身也使用了事件
和 Future,使得它们也成为了你的应用程序将使用的相同抽象的消费者。
 
3. 把他们放在一块儿
1.Future、回调和 ChannelHandler
  • Netty 的异步编程模型是创建在 Future 和回调的概念之上的,而将事件派发到 ChannelHandler 的方法则发生在更深的层次上。结合在一块儿,这些元素就提供了一个处理环境,使你的应用程序逻 辑能够独立于任何网络操做相关的顾虑而独立地演变。这也是 Netty 的设计方式的一个关键目标。
  • 拦截操做以及高速地转换入站数据和出站数据,都只须要你提供回调或者利用操做所返回的 Future。这使得连接操做变得既简单又高效,而且促进了可重用的通用代码的编写。
 
2.选择器、事件和 EventLoop
  • Netty 经过触发事件将 Selector 从应用程序中抽象出来,消除了全部原本将须要手动编写 的派发代码。在内部,将会为每一个 Channel 分配一个 EventLoop,用以处理全部事件,包括:
    1. 注册感兴趣的事件;
    2. 将事件派发给 ChannelHandler;
    3. 安排进一步的动做。
  • EventLoop 自己只由一个线程驱动,其处理了一个 Channel 的全部 I/O 事件,而且在该
EventLoop 的整个生命周期内都不会改变。
  • 这个简单而强大的设计消除了你可能有的在ChannelHandler 实现中须要进行同步的任何顾虑,
所以,你能够专一于提供正确的逻辑,用来在有感兴趣的数据要处理的时候执行。如同咱们在详
细探讨 Netty 的线程模型时将会看到的,该 API 是简单而紧凑的。
 

 
2、第一款Netty应用程序
 
 
1.编写 Echo 服务器
 
  • 全部的 Netty 服务器都须要如下两部分。
    • 至少一个ChannelHandler — 服务器对从客户端接收的数据的处理,即它的业务逻辑。
    • 引导 — 这是配置服务器的启动代码。eg: 将服务器绑定到它要监听链接请求的端口上。
 
  • 在第 1 章中,咱们介绍了 Future 和回调,而且阐述了它们在事件驱动设计中的应用。咱们 还讨论了 ChannelHandler,它是一个接口族的父接口,它的实现负责接收并响应事件通知。 在 Netty 应用程序中,全部的数据处理逻辑都包含在这些核心抽象的实现中。
  • 由于你的 Echo 服务器会响应传入的消息,因此它须要实现 ChannelInboundHandler 接口,用 来定义响应入站事件的方法。这个简单的应用程序只须要用到少许的这些方法,因此继承 Channel- InboundHandlerAdapter 类也就足够了,它提供了 ChannelInboundHandler 的默认实现。
 
咱们感兴趣的方法是:
channelRead()— 对于每一个传入的消息都要调用;
channelReadComplete()— 通知ChannelInboundHandler最后一次对channel-
Read()的调用是当前批量读取中的最后一条消息;
exceptionCaught()— 在读取操做期间,有异常抛出时会调用。
该 Echo 服务器的 ChannelHandler 实现是 EchoServerHandler,如代码清单 2-1 所示。
/** * 代码清单 2-1 */ @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8)); // 将接收到的消息 写给发送者,而 不冲刷出站消息 ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { // 将未决消息冲刷到 远程节点,而且关 闭该 Channel,释放消息 ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 打印异常 栈跟踪并关闭该Channel cause.printStackTrace(); ctx.close(); } }
 
除了 ChannelInboundHandlerAdapter 以外,还有不少须要学习的 ChannelHandler 的 子类型和实现,咱们将在第 6 章和第 7 章中对它们进行详细的阐述。目前,请记住下面这些关键点:
针对不一样类型的事件来调用 ChannelHandler; 应用程序经过实现或者扩展 ChannelHandler 来挂钩到事件的生命周期,而且提供自
定义的应用程序逻辑; 在架构上,ChannelHandler 有助于保持业务逻辑与网络处理代码的分离。这简化了开
发过程,由于代码必须不断地演化以响应不断变化的需求。
 
 
2. 引导服务器
 
在讨论过由 EchoServerHandler 实现的核心业务逻辑以后,咱们如今能够探讨引导服务 器自己的过程了,具体涉及如下内容:
  • 绑定到服务器端口, 监听并接受传入链接请求;
  • 配置 Channel,以将有关的入站消息通知给 EchoServerHandler 实例。
 
/** * 代码清单2-2 EchoServer类 */ public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws Exception { if (args.length != 1) { System.err.println( "Usage: " + EchoServer.class.getSimpleName() + " <port>"); } int port = Integer.parseInt(args[0]); new EchoServer(port).start(); } public void start() throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); // 使用的是 NIO 传输,因此指定 了NioEventLoopGroup来接受和处理新的链接 EventLoopGroup group = new NioEventLoopGroup(); try { // 建立 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 指定所使用的 NIO 传输 Channel .channel(NioServerSocketChannel.class) //使用指定的 端口设置套 接字地址 .localAddress(new InetSocketAddress(port)) //使用了一个特殊的类——ChannelInitializer。这是关键。当一个新的链接被接收时, // 一个新的子 Channel 将会被建立,而 ChannelInitializer 将会把一个你的 // EchoServerHandler 的实例添加到该 Channel 的 ChannelPipeline 中。 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler); } }); //异步地绑定服务器; 调用 sync()方法阻塞 等待直到绑定完成 ChannelFuture f = b.bind().sync(); //获取 Channel 的 CloseFuture,并 且阻塞当前线 程直到它完成 f.channel().closeFuture().sync(); } finally { // 关闭 EventLoopGroup, 释放全部的资源 group.shutdownGracefully().sync(); } } }
在这个时候,服务器已经初始化,而且已经就绪能被使用了。这个示例使用了 NIO,由于得益于它的可扩展性和完全的异步性,它是目前使用最普遍的传 输。可是也能够使用一个不一样的传输实现。若是你想要在本身的服务器中使用 OIO 传输,将需 要指定 OioServerSocketChannel 和 OioEventLoopGroup。
 
让咱们回顾一下你刚完成的服务器实现中的重要步骤。下面这些是服务器的主要代码组件:
  • EchoServerHandler 实现了业务逻辑;
  • main()方法引导了服务器;
 
引导过程当中所须要的步骤以下:
  • 建立一个 ServerBootstrap 的实例以引导和绑定服务器;
  • 建立并分配一个 NioEventLoopGroup 实例以进行事件的处理,如接受新链接以及读/写数据;
  • 指定服务器绑定的本地的 InetSocketAddress;
  • 使用一个 EchoServerHandler 的实例初始化每个新的 Channel;
  • 调用 ServerBootstrap.bind()方法以绑定服务器。
 
 
 
3.编写 Echo 客户端
Echo 客户端将会:
(1)链接到服务器;
(2)发送一个或者多个消息; (3)对于每一个消息,等待并接收从服务器发回的相同的消息; (4)关闭链接。
编写客户端所涉及的两个主要代码部分也是业务逻辑和引导,和你在服务器中看到的同样。
 
 
如同服务器,客户端将拥有一个用来处理数据的 ChannelInboundHandler。在这 个场景 下,你将扩展 SimpleChannelInboundHandler 类以处理全部必须的任务,如代码清单 2-3 所示。这要求重写下面的方法:
channelActive()——在到服务器的链接已经创建以后将被调用; channelRead0()1——当从服务器接收到一条消息时被调用; exceptionCaught()——在处理过程当中引起异常时被调用。
代码清单 2-3 客户端的 ChannelHandler
 
 
 
4 引导客户端
如同将在代码清单 2-4 中所看到的,引导客户端相似于引导服务器,不一样的是,客户端是使 用主机和端口参数来链接远程地址
/** * 代码清单 2-4 客户端的主类 */ public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) // 设置服务器的InetSocketAddress .remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println( "Usage: " + EchoClient.class.getSimpleName() + " <host> <port>"); return; } String host = args[0]; int port = Integer.parseInt(args[1]); new EchoClient(host, port).start(); } }
 
 
让咱们回顾一下这一节中所介绍的要点:
为初始化客户端,建立了一个 Bootstrap 实例;
为进行事件处理分配了一个 NioEventLoopGroup 实例,其中事件处理包括建立新的 链接以及处理入站和出站数据;
为服务器链接建立了一个 InetSocketAddress 实例;
当链接被创建时,一个 EchoClientHandler 实例会被安装到(该 Channel 的)
ChannelPipeline 中;
在一切都设置完成后,调用 Bootstrap.connect()方法链接到远程节点; 完成了客户端,你即可以着手构建并测试该系统了。
 
 
 

 
3、Netty3组件和设计 本章主要内容
 Netty 的技术和体系结构方面的内容
 Channel、EventLoop 和 ChannelFuture
 ChannelHandler 和 ChannelPipeline
 引导
 
  • 咱们将从两个不一样的但却又密切相 关的视角来探讨 Netty: 类库的视角以及框架的视角。对于使用 Netty 编写高效的、可重用的和 可维护的代码来讲,二者缺一不可。
  • 从高层次的角度来看,Netty 解决了两个相应的关注领域,咱们可将其大体标记为技术的和 体系结构的。首先,它的基于 Java NIO 的异步的和事件驱动的实现,保证了高负载下应用程序 性能的最大化和可伸缩性。其次,Netty 也包含了一组设计模式,将应用程序逻辑从网络层解耦, 简化了开发过程,同时也最大限度地提升了可测试性、模块化以及代码的可重用性。
  • 在咱们更加详细地研究 Netty 的各个组件时,咱们将密切关注它们是如何协做来支撑这 些体系结构上的最佳实践的。经过遵循一样的原则,咱们即可得到 Netty 所提供的全部益处。
 
 
 
3.1 Channel、EventLoop 和 ChannelFuture
这些类合在一块儿,能够被认为是 Netty 网络抽象的表明:
Channel— Socket;
EventLoop— 控制流、多线程处理、并发;
ChannelFuture— 异步通知。
 
  1. Channel
  • 基本的 I/O 操做(bind()、connect()、read()和 write())依赖于底层网络传输所提 供的原语。
  • 在基于 Java 的网络编程中,其基本的构造是 class Socket。Netty 的 Channel 接 口所提供的 API,大大地下降了直接使用 Socket 类的复杂性。
  • 此外,Channel 也是拥有许多 预约义的、专门化实现的普遍类层次结构的根
 
  1. EventLoop
  • EventLoop 定义了 Netty 的核心抽象,用于处理链接的生命周期中所发生的事件。
目前,图 3-1 在高层次上说明了 Channel、EventLoop、Thread 以及 EventLoopGroup
之间的关系。
0
一个 EventLoopGroup 包含一个或者多个 EventLoop;
一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
全部由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
一个 Channel 在它的生命周期内只注册于一个 EventLoop;
一个 EventLoop 可能会被分配给一个或多个 Channel。
注意,在这种设计中,一个给定Channel 的 I/O 操做都是由相同的 Thread 执行的,实际
上消除了对于同步的须要。
 
  1. ChannelFuture
正如咱们已经解释过的那样,Netty 中全部的 I/O 操做都是异步的。由于一个操做可能不会
当即返回,因此咱们须要一种用于在以后的某个时间点肯定其结果的方法。为此,Netty 提供了
ChannelFuture接口,其addListener()方法注册了一个ChannelFutureListener,以
便在某个操做完成时(不管是否成功)获得通知。
 
关于 ChannelFuture 的更多讨论 能够将 ChannelFuture 看做是未来要执行的操做的结果的 占位符。它究竟何时被执行则可能取决于若干的因素,所以不可能准确地预测,可是能够肯
定的是它将会被执行。此外,全部属于同一个 Channel 的操做都被保证其将以它们被调用的顺序
被执行。
 
 
3.2 ChannelHandler 和 ChannelPipeline
 
  1. ChannelHandler 接口
  • 从应用程序开发人员的角度来看,Netty 的主要组件是 ChannelHandler,它充当了全部处理入站和出站数据的应用程序逻辑的容器。
  • 由于 ChannelHandler 的方法是 由网络事件(其中术语“事件”的使用很是普遍)触发的。
  • 事实上,ChannelHandler 可专 门用于几乎任何类型的动做,例如将数据从一种格式转换为另一种格式,或者处理转换过程 中所抛出的异常。
  • 举例来讲,ChannelInboundHandler 是一个你将会常常实现的子接口。这种类型的 ChannelHandler 接收入站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所处 理。当你要给链接的客户端发送响应时,也能够从 ChannelInboundHandler 冲刷数据。你 的应用程序的业务逻辑一般驻留在一个或者多个 ChannelInboundHandler 中。
 
  1. ChannelPipeline 接口
  • 使得事件流经 ChannelPipeline 是 ChannelHandler 的工做,它们是在应用程序的初 始化或者引导阶段被安装的。
  • 这些对象接收事件、执行它们所实现的处理逻辑,并将数据传递给 链中的下一个 ChannelHandler。
  • 它们的执行顺序是由它们被添加的顺序所决定的。实际上, 被咱们称为 ChannelPipeline 的是这些 ChannelHandler 的编排顺序。
  • 图 3-3 说明了一个 Netty 应用程序中入站和出站数据流之间的区别。从一个客户端应用程序 的角度来看,若是事件的运动方向是从客户端到服务器端,那么咱们称这些事件为出站的,反之 则称为入站的。
0
 
 
  • 图 3-3 也显示了入站和出站 ChannelHandler 能够被安装到同一个 ChannelPipeline 中。
  • 若是一个消息或者任何其余的入站事件被读取,那么它会从 ChannelPipeline 的头部 开始流动,并被传递给第一个 ChannelInboundHandler。这个 ChannelHandler 不必定 会实际地修改数据,具体取决于它的具体功能,在这以后,数据将会被传递给链中的下一个 ChannelInboundHandler。最终,数据将会到达 ChannelPipeline 的尾端,届时,全部 处理就都结束了。
  • 数据的出站运动(即正在被写的数据)在概念上也是同样的。
 
关于入站和出站 ChannelHandler 的更多讨论
经过使用做为参数传递到每一个方法的 ChannelHandlerContext,事件能够被传递给当前
ChannelHandler 链中的下一个 ChannelHandler。由于你有时会忽略那些不感兴趣的事件,因此 Netty 提供了抽象基类 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter。经过调 用 ChannelHandlerContext 上的对应方法,每一个都提供了简单地将事件传递给下一个 ChannelHandler 的方法的实现。随后,你能够经过重写你所感兴趣的那些方法来扩展这些类。
 
 
在Netty中,有两种发送消息的方式。
  1. 你能够直接写到Channel中,
  2. 也能够 写到和Channel- Handler 相关联的 ChannelHandlerContext 对象中。
前一种方式将会致使消息从 Channel- Pipeline 的尾端开始流动,然后者将致使消息从 ChannelPipeline 中的下一个 Channel- Handler 开始流动。
 
3. 更加深刻地了解 ChannelHandler
正如咱们以前所说的,有许多不一样类型的 ChannelHandler,它们各自的功能主要取决于 它们的超类。Netty 以适配器类的形式提供了大量默认的 ChannelHandler 实现,其旨在简化 应用程序处理逻辑的开发过程。你已经看到了,ChannelPipeline 中的每一个 ChannelHandler 将负责把事件转发到链中的下一个 ChannelHandler。这些适配器类(及它们的子类)将自动 执行这个操做,因此你能够只重写那些你想要特殊处理的方法和事件。
 
为何须要适配器类
有一些适配器类能够将编写自定义的 ChannelHandler 所须要的努力降到最低限度,由于它们提 供了定义在对应接口中的全部方法的默认实现。
下面这些是编写自定义 ChannelHandler 时常常会用到的适配器类:
 ChannelHandlerAdapter
 ChannelInboundHandlerAdapter
 ChannelOutboundHandlerAdapter
 ChannelDuplexHandler
 
 
4.编码器和解码器
当你经过 Netty 发送或者接收一个消息的时候,就将会发生一次数据转换。入站消息会被解 码;也就是说,从字节转换为另外一种格式,一般是一个 Java 对象。
若是是出站消息,则会发生 相反方向的转换:它将从它的当前格式被编码为字节。这两种方向的转换的缘由很简单:网络数 据老是一系列的字节。
对应于特定的须要,Netty为编码器和解码器提供了不一样类型的抽象类。例如,你的应用程序可能使用了一种中间格式,而不须要当即将消息转换成字节。你将仍然须要一个编码器,可是 它将派生自一个不一样的超类。为了肯定合适的编码器类型,你能够应用一个简单的命名约定。
一般来讲,这些基类的名称将相似于 ByteToMessageDecoder 或 MessageToByte- Encoder。对于特殊的类型,你可能会发现相似于 ProtobufEncoder 和 ProtobufDecoder 这样的名称——预置的用来支持 Google 的 Protocol Buffers。
 
5.抽象类 SimpleChannelInboundHandler
在这种类型的 ChannelHandler 中,最重要的方法是 channelRead0(Channel- HandlerContext,T)。除了要求不要阻塞当前的 I/O 线程以外,其具体实现彻底取决于你。
 
 
3.3 引导
 
  • Netty 的引导类为应用程序的网络层配置提供了容器,这涉及将一个进程绑定到某个指定的 端口,或者将一个进程链接到另外一个运行在某个指定主机的指定端口上的进程。
  • 一般来讲,咱们把前面的用例称做引导一个服务器,后面的用例称做引导一个客户端。虽然 这个术语简单方便,可是它略微掩盖了一个重要的事实,即“服务器”和“客户端”实际上表示 了不一样的网络行为; 换句话说,是监听传入的链接仍是创建到一个或者多个进程的链接。
  • 所以,有两种类型的引导: 一种用于客户端(简单地称为 Bootstrap),而另外一种 (ServerBootstrap)用于服务器。
 
表 3-1 比较了这两种 类型的引导类。
0
 
引导一个客户端只须要一个 EventLoopGroup,可是一个 ServerBootstrap 则须要两个(也能够是同一个实例)。为何呢?
由于服务器须要两组不一样的 Channel。
  • 第一组将只包含一个 ServerChannel,表明服务 器自身的已绑定到某个本地端口的正在监听的套接字。
  • 而第二组将包含全部已建立的用来处理传 入客户端链接(对于每一个服务器已经接受的链接都有一个)的 Channel。
  • 图 3-4 说明了这个模 型,而且展现了为什么须要两个不一样的 EventLoopGroup。
0
与 ServerChannel 相关联的 EventLoopGroup 将负责分配一个为链接请求建立 Channel 的 EventLoop。一旦链接被接受,第二个 EventLoopGroup 就会给它的 Channel 分配一个 EventLoop。
 
 
 
 
 

 
4、传输
在本章中,咱们将研究:
1.Netty传输、它们的实现和使用,以及 Netty 是如何将它们呈现给开发者的。
2.深刻探讨了 Netty 预置的传输,而且解释了它们的行为。
3.如何匹配不一样的传输和特定用例的需求。
 
本章主要内容
 OIO——阻塞传输
 NIO——异步传输
 Local——JVM 内部的异步通讯
 Embedded——测试你的ChannelHandler
 
流经网络的数据老是具备相同的类型:字节。这些字节是如何流动的主要取决于咱们所说的 网络传输— 一个帮助咱们抽象底层数据传输机制的概念。用户并不关心这些细节;他们只想确 保他们的字节被可靠地发送和接收。
 
若是你有 Java 网络编程的经验,那么你可能已经发现,在某些时候,你须要支撑比预期多 不少的并发链接。若是你随后尝试从阻塞传输切换到非阻塞传输,那么你可能会由于这两种网络 API 的大相径庭而遇到问题。
 
然而,Netty 为它全部的传输实现提供了一个通用 API,这使得这种转换比你直接使用 JDK 所可以达到的简单得多。所产生的代码不会被实现的细节所污染,而你也不须要在你的整个代码 库上进行普遍的重构。简而言之,你能够将时间花在其余更有成效的事情上。
 
4.1 案例研究:传输迁移
 
1.不经过 Netty 使用 OIO 和 NIO
/** * 代码清单 4-1 未使用 Netty 的阻塞网络编程 * * @author xuxh * @date 2021/03/07 11:27 */ public class PlainOioServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); try { for (; ; ) { final Socket clientSocket = socket.accept(); System.out.println( "Accepted connection from " + clientSocket); new Thread(new Runnable() { @Override public void run() { OutputStream out; try { out = clientSocket.getOutputStream(); out.write("Hi!\r\n".getBytes( Charset.forName("UTF-8"))); out.flush(); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } } }).start(); } } catch ( IOException e) { e.printStackTrace(); } } }
 
/** * 代码清单 4-2 未使用 Netty 的异步网络编程 * * @author xuxh * @date 2021/03/07 11:32 */ public class PlainNioServer { public void serve(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); ServerSocket ssocket = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ssocket.bind(address); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes()); for (; ; ) { try { // 等待须要处理的新事 件;阻塞 将一直持续到 下一个传入事件 selector.select(); } catch (IOException ex) { ex.printStackTrace(); // handle exception break; } // 获取全部接 收事件的 Selection- Key 实例 Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { // 检查事件是不是一 个新的已经就绪可 以被接受的链接 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); // 接受客户端,并将它注册到选择器 client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); System.out.println("Accepted connection from " + client); } // 检查套接字是否已经准备好写数据 if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); while (buffer.hasRemaining()) { // 将数据写到已链接的客户端 if (client.write(buffer) == 0) { break; } } client.close(); } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { // ignore on close } } } } } }
 
如同你所看到的,虽然这段代码所作的事情与以前的版本彻底相同,可是代码却大相径庭。 若是为了用于非阻塞 I/O 而从新实现这个简单的应用程序,都须要一次彻底的重写的话,那么不 难想象,移植真正复杂的应用程序须要付出什么样的努力。
鉴于此,让咱们来看看使用 Netty 实现该应用程序将会是什么样子吧。
 
4.1.2 经过 Netty 使用 OIO 和 NIO
/** * 代码清单 4-3 使用 Netty 的阻塞网络处理 * * @author xuxh * @date 2021/03/07 21:15 */ public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8); EventLoopGroup group = new OioEventLoopGroup(); try { // 建立 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 使用 OioEventLoopGroup 以容许阻塞模式 .channel(OioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) // 指定 Channel- Initializer,对于 每一个已接受的 链接都调用它 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( // 添加一个 Channel- InboundHandler- Adapter 以拦截和 处理事件 new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 将消息写到客户端,并添 加 ChannelFutureListener, 以便消息一被写完就关闭 链接 ctx.writeAndFlush(buf.duplicate()) .addListener(ChannelFutureListener.CLOSE); } }); } }); //绑定服务器 以接受链接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { // 释放全部的资源 group.shutdownGracefully().sync(); } } }
 
/** * 代码清单 4-4 使用 Netty 的异步网络处理 * * @author xuxh * @date 2021/03/07 21:40 */ public class NettyNioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8); // 使用的是 NIO 传输,因此指定 了NioEventLoopGroup来接受和处理新的链接 EventLoopGroup group = new NioEventLoopGroup(); try { // 建立 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 使用 NioEventLoopGroup 非阻塞模式 .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) // 指定 Channel- Initializer,对于 每一个已接受的 链接都调用它 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( // 添加一个 Channel- InboundHandler- Adapter 以拦截和 处理事件 new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 将消息写到客户端,并添 加 ChannelFutureListener, 以便消息一被写完就关闭 链接 ctx.writeAndFlush(buf.duplicate()) .addListener(ChannelFutureListener.CLOSE); } }); } }); //绑定服务器 以接受链接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { // 释放全部的资源 group.shutdownGracefully().sync(); } } }
由于 Netty 为每种传输的实现都暴露了相同的 API,因此不管选用哪种传输的实现,你的 代码都仍然几乎不受影响。在全部的状况下,传
 
 
4.2 传输 API
 
传输 API 的核心是 interface Channel,它被用于全部的 I/O 操做。Channel 类的层次结构如图 4-1 所示。
0
 
如图所示,每一个 Channel 都将会被分配一个 ChannelPipeline 和 ChannelConfig。
  • ChannelConfig 包含了该 Channel 的全部配置设置,而且支持热更新。因为特定的传输可能具备独特的设置,因此它可能会实现一个 ChannelConfig 的子类型。(请参考 ChannelConfig 实现对应的 Javadoc。)
  • ChannelPipeline 持有全部应用于入站数据和出站数据以及事件的 ChannelHandler 实 例,这些 ChannelHandler 实现了应用程序处理状态变化以及数据处理的逻辑。
  • 因为 Channel 是独一无二的,因此为了保证顺序将 Channel 声明为 java.lang. Comparable 的一个子接口。所以,若是两个不一样的 Channel 实例都返回了相同的散列码,那 么 AbstractChannel 中的 compareTo()方法的实现将会抛出一个 Error。
 
ChannelHandler 的典型用途包括:
将数据从一种格式转换为另外一种格式;
提供异常的通知;
提供 Channel 变为活动的或者非活动的通知;
提供当 Channel 注册到 EventLoop 或者从 EventLoop 注销时的通知;
提供有关用户自定义事件的通知。
 
你也能够根据须要经过添加或者移除ChannelHandler实例来修改ChannelPipeline。经过利用Netty的这项能力能够构建出高度灵活的应用程序。
 
除了访问所分配的 ChannelPipeline 和 ChannelConfig 以外,也能够利用 Channel 的其余方法,其中最重要的列举在表 4-1 中。
0
稍后咱们将进一步深刻地讨论全部这些特性的应用。目前,请记住,Netty 所提供的普遍功 能只依赖于少许的接口。这意味着,你能够对你的应用程序逻辑进行重大的修改,而又无需大规 模地重构你的代码库。
 
Netty 的 Channel 实现是线程安全的,所以你能够存储一个到 Channel 的引用,而且每当 你须要向远程节点写数据时,均可以使用它,即便当时许多线程都在使用它。
 
 
4.3 内置的传输
 
Netty 内置了一些可开箱即用的传输。由于并非它们全部的传输都支持每一种协议,因此 你必须选择一个和你的应用程序所使用的协议相容的传输。
在本节中咱们将讨论这些关系。表 4-2 显示了全部 Netty 提供的传输。
0
 
0
 
4.3.1 NIO——非阻塞 I/O
 
NIO 提供了一个全部 I/O 操做的全异步的实现。它利用了自 NIO 子系统被引入 JDK 1.4 时便 可用的基于选择器的 API。
选择器背后的基本概念是充当一个注册表,,当 Channel 的状态发生变化时, 在选择器能够获得通知。
Channel可能的状态变化有:
新的 Channel 已被接受而且就绪;
Channel 链接已经完成;
Channel 有已经就绪的可供读取的数据;
Channel 可用于写数据。
选择器运行在一个检查状态变化并对其作出相应响应的线程上,在应用程序对状态的变化作出响应以后,选择器将会被重置,并将重复这个过程。
 
表 4-3 中的常量值表明了由 class java.nio.channels.SelectionKey 定义的位模式。这些位模式能够组合起来定义一组应用程序正在请求通知的状态变化集。
0
 
对于全部传输都共有的用户级别 API ,Netty彻底地隐藏了这些 NIO 的内部细节。 图 4-2 展现了该处理流程。
0
 
 
零拷贝
零拷贝(zero-copy)是一种目前只有在使用 NIO 和 Epoll 传输时才可以使用的特性。它使你能够快速 高效地将数据从文件系统移动到网络接口,而不须要将其从内核空间复制到用户空间,在像 FTP 或者 HTTP 这样的协议中能够显著地提高性能。可是,并非全部的操做系统都支持这一特性。特别地,它对于实现了数据加密或者压缩的文件系统是不可用的——只能传输文件的原始内容。
 
 
 
4.3.2 Epoll — 用于 Linux 的本地非阻塞传输4.3.2 Epoll— 用于 Linux 的本地非阻塞传输
正如咱们以前所说的,Netty 的 NIO 传输基于 Java 提供的异步/非阻塞网络编程的通用抽象。 虽然这保证了 Netty 的非阻塞 API 能够在任何平台上使用,但它也包含了相应的限制,由于 JDK 为了在全部系统上提供相同的功能,必须作出妥协。
 
Linux做为高性能网络编程的平台,其重要性与日俱增,这催生了大量先进特性的开发,其中包括epoll——一个高度可扩展的I/O事件通知特性。这个API自Linux内核版本 2.5.44(2002)被 引入,提供了比旧的POSIX select和poll系统调用 1更好的性能,同时如今也是Linux上非阻 塞网络编程的事实标准。Linux JDK NIO API使用了这些epoll调用。
 
Netty为Linux提供了一组NIO API,它以一种和它自己的设计更加一致的方式使用epoll,而且以一种更加轻量的方式使用中断。若是你的应用程序只运行于Linux系统,那么请考虑利用 这个版本的传输; 你将发如今高负载下它的性能要优于JDK的NIO实现。
 
这个传输的语义与在图 4-2 所示的彻底相同,并且它的用法也是简单直接的。相关示例参照 代码清单 4-4。若是要在那个代码清单中使用 epoll 替代 NIO,只须要将 NioEventLoopGroup 替换为 EpollEventLoopGroup,而且将 NioServerSocketChannel.class 替换为 EpollServerSocketChannel.class 便可。
 
 
4.3.3 OIO — 旧的阻塞 I/O
Netty 的 OIO 传输实现表明了一种折中: 它能够经过常规的传输 API 使用,可是因为它是创建在 java.net 包的阻塞实现之上的,因此它不是异步的。可是,它仍然很是适合于某些用途。
 
有了这个背景,你可能会想,Netty是如何可以使用和用于异步传输相同的API来支持OIO的呢。 答案就是,Netty利用了SO_TIMEOUT这个Socket标志,它指定了等待一个I/O操做完成的最大毫秒数。若是操做在指定的时间间隔内没有完成,则将会抛出一个SocketTimeout Exception。Netty 将捕获这个异常并继续处理循环。在EventLoop下一次运行时,它将再次尝试。这实际上也是相似于Netty这样的异步框架可以支持OIO的惟一方式。
这种方式的一个问题是,当一个SocketTimeoutException被抛出时填充栈跟踪所须要的时间,其对于性能来讲代价很大。
图 4-3 说明了这个逻辑。