Netty的体系结构及使用

《Netty权威指南》

一、异步和事件驱动

1.Java网络编程

  1. 阻塞I/O -- socket
  2. 非阻塞I/O -- NIO

2.Netty简介

代码清单 1-3 展示了一个 Netty所做的是事情和很好的例子。 这里,connect()方法将会直接返回,而不会阻塞,该调用将会在后台完成。这究竟什么时候会发生 则取决于若干的因素,但这个关注点已经从代码中抽象出来了。因为线程不用阻塞以等待对应的 操作完成,所以它可以同时做其他的工作,从而更加有效地利用资源。

代码清单 1-3 异步地建立连接

Channel channel = ...; // Does not block ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1", 25));

1. 导读

在这一章中,我们介绍了 Netty 框架的背景知识,包括 Java 网络编程 API 的演变过程,阻塞 和非阻塞网络操作之间的区别,以及异步 I/O 在高容量、高性能的网络编程中的优势。

然后,我们概述了 Netty 的特性、设计和优点,其中包括 Netty 异步模型的底层机制,包括 回调、Future 以及它们的结合使用。我们还谈到了事件是如何产生的以及如何拦截和处理它们。

3. Netty 的核心组件

    1. Channel;
  • Channel 是 Java NIO 的一个基本构造。
  • 它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作 1。
  • 目前,可以把 Channel 看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或者被关闭,连接或者断开连接。
    1. 回调;
    2. Future;
  • Future 提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操 作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。
  • JDK 预置了 interface java.util.concurrent.Future,但是其所提供的实现,只 允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。这是非常繁琐的,所以 Netty 提供了它自己的实现——ChannelFuture,用于在执行异步操作的时候使用。
  • ChannelFuture提供了几种额外的方法,这些方法使得我们能够注册一个或者多个 ChannelFutureListener实例。监听器的回调方法operationComplete(),将会在对应的 操作完成时被调用 1。然后监听器可以判断该操作是成功地完成了还是出错了。如果是后者,我 们可以检索产生的Throwable。简而 言之 ,由ChannelFutureListener提供的通知机制消除 了手动检查对应的操作是否完成的必要。
  • 每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture;也就是说,它们都不会阻塞。 正如我们前面所提到过的一样,Netty 完全是异步和事件驱动的。
  • 如果你把 ChannelFutureListener 看作是回调的一个更加精细的版本,那么你是对的。 事实上,回调和 Future 是相互补充的机制;它们相互结合,构成了 Netty 本身的关键构件块之一。
    1. 事件和 ChannelHandler。
  • Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经 发生的事件来触发适当的动作。这些动作可能是:
    • 记录日志;
    • 数据转换;
    • 流控制;
    • 应用程序逻辑。
  • Netty 是一个网络编程框架,所以事件是按照它们与入站或出站数据流的相关性进行分类的。

可能由入站数据或者相关的状态更改而触发的事件包括:

    • 连接已被激活或者连接失活;
    • 数据读取;
    • 用户事件;
    • 错误事件。
  • 出站事件是未来将会触发的某个动作的操作结果,这些动作包括:
    • 打开或者关闭到远程节点的连接;
    • 将数据写到或者冲刷到套接字。
  • 每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法。这是一个很好的 将事件驱动范式直接转换为应用程序构件块的例子。图 1-3 展示了一个事件是如何被一个这样的 ChannelHandler 链处理的。
  • Netty 的 ChannelHandler 为处理器提供了基本的抽象,如图 1-3 所示的那些。我们会 在适当的时候对 ChannelHandler 进行更多的说明,但是目前你可以认为每个 Channel- Handler 的实例都类似于一种为了响应特定事件而被执行的回调。
  • Netty 提供了大量预定义的可以开箱即用的 ChannelHandler 实现,包括用于各种协议 (如 HTTP 和 SSL/TLS)的 ChannelHandler。在内部,ChannelHandler 自己也使用了事件

和 Future,使得它们也成为了你的应用程序将使用的相同抽象的消费者。

3. 把他们放在一起

1.Future、回调和 ChannelHandler

  • Netty 的异步编程模型是建立在 Future 和回调的概念之上的,而将事件派发到 ChannelHandler 的方法则发生在更深的层次上。结合在一起,这些元素就提供了一个处理环境,使你的应用程序逻 辑可以独立于任何网络操作相关的顾虑而独立地演变。这也是 Netty 的设计方式的一个关键目标。
  • 拦截操作以及高速地转换入站数据和出站数据,都只需要你提供回调或者利用操作所返回的 Future。这使得链接操作变得既简单又高效,并且促进了可重用的通用代码的编写。

2.选择器、事件和 EventLoop

  • Netty 通过触发事件将 Selector 从应用程序中抽象出来,消除了所有本来将需要手动编写 的派发代码。在内部,将会为每个 Channel 分配一个 EventLoop,用以处理所有事件,包括:
    1. 注册感兴趣的事件;
    2. 将事件派发给 ChannelHandler;
    3. 安排进一步的动作。
  • EventLoop 本身只由一个线程驱动,其处理了一个 Channel 的所有 I/O 事件,并且在该

EventLoop 的整个生命周期内都不会改变。

  • 这个简单而强大的设计消除了你可能有的在ChannelHandler 实现中需要进行同步的任何顾虑,

因此,你可以专注于提供正确的逻辑,用来在有感兴趣的数据要处理的时候执行。如同我们在详

细探讨 Netty 的线程模型时将会看到的,该 API 是简单而紧凑的。


二、第一款Netty应用程序

1.编写 Echo 服务器

  • 所有的 Netty 服务器都需要以下两部分。
    • 至少一个ChannelHandler — 服务器对从客户端接收的数据的处理,即它的业务逻辑。
    • 引导 — 这是配置服务器的启动代码。eg: 将服务器绑定到它要监听连接请求的端口上。
  • 在第 1 章中,我们介绍了 Future 和回调,并且阐述了它们在事件驱动设计中的应用。我们 还讨论了 ChannelHandler,它是一个接口族的父接口,它的实现负责接收并响应事件通知。 在 Netty 应用程序中,所有的数据处理逻辑都包含在这些核心抽象的实现中。
  • 因为你的 Echo 服务器会响应传入的消息,所以它需要实现 ChannelInboundHandler 接口,用 来定义响应入站事件的方法。这个简单的应用程序只需要用到少量的这些方法,所以继承 Channel- InboundHandlerAdapter 类也就足够了,它提供了 ChannelInboundHandler 的默认实现。

我们感兴趣的方法是:

channelRead()— 对于每个传入的消息都要调用;

channelReadComplete()— 通知ChannelInboundHandler最后一次对channel-

Read()的调用是当前批量读取中的最后一条消息;

exceptionCaught()— 在读取操作期间,有异常抛出时会调用。

该 Echo 服务器的 ChannelHandler 实现是 EchoServerHandler,如代码清单 2-1 所示。

/** * 代码清单 2-1 */ @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8)); // 将接收到的消息 写给发送者,而 不冲刷出站消息 ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { // 将未决消息冲刷到 远程节点,并且关 闭该 Channel,释放消息 ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 打印异常 栈跟踪并关闭该Channel cause.printStackTrace(); ctx.close(); } }

除了 ChannelInboundHandlerAdapter 之外,还有很多需要学习的 ChannelHandler 的 子类型和实现,我们将在第 6 章和第 7 章中对它们进行详细的阐述。目前,请记住下面这些关键点:

针对不同类型的事件来调用 ChannelHandler; 应用程序通过实现或者扩展 ChannelHandler 来挂钩到事件的生命周期,并且提供自

定义的应用程序逻辑; 在架构上,ChannelHandler 有助于保持业务逻辑与网络处理代码的分离。这简化了开

发过程,因为代码必须不断地演化以响应不断变化的需求。

2. 引导服务器

在讨论过由 EchoServerHandler 实现的核心业务逻辑之后,我们现在可以探讨引导服务 器本身的过程了,具体涉及以下内容:

  • 绑定到服务器端口, 监听并接受传入连接请求;
  • 配置 Channel,以将有关的入站消息通知给 EchoServerHandler 实例。

/** * 代码清单2-2 EchoServer类 */ public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws Exception { if (args.length != 1) { System.err.println( "Usage: " + EchoServer.class.getSimpleName() + " <port>"); } int port = Integer.parseInt(args[0]); new EchoServer(port).start(); } public void start() throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); // 使用的是 NIO 传输,所以指定 了NioEventLoopGroup来接受和处理新的连接 EventLoopGroup group = new NioEventLoopGroup(); try { // 创建 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 指定所使用的 NIO 传输 Channel .channel(NioServerSocketChannel.class) //使用指定的 端口设置套 接字地址 .localAddress(new InetSocketAddress(port)) //使用了一个特殊的类——ChannelInitializer。这是关键。当一个新的连接被接收时, // 一个新的子 Channel 将会被创建,而 ChannelInitializer 将会把一个你的 // EchoServerHandler 的实例添加到该 Channel 的 ChannelPipeline 中。 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler); } }); //异步地绑定服务器; 调用 sync()方法阻塞 等待直到绑定完成 ChannelFuture f = b.bind().sync(); //获取 Channel 的 CloseFuture,并 且阻塞当前线 程直到它完成 f.channel().closeFuture().sync(); } finally { // 关闭 EventLoopGroup, 释放所有的资源 group.shutdownGracefully().sync(); } } }

在这个时候,服务器已经初始化,并且已经就绪能被使用了。这个示例使用了 NIO,因为得益于它的可扩展性和彻底的异步性,它是目前使用最广泛的传 输。但是也可以使用一个不同的传输实现。如果你想要在自己的服务器中使用 OIO 传输,将需 要指定 OioServerSocketChannel 和 OioEventLoopGroup。

让我们回顾一下你刚完成的服务器实现中的重要步骤。下面这些是服务器的主要代码组件:

  • EchoServerHandler 实现了业务逻辑;
  • main()方法引导了服务器;

引导过程中所需要的步骤如下:

  • 创建一个 ServerBootstrap 的实例以引导和绑定服务器;
  • 创建并分配一个 NioEventLoopGroup 实例以进行事件的处理,如接受新连接以及读/写数据;
  • 指定服务器绑定的本地的 InetSocketAddress;
  • 使用一个 EchoServerHandler 的实例初始化每一个新的 Channel;
  • 调用 ServerBootstrap.bind()方法以绑定服务器。

3.编写 Echo 客户端

Echo 客户端将会:

(1)连接到服务器;

(2)发送一个或者多个消息; (3)对于每个消息,等待并接收从服务器发回的相同的消息; (4)关闭连接。

编写客户端所涉及的两个主要代码部分也是业务逻辑和引导,和你在服务器中看到的一样。

如同服务器,客户端将拥有一个用来处理数据的 ChannelInboundHandler。在这 个场景 下,你将扩展 SimpleChannelInboundHandler 类以处理所有必须的任务,如代码清单 2-3 所示。这要求重写下面的方法:

channelActive()——在到服务器的连接已经建立之后将被调用; channelRead0()1——当从服务器接收到一条消息时被调用; exceptionCaught()——在处理过程中引发异常时被调用。

代码清单 2-3 客户端的 ChannelHandler

4 引导客户端

如同将在代码清单 2-4 中所看到的,引导客户端类似于引导服务器,不同的是,客户端是使 用主机和端口参数来连接远程地址

/** * 代码清单 2-4 客户端的主类 */ public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) // 设置服务器的InetSocketAddress .remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println( "Usage: " + EchoClient.class.getSimpleName() + " <host> <port>"); return; } String host = args[0]; int port = Integer.parseInt(args[1]); new EchoClient(host, port).start(); } }

让我们回顾一下这一节中所介绍的要点:

为初始化客户端,创建了一个 Bootstrap 实例;

为进行事件处理分配了一个 NioEventLoopGroup 实例,其中事件处理包括创建新的 连接以及处理入站和出站数据;

为服务器连接创建了一个 InetSocketAddress 实例;

当连接被建立时,一个 EchoClientHandler 实例会被安装到(该 Channel 的)

ChannelPipeline 中;

在一切都设置完成后,调用 Bootstrap.connect()方法连接到远程节点; 完成了客户端,你便可以着手构建并测试该系统了。


三、Netty3组件和设计 本章主要内容

 Netty 的技术和体系结构方面的内容

 Channel、EventLoop 和 ChannelFuture

 ChannelHandler 和 ChannelPipeline

 引导

  • 我们将从两个不同的但却又密切相 关的视角来探讨 Netty: 类库的视角以及框架的视角。对于使用 Netty 编写高效的、可重用的和 可维护的代码来说,两者缺一不可。
  • 从高层次的角度来看,Netty 解决了两个相应的关注领域,我们可将其大致标记为技术的和 体系结构的。首先,它的基于 Java NIO 的异步的和事件驱动的实现,保证了高负载下应用程序 性能的最大化和可伸缩性。其次,Netty 也包含了一组设计模式,将应用程序逻辑从网络层解耦, 简化了开发过程,同时也最大限度地提高了可测试性、模块化以及代码的可重用性。
  • 在我们更加详细地研究 Netty 的各个组件时,我们将密切关注它们是如何协作来支撑这 些体系结构上的最佳实践的。通过遵循同样的原则,我们便可获得 Netty 所提供的所有益处。

3.1 Channel、EventLoop 和 ChannelFuture

这些类合在一起,可以被认为是 Netty 网络抽象的代表:

Channel— Socket;

EventLoop— 控制流、多线程处理、并发;

ChannelFuture— 异步通知。

  1. Channel
  • 基本的 I/O 操作(bind()、connect()、read()和 write())依赖于底层网络传输所提 供的原语。
  • 在基于 Java 的网络编程中,其基本的构造是 class Socket。Netty 的 Channel 接 口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。
  • 此外,Channel 也是拥有许多 预定义的、专门化实现的广泛类层次结构的根
  1. EventLoop
  • EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。

目前,图 3-1 在高层次上说明了 Channel、EventLoop、Thread 以及 EventLoopGroup

之间的关系。

一个 EventLoopGroup 包含一个或者多个 EventLoop;

一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;

所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;

一个 Channel 在它的生命周期内只注册于一个 EventLoop;

一个 EventLoop 可能会被分配给一个或多个 Channel。

注意,在这种设计中,一个给定Channel 的 I/O 操作都是由相同的 Thread 执行的,实际

上消除了对于同步的需要。

  1. ChannelFuture

正如我们已经解释过的那样,Netty 中所有的 I/O 操作都是异步的。因为一个操作可能不会

立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此,Netty 提供了

ChannelFuture接口,其addListener()方法注册了一个ChannelFutureListener,以

便在某个操作完成时(无论是否成功)得到通知。

关于 ChannelFuture 的更多讨论 可以将 ChannelFuture 看作是将来要执行的操作的结果的 占位符。它究竟什么时候被执行则可能取决于若干的因素,因此不可能准确地预测,但是可以肯

定的是它将会被执行。此外,所有属于同一个 Channel 的操作都被保证其将以它们被调用的顺序

被执行。

3.2 ChannelHandler 和 ChannelPipeline

  1. ChannelHandler 接口
  • 从应用程序开发人员的角度来看,Netty 的主要组件是 ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。
  • 因为 ChannelHandler 的方法是 由网络事件(其中术语“事件”的使用非常广泛)触发的。
  • 事实上,ChannelHandler 可专 门用于几乎任何类型的动作,例如将数据从一种格式转换为另外一种格式,或者处理转换过程 中所抛出的异常。
  • 举例来说,ChannelInboundHandler 是一个你将会经常实现的子接口。这种类型的 ChannelHandler 接收入站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所处 理。当你要给连接的客户端发送响应时,也可以从 ChannelInboundHandler 冲刷数据。你 的应用程序的业务逻辑通常驻留在一个或者多个 ChannelInboundHandler 中。
  1. ChannelPipeline 接口
  • 使得事件流经 ChannelPipeline 是 ChannelHandler 的工作,它们是在应用程序的初 始化或者引导阶段被安装的。
  • 这些对象接收事件、执行它们所实现的处理逻辑,并将数据传递给 链中的下一个 ChannelHandler。
  • 它们的执行顺序是由它们被添加的顺序所决定的。实际上, 被我们称为 ChannelPipeline 的是这些 ChannelHandler 的编排顺序。
  • 图 3-3 说明了一个 Netty 应用程序中入站和出站数据流之间的区别。从一个客户端应用程序 的角度来看,如果事件的运动方向是从客户端到服务器端,那么我们称这些事件为出站的,反之 则称为入站的。
  • 图 3-3 也显示了入站和出站 ChannelHandler 可以被安装到同一个 ChannelPipeline 中。
  • 如果一个消息或者任何其他的入站事件被读取,那么它会从 ChannelPipeline 的头部 开始流动,并被传递给第一个 ChannelInboundHandler。这个 ChannelHandler 不一定 会实际地修改数据,具体取决于它的具体功能,在这之后,数据将会被传递给链中的下一个 ChannelInboundHandler。最终,数据将会到达 ChannelPipeline 的尾端,届时,所有 处理就都结束了。
  • 数据的出站运动(即正在被写的数据)在概念上也是一样的。

关于入站和出站 ChannelHandler 的更多讨论

通过使用作为参数传递到每个方法的 ChannelHandlerContext,事件可以被传递给当前

ChannelHandler 链中的下一个 ChannelHandler。因为你有时会忽略那些不感兴趣的事件,所以 Netty 提供了抽象基类 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter。通过调 用 ChannelHandlerContext 上的对应方法,每个都提供了简单地将事件传递给下一个 ChannelHandler 的方法的实现。随后,你可以通过重写你所感兴趣的那些方法来扩展这些类。

在Netty中,有两种发送消息的方式。

  1. 你可以直接写到Channel中,
  2. 也可以 写到和Channel- Handler 相关联的 ChannelHandlerContext 对象中。

前一种方式将会导致消息从 Channel- Pipeline 的尾端开始流动,而后者将导致消息从 ChannelPipeline 中的下一个 Channel- Handler 开始流动。

3. 更加深入地了解 ChannelHandler

正如我们之前所说的,有许多不同类型的 ChannelHandler,它们各自的功能主要取决于 它们的超类。Netty 以适配器类的形式提供了大量默认的 ChannelHandler 实现,其旨在简化 应用程序处理逻辑的开发过程。你已经看到了,ChannelPipeline 中的每个 ChannelHandler 将负责把事件转发到链中的下一个 ChannelHandler。这些适配器类(及它们的子类)将自动 执行这个操作,所以你可以只重写那些你想要特殊处理的方法和事件。

为什么需要适配器类

有一些适配器类可以将编写自定义的 ChannelHandler 所需要的努力降到最低限度,因为它们提 供了定义在对应接口中的所有方法的默认实现。

下面这些是编写自定义 ChannelHandler 时经常会用到的适配器类:

 ChannelHandlerAdapter

 ChannelInboundHandlerAdapter

 ChannelOutboundHandlerAdapter

 ChannelDuplexHandler

4.编码器和解码器

当你通过 Netty 发送或者接收一个消息的时候,就将会发生一次数据转换。入站消息会被解 码;也就是说,从字节转换为另一种格式,通常是一个 Java 对象。

如果是出站消息,则会发生 相反方向的转换:它将从它的当前格式被编码为字节。这两种方向的转换的原因很简单:网络数 据总是一系列的字节。

对应于特定的需要,Netty为编码器和解码器提供了不同类型的抽象类。例如,你的应用程序可能使用了一种中间格式,而不需要立即将消息转换成字节。你将仍然需要一个编码器,但是 它将派生自一个不同的超类。为了确定合适的编码器类型,你可以应用一个简单的命名约定。

通常来说,这些基类的名称将类似于 ByteToMessageDecoder 或 MessageToByte- Encoder。对于特殊的类型,你可能会发现类似于 ProtobufEncoder 和 ProtobufDecoder 这样的名称——预置的用来支持 Google 的 Protocol Buffers。

5.抽象类 SimpleChannelInboundHandler

在这种类型的 ChannelHandler 中,最重要的方法是 channelRead0(Channel- HandlerContext,T)。除了要求不要阻塞当前的 I/O 线程之外,其具体实现完全取决于你。

3.3 引导

  • Netty 的引导类为应用程序的网络层配置提供了容器,这涉及将一个进程绑定到某个指定的 端口,或者将一个进程连接到另一个运行在某个指定主机的指定端口上的进程。
  • 通常来说,我们把前面的用例称作引导一个服务器,后面的用例称作引导一个客户端。虽然 这个术语简单方便,但是它略微掩盖了一个重要的事实,即“服务器”和“客户端”实际上表示 了不同的网络行为; 换句话说,是监听传入的连接还是建立到一个或者多个进程的连接。
  • 因此,有两种类型的引导: 一种用于客户端(简单地称为 Bootstrap),而另一种 (ServerBootstrap)用于服务器。

表 3-1 比较了这两种 类型的引导类。

引导一个客户端只需要一个 EventLoopGroup,但是一个 ServerBootstrap 则需要两个(也可以是同一个实例)。为什么呢?

因为服务器需要两组不同的 Channel。

  • 第一组将只包含一个 ServerChannel,代表服务 器自身的已绑定到某个本地端口的正在监听的套接字。
  • 而第二组将包含所有已创建的用来处理传 入客户端连接(对于每个服务器已经接受的连接都有一个)的 Channel。
  • 图 3-4 说明了这个模 型,并且展示了为何需要两个不同的 EventLoopGroup。

与 ServerChannel 相关联的 EventLoopGroup 将负责分配一个为连接请求创建 Channel 的 EventLoop。一旦连接被接受,第二个 EventLoopGroup 就会给它的 Channel 分配一个 EventLoop。


四、传输

在本章中,我们将研究:

1.Netty传输、它们的实现和使用,以及 Netty 是如何将它们呈现给开发者的。

2.深入探讨了 Netty 预置的传输,并且解释了它们的行为。

3.如何匹配不同的传输和特定用例的需求。

本章主要内容

 OIO——阻塞传输

 NIO——异步传输

 Local——JVM 内部的异步通信

 Embedded——测试你的ChannelHandler

流经网络的数据总是具有相同的类型:字节。这些字节是如何流动的主要取决于我们所说的 网络传输— 一个帮助我们抽象底层数据传输机制的概念。用户并不关心这些细节;他们只想确 保他们的字节被可靠地发送和接收。

如果你有 Java 网络编程的经验,那么你可能已经发现,在某些时候,你需要支撑比预期多 很多的并发连接。如果你随后尝试从阻塞传输切换到非阻塞传输,那么你可能会因为这两种网络 API 的截然不同而遇到问题。

然而,Netty 为它所有的传输实现提供了一个通用 API,这使得这种转换比你直接使用 JDK 所能够达到的简单得多。所产生的代码不会被实现的细节所污染,而你也不需要在你的整个代码 库上进行广泛的重构。简而言之,你可以将时间花在其他更有成效的事情上。

4.1 案例研究:传输迁移

1.不通过 Netty 使用 OIO 和 NIO

/** * 代码清单 4-1 未使用 Netty 的阻塞网络编程 * * @author xuxh * @date 2021/03/07 11:27 */ public class PlainOioServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); try { for (; ; ) { final Socket clientSocket = socket.accept(); System.out.println( "Accepted connection from " + clientSocket); new Thread(new Runnable() { @Override public void run() { OutputStream out; try { out = clientSocket.getOutputStream(); out.write("Hi!\r\n".getBytes( Charset.forName("UTF-8"))); out.flush(); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } } }).start(); } } catch ( IOException e) { e.printStackTrace(); } } }

/** * 代码清单 4-2 未使用 Netty 的异步网络编程 * * @author xuxh * @date 2021/03/07 11:32 */ public class PlainNioServer { public void serve(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); ServerSocket ssocket = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ssocket.bind(address); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes()); for (; ; ) { try { // 等待需要处理的新事 件;阻塞 将一直持续到 下一个传入事件 selector.select(); } catch (IOException ex) { ex.printStackTrace(); // handle exception break; } // 获取所有接 收事件的 Selection- Key 实例 Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { // 检查事件是否是一 个新的已经就绪可 以被接受的连接 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); // 接受客户端,并将它注册到选择器 client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); System.out.println("Accepted connection from " + client); } // 检查套接字是否已经准备好写数据 if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); while (buffer.hasRemaining()) { // 将数据写到已连接的客户端 if (client.write(buffer) == 0) { break; } } client.close(); } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { // ignore on close } } } } } }

如同你所看到的,虽然这段代码所做的事情与之前的版本完全相同,但是代码却截然不同。 如果为了用于非阻塞 I/O 而重新实现这个简单的应用程序,都需要一次完全的重写的话,那么不 难想象,移植真正复杂的应用程序需要付出什么样的努力。

鉴于此,让我们来看看使用 Netty 实现该应用程序将会是什么样子吧。

4.1.2 通过 Netty 使用 OIO 和 NIO

/** * 代码清单 4-3 使用 Netty 的阻塞网络处理 * * @author xuxh * @date 2021/03/07 21:15 */ public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8); EventLoopGroup group = new OioEventLoopGroup(); try { // 创建 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 使用 OioEventLoopGroup 以允许阻塞模式 .channel(OioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) // 指定 Channel- Initializer,对于 每个已接受的 连接都调用它 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( // 添加一个 Channel- InboundHandler- Adapter 以拦截和 处理事件 new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 将消息写到客户端,并添 加 ChannelFutureListener, 以便消息一被写完就关闭 连接 ctx.writeAndFlush(buf.duplicate()) .addListener(ChannelFutureListener.CLOSE); } }); } }); //绑定服务器 以接受连接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { // 释放所有的资源 group.shutdownGracefully().sync(); } } }

/** * 代码清单 4-4 使用 Netty 的异步网络处理 * * @author xuxh * @date 2021/03/07 21:40 */ public class NettyNioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8); // 使用的是 NIO 传输,所以指定 了NioEventLoopGroup来接受和处理新的连接 EventLoopGroup group = new NioEventLoopGroup(); try { // 创建 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 使用 NioEventLoopGroup 非阻塞模式 .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) // 指定 Channel- Initializer,对于 每个已接受的 连接都调用它 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( // 添加一个 Channel- InboundHandler- Adapter 以拦截和 处理事件 new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 将消息写到客户端,并添 加 ChannelFutureListener, 以便消息一被写完就关闭 连接 ctx.writeAndFlush(buf.duplicate()) .addListener(ChannelFutureListener.CLOSE); } }); } }); //绑定服务器 以接受连接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { // 释放所有的资源 group.shutdownGracefully().sync(); } } }

因为 Netty 为每种传输的实现都暴露了相同的 API,所以无论选用哪一种传输的实现,你的 代码都仍然几乎不受影响。在所有的情况下,传

4.2 传输 API

传输 API 的核心是 interface Channel,它被用于所有的 I/O 操作。Channel 类的层次结构如图 4-1 所示。

如图所示,每个 Channel 都将会被分配一个 ChannelPipeline 和 ChannelConfig。

  • ChannelConfig 包含了该 Channel 的所有配置设置,并且支持热更新。由于特定的传输可能具有独特的设置,所以它可能会实现一个 ChannelConfig 的子类型。(请参考 ChannelConfig 实现对应的 Javadoc。)
  • ChannelPipeline 持有所有应用于入站数据和出站数据以及事件的 ChannelHandler 实 例,这些 ChannelHandler 实现了应用程序处理状态变化以及数据处理的逻辑。
  • 由于 Channel 是独一无二的,所以为了保证顺序将 Channel 声明为 java.lang. Comparable 的一个子接口。因此,如果两个不同的 Channel 实例都返回了相同的散列码,那 么 AbstractChannel 中的 compareTo()方法的实现将会抛出一个 Error。

ChannelHandler 的典型用途包括:

将数据从一种格式转换为另一种格式;

提供异常的通知;

提供 Channel 变为活动的或者非活动的通知;

提供当 Channel 注册到 EventLoop 或者从 EventLoop 注销时的通知;

提供有关用户自定义事件的通知。

你也可以根据需要通过添加或者移除ChannelHandler实例来修改ChannelPipeline。通过利用Netty的这项能力可以构建出高度灵活的应用程序。

除了访问所分配的 ChannelPipeline 和 ChannelConfig 之外,也可以利用 Channel 的其他方法,其中最重要的列举在表 4-1 中。

稍后我们将进一步深入地讨论所有这些特性的应用。目前,请记住,Netty 所提供的广泛功 能只依赖于少量的接口。这意味着,你可以对你的应用程序逻辑进行重大的修改,而又无需大规 模地重构你的代码库。

Netty 的 Channel 实现是线程安全的,因此你可以存储一个到 Channel 的引用,并且每当 你需要向远程节点写数据时,都可以使用它,即使当时许多线程都在使用它。

4.3 内置的传输

Netty 内置了一些可开箱即用的传输。因为并不是它们所有的传输都支持每一种协议,所以 你必须选择一个和你的应用程序所使用的协议相容的传输。

在本节中我们将讨论这些关系。表 4-2 显示了所有 Netty 提供的传输。

4.3.1 NIO——非阻塞 I/O

NIO 提供了一个所有 I/O 操作的全异步的实现。它利用了自 NIO 子系统被引入 JDK 1.4 时便 可用的基于选择器的 API。

选择器背后的基本概念是充当一个注册表,,当 Channel 的状态发生变化时, 在选择器可以得到通知。

Channel可能的状态变化有:

新的 Channel 已被接受并且就绪;

Channel 连接已经完成;

Channel 有已经就绪的可供读取的数据;

Channel 可用于写数据。

选择器运行在一个检查状态变化并对其做出相应响应的线程上,在应用程序对状态的变化做出响应之后,选择器将会被重置,并将重复这个过程。

表 4-3 中的常量值代表了由 class java.nio.channels.SelectionKey 定义的位模式。这些位模式可以组合起来定义一组应用程序正在请求通知的状态变化集。

对于所有传输都共有的用户级别 API ,Netty完全地隐藏了这些 NIO 的内部细节。 图 4-2 展示了该处理流程。

零拷贝

零拷贝(zero-copy)是一种目前只有在使用 NIO 和 Epoll 传输时才可使用的特性。它使你可以快速 高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间,在像 FTP 或者 HTTP 这样的协议中可以显著地提升性能。但是,并不是所有的操作系统都支持这一特性。特别地,它对于实现了数据加密或者压缩的文件系统是不可用的——只能传输文件的原始内容。

4.3.2 Epoll — 用于 Linux 的本地非阻塞传输4.3.2 Epoll— 用于 Linux 的本地非阻塞传输

正如我们之前所说的,Netty 的 NIO 传输基于 Java 提供的异步/非阻塞网络编程的通用抽象。 虽然这保证了 Netty 的非阻塞 API 可以在任何平台上使用,但它也包含了相应的限制,因为 JDK 为了在所有系统上提供相同的功能,必须做出妥协。

Linux作为高性能网络编程的平台,其重要性与日俱增,这催生了大量先进特性的开发,其中包括epoll——一个高度可扩展的I/O事件通知特性。这个API自Linux内核版本 2.5.44(2002)被 引入,提供了比旧的POSIX select和poll系统调用 1更好的性能,同时现在也是Linux上非阻 塞网络编程的事实标准。Linux JDK NIO API使用了这些epoll调用。

Netty为Linux提供了一组NIO API,它以一种和它本身的设计更加一致的方式使用epoll,并且以一种更加轻量的方式使用中断。如果你的应用程序只运行于Linux系统,那么请考虑利用 这个版本的传输; 你将发现在高负载下它的性能要优于JDK的NIO实现。

这个传输的语义与在图 4-2 所示的完全相同,而且它的用法也是简单直接的。相关示例参照 代码清单 4-4。如果要在那个代码清单中使用 epoll 替代 NIO,只需要将 NioEventLoopGroup 替换为 EpollEventLoopGroup,并且将 NioServerSocketChannel.class 替换为 EpollServerSocketChannel.class 即可。

4.3.3 OIO — 旧的阻塞 I/O

Netty 的 OIO 传输实现代表了一种折中: 它可以通过常规的传输 API 使用,但是由于它是建立在 java.net 包的阻塞实现之上的,所以它不是异步的。但是,它仍然非常适合于某些用途。

有了这个背景,你可能会想,Netty是如何能够使用和用于异步传输相同的API来支持OIO的呢。 答案就是,Netty利用了SO_TIMEOUT这个Socket标志,它指定了等待一个I/O操作完成的最大毫秒数。如果操作在指定的时间间隔内没有完成,则将会抛出一个SocketTimeout Exception。Netty 将捕获这个异常并继续处理循环。在EventLoop下一次运行时,它将再次尝试。这实际上也是类似于Netty这样的异步框架能够支持OIO的唯一方式。

这种方式的一个问题是,当一个SocketTimeoutException被抛出时填充栈跟踪所需要的时间,其对于性能来说代价很大。

图 4-3 说明了这个逻辑。

4.3.4 用于 JVM 内部通信的 Local 传输

Netty 提供了一个 Local 传输,用于在同一个 JVM 中运行的客户端和服务器程序之间的异步 通信。同样,这个传输也支持其他传输共同的API。

在这个传输中,和服务器 Channel 相关联的 SocketAddress 并没有绑定物理网络地址; 相反,只要服务器还在运行,它就会被存储在注册表里,并在 Channel 关闭时注销。因为这个 传输并不接受真正的网络流量,所以它并不能够和其他传输实现进行互操作。因此,在同一个 JVM 中,客户端希望 连接到使用了这个传输的服务器端时必须使用它。除了这个限制,它的使用方式和其他的传输一模一样。

4.3.5 Embedded 传输

Netty 提供了一种额外的传输,使得你可以将一组 ChannelHandler 作为帮助器类嵌入到 其他的 ChannelHandler 内部。通过这种方式,你将可以扩展一个 ChannelHandler 的功能, 而又不需要修改其内部代码。

不足为奇的是,Embedded 传输的关键是一个具体的 Channel 的实现:EmbeddedChannel。在第 9 章中,我们将详细地讨论如何使用这个类来为 ChannelHandler 的实现创建单元 测试用例。

4.4 传输的用例

既然我们已经详细地了解了所有的传输,那么让我们考虑一下选用一个适用于特定用途的协议的因素吧。正如前面所提到的,并不是所有的传输都支持所有的核心协议。

表 4-4 展示了截止出版时的传输和其所支持的协议。

虽然只有 SCTP 传输有这些特殊要求,但是其他传输可能也有它们自己的配置选项需要考虑。 此外,如果只是为了支持更高的并发连接数,服务器平台可能需要配置得和客户端不一样。

这里是一些你很可能会遇到的用例。

非阻塞代码库——如果你的代码库中没有阻塞调用(或者你能够限制它们的范围),那么在 Linux 上使用 NIO 或者 epoll 始终是个好主意。虽然 NIO/epoll 旨在处理大量的并发连 接,但是在处理较小数目的并发连接时,它也能很好地工作,尤其是考虑到它在连接之 间共享线程的方式。

阻塞代码库——正如我们已经指出的,如果你的代码库严重地依赖于阻塞 I/O,而且你的应 用程序也有一个相应的设计,那么在你尝试将其直接转换为 Netty 的 NIO 传输时,你将可 能会遇到和阻塞操作相关的问题。不要为此而重写你的代码,可以考虑分阶段迁移:先从 OIO 开始,等你的代码修改好之后,再迁移到 NIO(或者使用 epoll,如果你在使用 Linux)。

在同一个 JVM 内部的通信——在同一个 JVM 内部的通信,不需要通过网络暴露服务,是 Local 传输的完美用例。这将消除所有真实网络操作的开销,同时仍然使用你的 Netty 代码 库。如果随后需要通过网络暴露服务,那么你将只需要把传输改为 NIO 或者 OIO 即可。

测试你的 ChannelHandler 实现——如果你想要为自己的 ChannelHandler 实现编 写单元测试,那么请考虑使用 Embedded 传输。这既便于测试你的代码,而又不需要创建大 量的模拟(mock)对象。你的类将仍然符合常规的 API 事件流,保证该 ChannelHandler 在和真实的传输一起使用时能够正确地工作。

表 4-5 总结了我们探讨过的用例。

5. ByteBuf

本章主要内容

本章专门探讨了 Netty 的基于 ByteBuf 的数据容器。我们讨论过的要点有:

使用不同的读索引和写索引来控制数据访问;

使用内存的不同方式——基于字节数组和直接缓冲区;

通过 CompositeByteBuf 生成多个 ByteBuf 的聚合视图;

数据访问方法——搜索、切片以及复制;

读、写、获取和设置 API;

内存分配:ByteBufAllocator 池化和引用计数。

网络数据的基本单位总是字节。Java NIO 提供了 ByteBuffer 作为它 的字节容器,但是这个类使用起来过于复杂,而且也有些繁琐。

Netty 的 ByteBuffer 替代品是 ByteBuf,一个强大的实现,既解决了 JDK API 的局限性, 又为网络应用程序的开发者提供了更好的 API。

5.1 ByteBuf 的 API

Netty 的数据处理 API 通过两个组件暴露——abstract class ByteBuf 和 interface ByteBufHolder。

下面是一些 ByteBuf API 的优点:

它可以被用户自定义的缓冲区类型扩展;

通过内置的复合缓冲区类型实现了透明的零拷贝;

容量可以按需增长(类似于 JDK 的 StringBuilder);

在读和写这两种模式之间切换不需要调用 ByteBuffer 的 flip()方法;

读和写使用了不同的索引;

支持方法的链式调用;

支持引用计数;

支持池化。

5.2 ByteBuf 类——Netty 的数据容器

5.2.1 它是如何工作的

ByteBuf 维护了两个不同的索引:一个用于读取,一个用于写入。当你从 ByteBuf 读取时, 它的 readerIndex 将会被递增已经被读取的字节数。同样地,当你写入 ByteBuf 时,它的 writerIndex 也会被递增。图 5-1 展示了一个空 ByteBuf 的布局结构和状态。

要了解这些索引两两之间的关系,请考虑一下,如当 readerIndex 达到 和 writerIndex 同样的值时会发生什么。在那时,你将会到达“可以读取的”数据的末尾。就 如同试图读取超出数组末尾的数据一样,试图读取超出该点的数据将会触发一个 IndexOutOf- BoundsException。

名称以 read 或者 write 开头的 ByteBuf 方法,将会推进其对应的索引,而名称以 set 或 者 get 开头的操作则不会。可以指定 ByteBuf 的最大容量。试图移动写索引(即 writerIndex)超过这个值将会触发一个异常1。(默认的限制是 Integer.MAX_VALUE。)

5.2.2 ByteBuf 的使用模式

1.堆缓冲区

最常用的 ByteBuf 模式是将数据存储在 JVM 的堆空间中。这种模式被称为支撑数组 (backing array),它能在没有使用池化的情况下提供快速的分配和释放。这种方式非常适合于有遗留的数据需要处理的情况,如代码清单5-1 所示。

2.直接缓冲区

直接缓冲区是另外一种 ByteBuf 模式。NIO 在 JDK 1.4 中引入的 ByteBuffer 类允许 JVM 实现通过本地调用来分配内存。

这主要是为了避免在每次调用本地 I/O 操作之前(或者之后)将缓冲区的内容复制到一个中间缓冲区,或者从中间缓冲区把内容复制到缓冲区。

ByteBuffer的Javadoc1明确指出:“直接缓冲区的内容将驻留在常规的会被垃圾回收的堆之外。”这也就解释了为何直接缓冲区对于网络数据传输是理想的选择。

如果你的数据包含在一 个在堆上分配的缓冲区中,事实上,在通过套接字发送它之前,JVM将会在内部把你的缓冲 区复制到一个直接缓冲区中。

直接缓冲区的主要缺点是,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。

如果你 正在处理遗留代码,你也可能会遇到另外一个缺点:因为数据不是在堆上,所以你不得不进行一 次复制,如代码清单 5-2 所示。

显然,与使用支撑数组相比,这涉及的工作更多。因此,如果事先知道容器中的数据将会被作为数组来访问,使用堆内存更合适。

3.复合缓冲区

第三种也是最后一种模式使用的是复合缓冲区,它为多个 ByteBuf 提供一个聚合视图。在 这里可以根据需要添加或者删除 ByteBuf 实例,这是一个 JDK 的 ByteBuffer 实现完全缺失的特性。

Netty 通过一个 ByteBuf 子类——CompositeByteBuf——实现了这个模式,它提供了一个将多个缓冲区表示为单个合并缓冲区的虚拟表示。

为了举例说明,让我们考虑一下一个由两部分——头部和主体——组成的将通过 HTTP 协议 传输的消息。这两部分由应用程序的不同模块产生,将会在消息被发送的时候组装。该应用程序 可以选择为多个消息重用相同的消息主体。当这种情况发生时,对于每个消息都将会创建一个新 的头部。

因为我们不想为每个消息都重新分配这两个缓冲区,所以使用 CompositeByteBuf 是一个 完美的选择。它在消除了没必要的复制的同时,暴露了通用的 ByteBuf API。

图 5-2 展示了生成 的消息布局。

警告

CompositeByteBuf中的ByteBuf实例可能同时包含直接内存分配和非直接内存分配。 如果其中只有一个实例,那么对 CompositeByteBuf 上的 hasArray()方法的调用将返回该组 件上的 hasArray()方法的值;否则它将返回 false。

需要注意的是,Netty使用了CompositeByteBuf来优化套接字的I/O操作,尽可能地消除了 由JDK的缓冲区实现所导致的性能以及内存使用率的惩罚。1这种优化发生在Netty的核心代码中, 因此不会被暴露出来,但是你应该知道它所带来的影响。

5.3 字节级操作

ByteBuf 提供了许多超出基本读、写操作的方法用于修改它的数据。在接下来我们将会讨论这些中最重要的部分。

5.3.1 随机访问索引

如同在普通的 Java 字节数组中一样,ByteBuf 的索引是从零开始的:第一个字节的索引是 0,最后一个字节的索引总是 capacity() - 1。

代码清单 5-6 表明,对存储机制的封装使得遍 历 ByteBuf 的内容非常简单。

代码清单 5-6 访问数据

ByteBuf buffer = ...; for (int i = 0; i < buffer.capacity(); i++) { byte b = buffer.getByte(i); System.out.println((char)b); }

需要注意的是,使用那些需要一个索引值参数的方法(的其中)之一来访问数据既不会改变 readerIndex 也不会改变 writerIndex。

5.3.2 顺序访问索引

虽然 ByteBuf 同时具有读索引和写索引,但是 JDK 的 ByteBuffer 却只有一个索引,这 也就是为什么必须调用 flip()方法来在读模式和写模式之间进行切换的原因。图 5-3 展示了 ByteBuf 是如何被它的两个索引划分成 3 个区域的。

5.3.3 可丢弃字节

在图 5-3 中标记为可丢弃字节的分段包含了已经被读过的字节。通过调用 discardRead- Bytes()方法,可以丢弃它们并回收空间。这个分段的初始大小为 0,存储在 readerIndex 中, 会随着 read 操作的执行而增加(get*操作不会移动 readerIndex)。

图 5-4 展示了图 5-3 中所展示的缓冲区上调用discardReadBytes()方法后的结果。可以看 到,可丢弃字节分段中的空间已经变为可写的了。注意,在调用discardReadBytes()之后,对 可写分段的内容并没有任何的保证。因为只是移动了可以读取的字节以及writerIndex,而没有对所有可写入的字节进行擦除写。

虽然你可能会倾向于频繁地调用 discardReadBytes()方法以确保可写分段的最大化,但是 请注意,这将极有可能会导致内存复制,因为可读字节(图中标记为 CONTENT 的部分)必须被移 动到缓冲区的开始位置。我们建议只在有真正需要的时候才这样做,例如,当内存非常宝贵的时候。

5.3.4 可读字节

  • ByteBuf 的可读字节分段存储了实际数据。
  • 新分配的、包装的或者复制的缓冲区的默认的 readerIndex 值为 0。
  • 任何名称以 read 或者 skip 开头的操作都将检索或者跳过位于当前 readerIndex 的数据,并且将它增加已读字节数。
  • 如果被调用的方法需要一个 ByteBuf 参数作为写入的目标,并且没有指定目标索引参数, 那么该目标缓冲区的 writerIndex 也将被增加,例如:

readBytes(ByteBuf dest);

  • 如果尝试在缓冲区的可读字节数已经耗尽时从中读取数据,那么将会引发一个 IndexOutOf- BoundsException。

5.3.5 可写字节

  • 可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。
  • 新分配的缓冲区的 writerIndex 的默认值为 0。
  • 任何名称以 write 开头的操作都将从当前的 writerIndex 处 开始写数据,并将它增加已经写入的字节数。
  • 如果写操作的目标也是 ByteBuf,并且没有指定 源索引的值,则源缓冲区的 readerIndex 也同样会被增加相同的大小。这个调用如下所示:

writeBytes(ByteBuf dest);

  • 如果尝试往目标写入超过目标容量的数据,将会引发一个IndexOutOfBoundException1。

5.3.6 索引管理

JDK 的 InputStream 定义了 mark(int readlimit)和 reset()方法,这些方法分别 被用来将流中的当前位置标记为指定的值,以及将流重置到该位置。

同样,可以通过调用 markReaderIndex()、markWriterIndex()、resetWriterIndex() 和 resetReaderIndex()来标记和重置 ByteBuf 的 readerIndex 和 writerIndex。这些和 InputStream 上的调用类似,只是没有 readlimit 参数来指定标记什么时候失效。

也可以通过调用 readerIndex(int)或者 writerIndex(int)来将索引移动到指定位置。试 图将任何一个索引设置到一个无效的位置都将导致一个 IndexOutOfBoundsException。

可以通过调用 clear()方法来将 readerIndex 和 writerIndex 都设置为 0。注意,这并不会清除内存中的内容。图 5-5(重复上面的图 5-3)展示了它是如何工作的。

和之前一样,ByteBuf 包含 3 个分段。图 5-6 展示了在 clear()方法被调用之后 ByteBuf 的状态。

5.3.7 查找操作

在ByteBuf中有多种可以用来确定指定值的索引的方法。最简单的是使用indexOf()方法。 较复杂的查找可以通过那些需要一个ByteBufProcessor1作为参数的方法达成。这个接口只定 义了一个方法:

boolean process(byte value)

它将检查输入值是否是正在查找的值。 ByteBufProcessor针对一些常见的值定义了许多便利的方法。假设你的应用程序需要和

所谓的包含有以NULL结尾的内容的Flash套接字 2集成。调用 forEachByte(ByteBufProcessor.FIND_NUL)

将简单高效地消费该 Flash 数据,因为在处理期间只会执行较少的边界检查。

5.3.8 派生缓冲区

派生缓冲区为 ByteBuf 提供了以专门的方式来呈现其内容的视图。这类视图是通过以下方 法被创建的:

duplicate();

slice();

slice(int, int);

Unpooled.unmodifiableBuffer(...);

order(ByteOrder);

readSlice(int)。

每个这些方法都将返回一个新的 ByteBuf 实例,它具有自己的读索引、写索引和标记

索引。其内部存储和 JDK 的 ByteBuffer 一样也是共享的。这使得派生缓冲区的创建成本 是很低廉的,但是这也意味着,如果你修改了它的内容,也同时修改了其对应的源实例,所 以要小心。

ByteBuf复制

如果需要一个现有缓冲区的真实副本,请使用 copy()或者 copy(int, int)方 法。不同于派生缓冲区,由这个调用所返回的 ByteBuf 拥有独立的数据副本。

5.3.9 读/写操作

正如我们所提到过的,有两种类别的读/写操作:

get()和 set()操作,从给定的索引开始,并且保持索引不变;

read()和 write()操作,从给定的索引开始,并且会根据已经访问过的字节数对索引进行调整。

表 5-1 列举了最常用的 get()方法。完整列表请参考对应的 API 文档。

大多数的这些操作都有一个对应的 set()方法。这些方法在表 5-2 中列出。

代码清单 5-12 说明了 get()和 set()方法的用法,表明了它们不会改变读索引和写索引。

现在,让我们研究一下 read()操作,其作用于当前的 readerIndex 或 writerIndex。 这些方法将用于从 ByteBuf 中读取数据,如同它是一个流。表 5-3 展示了最常用的方法。

几乎每个 read()方法都有对应的 write()方法,用于将数据追加到 ByteBuf 中。注意,表 5-4 中所列出的这些方法的参数是需要写入的值,而不是索引值。

5.3.10 更多的操作

表5-5 列举了由ByteBuf提供的其他有用操作。

表 5-5 其他有用的操作

名称 描述

isReadable() 如果至少有一个字节可供读取,则返回 true

isWritable() 如果至少有一个字节可被写入,则返回 true

readableBytes() 返回可被读取的字节数

writableBytes() 返回可被写入的字节数

capacity() 返回 ByteBuf 可容纳的字节数。在此之后,它会尝试再次扩展直 到达到 maxCapacity()

maxCapacity() 返回 ByteBuf 可以容纳的最大字节数

hasArray() 如果 ByteBuf 由一个字节数组支撑,则返回 true

array() 如果 ByteBuf 由一个字节数组支撑则返回该数组;否则,它将抛出一个 UnsupportedOperationException 异常

5.4 ByteBufHolder 接口

我们经常发现,除了实际的数据负载之外,我们还需要存储各种属性值。HTTP 响应便是一 个很好的例子,除了表示为字节的内容,还包括状态码、cookie 等。

为了处理这种常见的用例,Netty 提供了 ByteBufHolder。ByteBufHolder 也为 Netty 的 高级特性提供了支持,如缓冲区池化,其中可以从池中借用 ByteBuf,并且在需要时自动释放。

ByteBufHolder 只有几种用于访问底层数据和引用计数的方法 。表 5-6 列出了它们(这里不包括它继承自 ReferenceCounted 的那些方法)。

表5-6 ByteBufHolder的操作

名称 描述

content() 返回由这个 ByteBufHolder 所持有的 ByteBuf

copy() 返回这个 ByteBufHolder 的一个深拷贝,包括一个其所包含的 ByteBuf 的非共享拷贝

duplicate() 返回这个ByteBufHolder的一个浅拷贝,包括一个其所包含的ByteBuf的共享拷贝

5.5 ByteBuf 分配

在这一节中,我们将描述管理 ByteBuf 实例的不同方式。

5.5.1 按需分配:ByteBufAllocator 接口

为了降低分配和释放内存的开销,Netty 通过 interface ByteBufAllocator 实现了 (ByteBuf 的)池化,它可以用来分配我们所描述过的任意类型的 ByteBuf 实例。

使用池化是特定于应用程序的决定,其并不会以任何方式改变 ByteBuf API(的语义)。

表5-7 列出了ByteBufAllocator提供的一些操作。

可以通过 Channel(每个都可以有一个不同的 ByteBufAllocator 实例)或者绑定到 ChannelHandler 的 ChannelHandlerContext 获取一个到 ByteBufAllocator 的引用。

代码清单 5-14 说明了这两种方法。

Channel channel = ...; ByteBufAllocator allocator = channel.alloc(); .... ChannelHandlerContext ctx = ...; ByteBufAllocator allocator2 = ctx.alloc();

Netty提供了两种ByteBufAllocator的实现: PooledByteBufAllocator和UnpooledByteBufAllocator。

前者池化了ByteBuf的实例以提高性能并最大限度地减少内存碎片。

后者的实现不池化ByteBuf实例,并且在每次它被调用时都会返回一个新的实例。

虽然Netty4.1.x默认使用了PooledByteBufAllocator,但这可以通过ChannelConfig API或者在引导应用程序时指定不同的分配器来更改。

5.5.2 Unpooled 缓冲区

可能某些情况下,你未能获取一个到 ByteBufAllocator 的引用。对于这种情况,Netty 提 供了一个简单的称为 Unpooled 的工具类,它提供了静态的辅助方法来创建未池化的 ByteBuf 实例。表 5-8 列举了这些中最重要的方法。

5.5.3 ByteBufUtil 类

ByteBufUtil 提供了用于操作 ByteBuf 的静态的辅助方法。

这些静态方法中最有价值的可能就是 hexdump()方法,它以十六进制的表示形式打印 ByteBuf 的内容。这在各种情况下都很有用,例如,打印调试ByteBuf 的内容。

另一个有用的方法是 boolean equals(ByteBuf, ByteBuf),它被用来判断两个 ByteBuf 实例的相等性。

5.6 引用计数

引用计数是一种通过在某个对象所持有的资源不再被其他对象引用时释放该对象所持有的资源来优化内存使用和性能的技术。

引用计数对于池化实现(如 PooledByteBufAllocator)来说是至关重要的,它降低了 内存分配的开销。

试图访问一个已经被释放的引用计数的对象,将会导致一个 IllegalReferenceCount- Exception。

注意,一个特定的(ReferenceCounted 的实现)类,可以用它自己的独特方式来定义它 的引用计数规则。例如,我们可以设想一个类,其 release()方法的实现总是将引用计数设为 零,而不用关心它的当前值,从而一次性地使所有的活动引用都失效。

谁负责释放

一般来说,是由最后访问(引用计数)对象的那一方来负责将它释放。在第 6 章中, 我们将会解释这个概念和 ChannelHandler 以及 ChannelPipeline 的相关性。


第 6 章 ChannelHandler 和ChannelPipeline

本章中,我们将专注于 ChannelHandler,它为你的数据处理逻辑提供了载体。

因为ChannelHandler 大量地使用了 ByteBuf,你将开始看到 Netty 的整体架构的各个重要部分最 终走到了一起。

本章主要内容

 ChannelHandler API 和 ChannelPipeline API

 检测资源泄漏

 异常处理

6.1 ChannelHandler 家族

6.1.1 Channel 的生命周期

Interface Channel 定义了一组和 ChannelInboundHandler API 密切相关的简单但 功能强大的状态模型,表 6-1 列出了 Channel 的这 4 个状态。

表6-1 Channel的生命周期状态

状态 描述

ChannelUnregistered Channel 已经被创建,但还未注册到 EventLoop

ChannelRegistered Channel 已经被注册到了 EventLoop

ChannelActive Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了

ChannelInactive Channel 没有连接到远程节点

Channel 的正常生命周期如图 6-1 所示。当这些状态发生改变时,将会生成对应的事件。 这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其可以随后对它们做出响应。

6.1.2 ChannelHandler 的生命周期

表 6-2 中列出了 interface ChannelHandler 定义的生命周期操作,在 ChannelHandler 被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用这些操作。这些 方法中的每一个都接受一个 ChannelHandlerContext 参数。表6-2 ChannelHandler的生命周期方法

类型 描述

handlerAdded 当把 ChannelHandler 添加到 ChannelPipeline 中时被调用

handlerRemoved 当从 ChannelPipeline 中移除 ChannelHandler 时被调用

exceptionCaught 当处理过程中在 ChannelPipeline 中有错误产生时被调用

Netty 定义了下面两个重要的 ChannelHandler 子接口:

ChannelInboundHandler——处理入站数据以及各种状态变化;

ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作。

6.1.3 ChannelInboundHandler 接口

表 6-3 列出了 interface ChannelInboundHandler 的生命周期方法。这些方法将会在数据被接收时或者与其对应的 Channel 状态发生改变时被调用。正如我们前面所提到的,这些 方法和 Channel 的生命周期密切相关。

表6-3 ChannelInboundHandler的方法

类型 描述

channelRegistered 当 Channel已经注册到它的 EventLoop 并且能够处理 I/O 时被调用

channelUnregistered 当 Channel从它的 EventLoop 注销并且无法处理任何 I/O 时被调用

channelActive 当 Channel处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪

channelInactive 当 Channel离开活动状态并且不再连接它的远程节点时被调用

channelReadComplete 当Channel上的一个读操作完成时被调用 1

channelRead 当从 Channel 读取数据时被调用

ChannelWritability- Changed 当Channel的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生 OutOfMemoryError)或者可以在 Channel 变为再次可写时恢复写入。可以通过调用Channel的isWritable()方法来检测 Channel 的可写性。与可写性相关的阈值可以通过 Channel.config(). setWriteHighWaterMark()和 Channel.config().setWriteLowWater- Mark()方法来设置

userEventTriggered 当 ChannelnboundHandler.fireUserEventTriggered()方法被调 用时被调用,因为一个 POJO 被传经了 ChannelPipeline

当某个 ChannelInboundHandler 的实现重写 channelRead()方法时,它将负责显式地 释放与池化的 ByteBuf 实例相关的内存。Netty 为此提供了一个实用方法 ReferenceCount- Util.release(),如代码清单 6-1 所示。

/** * 代码清单 6-1 释放消息资源 **/ @Sharable // 扩展了 Channel-InboundHandler- Adapter public class DiscardHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 丢弃已接 收的消息 ReferenceCountUtil.release(msg); } }

Netty 将使用 WARN 级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用 SimpleChannelInboundHandler。代码清单 6-2 是代码清单 6-1 的一个变体,说明了这一点。

代码清单 6-2 使用 SimpleChannelInboundHandler

@Sharable public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> { @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) { // No need to do anything special // 不需要任何显式的资源释放 } }

由于 SimpleChannelInboundHandler 会自动释放资源,所以你不应该存储指向任何消 息的引用供将来使用,因为这些引用都将会失效。

6.1.4 ChannelOutboundHandler 接口

出站操作和数据将由 ChannelOutboundHandler 处理。它的方法将被 Channel、ChannelPipeline 以及 ChannelHandlerContext 调用。ChannelOutboundHandler的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲 刷操作并在稍后继续。

表 6-4 显示了所有由 ChannelOutboundHandler 本身所定义的方法(忽略了那些从 Channel- Handler 继承的方法)。

表6-4 ChannelOutboundHandler的方法

类型 描述

bind(ChannelHandlerContext,SocketAddress,ChannelPromise) 当请求将 Channel 绑定到本地地址时被调用

connect(ChannelHandlerContext, SocketAddress,SocketAddress,ChannelPromise) 当请求将 Channel 连接到远程节点时被调用

disconnect(ChannelHandlerContext,ChannelPromise) 当请求将 Channel 从远程节点断开时被调用

close(ChannelHandlerContext,ChannelPromise) 当请求关闭 Channel 时被调用

deregister(ChannelHandlerContext,ChannelPromise) 当请求将 Channel 从它的 EventLoop 注销 时被调用

read(ChannelHandlerContext) 当请求从 Channel 读取更多的数据时被调用

flush(ChannelHandlerContext) 当请求通过 Channel 将入队数据冲刷到远程节点时被调用

write(ChannelHandlerContext,Object,ChannelPromise) 当请求通过 Channel 将数据写到远程节点时 被调用

ChannelPromise与ChannelFuture ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数,以便在操作完成时得到通知。ChannelPromise是ChannelFuture的一个 子类,其定义了一些可写的方法,如setSuccess()和setFailure(),从而使ChannelFuture不 可变 (这里借鉴的是 Scala 的 Promise 和 Future 的设计,当一个 Promise 被完成之后,其对应的 Future 的值便 不能再进行任何修改了。)。

6.1.5 ChannelHandler 适配器

你可以使用 ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter类作为自己的 ChannelHandler的起始点。这两个适配器分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现。通过扩展抽象类ChannelHandlerAdapter,它们获得了它们共同的超接口ChannelHandler 的方法。生成的类的层次结构如图 6-2 所示。

在 ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法体调用了其相关联的ChannelHandlerContext上的等效方法,从而将事件转发到 了 ChannelPipeline 中的下一个 ChannelHandler 中。你要想在自己的 ChannelHandler 中使用这些适配器类,只需要简单地扩展它们,并且重 写那些你想要自定义的方法。

6.1.6 资源管理

每当通过调用 ChannelInboundHandler.channelRead()或者 ChannelOutbound- Handler.write()方法来处理数据时,你都需要确保没有任何的资源泄漏。你可能还记得在前面的章节中所提到的,Netty 使用引用计数来处理池化的 ByteBuf。所以在完全使用完某个 ByteBuf 后,调整其引用计数是很重要的。

为了帮助你诊断潜在的(资源泄漏)问题,Netty提供了class ResourceLeakDetector1, 它将对你应用程序的缓冲区分配做大约 1%的采样来检测内存泄露。相关的开销是非常小的。

如果检测到了内存泄露,将会产生类似于下面的日志消息:

LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetectionLevel=ADVANCED' or call ResourceLeakDetector.setLevel().

Netty 目前定义了 4 种泄漏检测级别,如表 6-5 所示。

表 6-5 泄漏检测级别

级别 描述

DISABLED 禁用泄漏检测。只有在详尽的测试之后才应设置为这个值

SIMPLE 使用 1%的默认采样率检测并报告任何发现的泄露。这是默认级别,适合绝大部分的情况

ADVANCED 使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置

PARANOID 类似于ADVANCED,但是其将会对每次(对消息的)访问都进行采样。这对性能将会有很 大的影响,应该只在调试阶段使用

泄露检测级别可以通过将下面的 Java 系统属性设置为表中的一个值来定义:

java -Dio.netty.leakDetectionLevel=ADVANCED

实现 ChannelInboundHandler.channelRead()和 ChannelOutboundHandler.write() 方法时,应该如何使用这个诊断工具来防止泄露呢?让我们看看你的 channelRead()操作直接消费 入站消息的情况;也就是说,它不会通过调用 ChannelHandlerContext.fireChannelRead() 方法将入站消息转发给下一个 ChannelInboundHandler。代码清单 6-3 展示了如何释放消息。

代码清单 6-3 消费并释放入站消息

@Sharable public class DiscardInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg); } }

消费入站消息的简单方式

由于消费入站数据是一项常规任务,所以 Netty 提供了一个特殊的被 称为 SimpleChannelInboundHandler 的 ChannelInboundHandler 实现。这个实现会在消 息被 channelRead0()方法消费之后自动释放消息。

在出站方向这边,如果你处理了 write()操作并丢弃了一个消息,那么你也应该负责释放 它。代码清单 6-4 展示了一个丢弃所有的写入数据的实现。

代码清单 6-4 丢弃并释放出站消息

@Sharable public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { ReferenceCountUtil.release(msg); promise.setSuccess(); } }

重要的是,不仅要释放资源,还要通知 ChannelPromise。否则可能会出现 Channel- FutureListener 收不到某个消息已经被处理了的通知的情况。

总之,如果一个消息被消费或者丢弃了,并且没有传递给 ChannelPipeline 中的下一个 ChannelOutboundHandler,那么用户就有责任调用 ReferenceCountUtil.release()。 如果消息到达了实际的传输层,那么当它被写入时或者 Channel 关闭时,都将被自动释放。

6.2 ChannelPipeline 接口

每一个新创建的 Channel 都将会被分配一个新的 ChannelPipeline。这项关联是永久性 的;Channel 既不能附加另外一个 ChannelPipeline,也不能分离其当前的。在 Netty 组件 的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

图 6-3 展示了一个典型的同时具有入站和出站 ChannelHandler 的 ChannelPipeline 的布 局,并且印证了我们之前的关于 ChannelPipeline 主要由一系列的 ChannelHandler 所组成的 说 法 。C h a n n e l P i p e l i n e 还 提 供 了 通 过 C h a n n e l P i p e l i n e 本 身 传 播 事 件 的 方 法 。如 果 一 个 入 站 事件被触发,它将被从 ChannelPipeline 的头部开始一直被传播到 Channel Pipeline 的尾端。

ChannelPipeline 相对论

你可能会说,从事件途经 ChannelPipeline 的角度来看,ChannelPipeline 的头部和尾端取决于该事件是入站的还是出站的。然而 Netty 总是将 ChannelPipeline 的入站口(图 6-3 中的左侧) 作为头部,而将出站口(该图的右侧)作为尾端。

在 ChannelPipeline 传播事件时,它会测试 ChannelPipeline 中的下一个 Channel- Handler 的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline 将跳过该 ChannelHandler 并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。(当然,ChannelHandler 也可以同时实现 ChannelInboundHandler 接口和 ChannelOutbound-

Handler 接口。)

6.2.1 修改 ChannelPipeline

ChannelHandler 可以通过添加、删除或者替换其他的 ChannelHandler 来实时地修改 ChannelPipeline 的布局。

表6-6 ChannelHandler的用于修改ChannelPipeline的方法

名称 描述

AddFirstaddBefore 将一个ChannelHandler添加到ChannelPipeline中

addAfteraddLast

remove 将一个 ChannelHandler 从 ChannelPipeline 中移除

replace 将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler

除了这些操作,还有别的通过类型或者名称来访问 ChannelHandler 的方法。这些方法都 列在了表 6-7 中。

表6-7 ChannelPipeline的用于访问ChannelHandler的操作

名称 描述

get 通过类型或者名称返回 ChannelHandler

context 返回和 ChannelHandler 绑定的 ChannelHandlerContext

names 返回 ChannelPipeline 中所有 ChannelHandler 的名称

6.2.2 触发事件

ChannelPipeline 的 API 公开了用于调用入站和出站操作的附加方法。表 6-8 列出了入 站操作,用于通知 ChannelInboundHandler 在 ChannelPipeline 中所发生的事件。

在出站这边,处理事件将会导致底层的套接字上发生一系列的动作。表 6-9 列出了 ChannelPipeline API 的出站操作。

总结一下:

ChannelPipeline 保存了与 Channel 相关联的 ChannelHandler;

ChannelPipeline 可以根据需要,通过添加或者删除 ChannelHandler 来动态地修改;

ChannelPipeline 有着丰富的 API 用以被调用,以响应入站和出站事件。

6.3 ChannelHandlerContext 接口

ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关 联,每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandler- Context。ChannelHandlerContext 的主要功能是管理它所关联的 ChannelHandler 和在 同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互。

ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一点重要的不同。如果调用 Channel 或者 ChannelPipeline 上的这 些方法,它们将沿着整个 ChannelPipeline 进行传播。而调用位于 ChannelHandlerContext 上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler。

表 6-10 对 ChannelHandlerContext API 进行了总结。

当使用 ChannelHandlerContext 的 API 的时候,请牢记以下两点:

ChannelHandlerContext 和 ChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;

如同我们在本节开头所解释的一样,相对于其他类的同名方法,ChannelHandler Context的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。

6.3.1 使用 ChannelHandlerContext

在这一节中我们将讨论 ChannelHandlerContext 的用法,以及存在于 ChannelHandler- Context、Channel 和 ChannelPipeline 上的方法的行为。图 6-4 展示了它们之间的关系。

6.3.2 ChannelHandler 和 ChannelHandlerContext 的高级用法

正如我们在代码清单 6-6 中所看到的,你可以通过调用 ChannelHandlerContext 上的 pipeline()方法来获得被封闭的 ChannelPipeline 的引用。这使得运行时得以操作 ChannelPipeline 的 ChannelHandler,我们可以利用这一点来实现一些复杂的设计。例如, 你可以通过将 ChannelHandler 添加到 ChannelPipeline 中来实现动态的协议切换。

另一种高级的用法是缓存到 ChannelHandlerContext 的引用以供稍后使用,这可能会发 生在任何的 ChannelHandler 方法之外,甚至来自于不同的线程。代码清单 6-9 展示了用这种 模式来触发事件。

代码清单 6-9 缓存到 ChannelHandlerContext 的引用

public class WriteHandler extends ChannelHandlerAdapter { private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) { this.ctx = ctx; } public void send(String msg) { ctx.writeAndFlush(msg); } }

6.4 异常处理

异常处理是任何真实应用程序的重要组成部分,它也可以通过多种方式来实现。因此,Netty 提供了几种方式用于处理入站或者出站处理过程中所抛出的异常。

6.4.1 处理入站异常

如果在处理入站事件的过程中有异常被抛出,那么它将从它在 ChannelInboundHandler 里被触发的那一点开始流经 ChannelPipeline。要想处理这种类型的入站异常,你需要在你 的 ChannelInboundHandler 实现中重写下面的方法。

public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause) throws Exception

代码清单 6-12 展示了一个简单的示例,其关闭了 Channel 并打印了异常的栈跟踪信息。 代码清单 6-12 基本的入站异常处理

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }

因为异常将会继续按照入站方向流动(就像所有的入站事件一样),所以实现了前面所示逻 辑的 ChannelInboundHandler 通常位于 ChannelPipeline 的最后。这确保了所有的入站 异常都总是会被处理,无论它们可能会发生在 ChannelPipeline 中的什么位置。

你应该如何响应异常,可能很大程度上取决于你的应用程序。你可能想要关闭Channel(和 连接),也可 能会尝试进行恢复。如果你不实现任何处理入站异常的逻辑(或者没有消费该异常), 那么Netty将会记录该异常没有被处理的事实 1。

总结一下:

ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline 中的下一个 ChannelHandler;

如果异常到达了 ChannelPipeline 的尾端,它将会被记录为未被处理;

要想定义自定义的处理逻辑,你需要重写 exceptionCaught()方法。然后你需要决定

是否需要将该异常传播出去。

6.4.2 处理出站异常

用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制。

每个出站操作都将返回一个ChannelFuture。注册到ChannelFuture的ChannelFutureListener 将在操作完成时被通知该操作是成功了还是出错了。

几乎所有的 ChannelOutboundHandler 上的方法都会传入一个 ChannelPromise 的实例。作为 ChannelFuture 的子类,ChannelPromise 也可以被分配用于异步通知的监听器。但是,ChannelPromise 还具有提供立即通知的可写方法:

ChannelPromise setSuccess(); ChannelPromise setFailure(Throwable cause);

ChannelPromise 的可写方法

通过调用 ChannelPromise 上的 setSuccess()和 setFailure()方法,可以使一个操作的状态在 ChannelHandler 的方法返回给其调用者时便即刻被感知到。

添加 ChannelFutureListener 只需要调用 ChannelFuture 实例上的 addListener方法,并且有两种不同的方式可以做到这一点。其中最常用的方式是

/** * 代码清单 6-13 添加 ChannelFutureListener 到 ChannelFuture */ ChannelFuture future = channel.write(someMessage); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess()) { f.cause().printStackTrace(); f.channel().close(); } }); }

第二种方式是将 ChannelFutureListener 添加到即将作为参数传递给 ChannelOutboundHandler 的方法的 ChannelPromise。代码清单 6-14 中所展示的代码和代码清单 6-13 中所展示的具有相同的效果。

/** * 代码清单 6-14 添加 ChannelFutureListener 到 ChannelPromise */ public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess()) { f.cause().printStackTrace(); f.channel().close(); } }); } } }


第 7 章 EventLoop和线程模型

Netty 的 EventLoop 和并发模型,对于理解 Netty 是如何实现异步的、事件驱动的网络编程模型来说至关重要。

本章主要内容

 线程模型概述

 事件循环的概念和实现

 任务调度

 实现细节

如果你对 Java 的并发 API(java.util.concurrent)有比较好的理解,那么你应该会发 现在本章中的讨论都是直截了当的。如果这些概念对你来说还比较陌生,或者你需要更新自己的 相关知识,那么由 Brian Goetz 等编写的《Java 并发编程实战》 (Addison-Wesley Professional, 2006)这本书将是极好的资源。

7.1 线程模型概述

在这一节中,我们将介绍常见的线程模型,随后将继续讨论 Netty 过去以及当前的线程模型, 并评审它们各自的优点以及局限性。

因为具有多核心或多个 CPU 的计算机现在已经司空见惯,大多数的现代应用程序都利用了 复杂的多线程处理技术以有效地利用系统资源。

相比之下,在早期的 Java 语言中,我们使用多 线程处理的主要方式无非是按需创建和启动新的 Thread 来执行并发的任务单元——一种在高 负载下工作得很差的原始方式。Java 5 随后引入了 Executor API,其线程池通过缓存和重用 Thread 极大地提高了性能。

基本的线程池化模式可以描述为:

从池的空闲线程列表中选择一个 Thread,并且指派它去运行一个已提交的任务(一个Runnable 的实现);

当任务完成时,将该 Thread 返回给该列表,使其可被重用。 图 7-1 说明了这个模式。

虽然池化和重用线程相对于简单地为每个任务都创建和销毁线程是一种进步,但是它并不能 消除由上下文切换所带来的开销,其将随着线程数量的增加很快变得明显,并且在高负载下愈演 愈烈。此外,仅仅由于应用程序的整体复杂性或者并发需求,在项目的生命周期内也可能会出现 其他和线程相关的问题。

简而言之,多线程处理是很复杂的,我们来看看 Netty 是如何帮助简 化它的。

7.2 EventLoop 接口

EventLoop通常被称为事件循环,代码清单 7-1 中说明了事件循环的基本思想,其中每个任务都是一个 Runnable 的实例(如图 7-1 所示)。

/** * 代码清单 7-1 在事件循环中执行任务 */ while (!terminated) { // 阻塞,直到有事件已经就绪可被运行 List<Runnable> readyEvents = blockUntilEventsReady(); for (Runnable ev: readyEvents) { // 循环遍历,并处理所有的事件 ev.run(); } }

Netty 的 EventLoop 是协同设计的一部分,它采用了两个基本的 API:并发和网络编程。

首先,io.netty.util.concurrent 包构建在 JDK 的 java.util.concurrent 包上,用 来提供线程执行器。

其次,io.netty.channel 包中的类,为了与 Channel 的事件进行交互, 扩展了这些接口/类。图 7-2 展示了生成的类层次结构。

在这个模型中,一个 EventLoop 将由一个永远都不会改变的 Thread 驱动,同时任务 (Runnable 或者 Callable)可以直接提交给 EventLoop 实现,以立即执行或者调度执行。 根据配置和可用核心的不同,可能会创建多个 EventLoop 实例用以优化资源的使用,并且单个 EventLoop 可能会被指派用于服务多个 Channel。

事件/任务的执行顺序

事件和任务是以先进先出(FIFO)的顺序执行的。这样可以通过保证字 节内容总是按正确的顺序被处理,消除潜在的数据损坏的可能性。

7.2.1 Netty 4 中的 I/O 和事件处理

正如我们在第 6 章中所详细描述的,由 I/O 操作触发的事件将流经安装了一个或者多个 ChannelHandler 的 ChannelPipeline。

事件的性质通常决定了它将被如何处理;它可能将数据从网络栈中传递到你的应用程序中, 或者进行逆向操作,或者 执行一些截然不同的操作。但是事件的处理逻辑必须足够的通用和灵活, 以处理所有可能的用例。因此,在Netty 4 中,所有的I/O操作和事件都由已经被分配给了 EventLoop的那个Thread来处理。

这不同于 Netty 3 中所使用的模型。在下一节中,我们将讨论这个早期的模型以及它被替换 的原因。

7.2.2 Netty 3 中的 I/O 操作

在以前的版本中所使用的线程模型只保证了入站(之前称为上游)事件会在所谓的 I/O 线程中执行。所有的出站(下游)事件都由调用线程处理,其可 能是 I/O 线程也可能是别的线程。开始看起来这似乎是个好主意,但是已经被发现是有问题的, 因为需要在 ChannelHandler 中对出站事件进行仔细的同步。简而言之,不可能保证多个线程 不会在同一时刻尝试访问出站事件。例如,如果你通过在不同的线程中调用 Channel.write()

方法,针对同一个 Channel 同时触发出站的事件,就会发生这种情况。

Netty 4 中所采用的线程模型,通过在同一个线程中处理某个给定的 EventLoop 中所产生的所有事件,解决了这个问题。这提供了一个更加简单的执行体系架构,并且消除了在多个 ChannelHandler 中进行同步的需要(除了任何可能需要在多个 Channel 中共享的)。

7.3 任务调度

7.3.1 JDK 的任务调度 API

在 Java 5 之前,任务调度是建立在 java.util.Timer 类之上的,其使用了一个后台 Thread, 并且具有与标准线程相同的限制。随后,JDK 提供了 java.util.concurrent 包,它定义了 interface ScheduledExecutorService。表 7-1 展示了 java.util.concurrent.Executors 的相关工厂方法。

表7-1 java.util.concurrent.Executors类的工厂方法

方法

描述

newScheduledThreadPool(int corePoolSize)

创建一个 ScheduledThreadExecutorService, 用于调度命令在指定延迟之后运行或者周期性地执 行。它使用 corePoolSize 参数来计算线程数

newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

newSingleThreadScheduledExecutor()

创建一个 ScheduledThreadExecutorService, 用于调度命令在指定延迟之后运行或者周期性地执 行。它使用一个线程来执行被调度的任务

newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

虽然选择不是很多,但是这些预置的实现已经足以应对大多数的用例。虽然 ScheduledExecutorService API 是直截了当的,但是在高负载下它将带来性能上 的负担。

7.3.2 使用 EventLoop 调度任务

ScheduledExecutorService 的实现具有局限性,例如,事实上作为线程池管理的一部 分,将会有额外的线程创建。如果有大量任务被紧凑地调度,那么这将成为一个瓶颈。Netty 通 过 Channel 的 EventLoop 实现任务调度解决了这一问题,如代码清单 7-3 所示。

代码清单 7-3 使用 EventLoop 调度任务

Channel ch = ... ScheduledFuture<?> future = ch.eventLoop().schedule( new Runnable() { @Override public void run() { System.out.println("60 seconds later"); } }, 60, TimeUnit.SECONDS);

经过 60 秒之后,Runnable 实例将由分配给 Channel 的 EventLoop 执行。如果要调度任务以每隔 60 秒执行一次,请使用 scheduleAtFixedRate()方法。

7.4 实现细节

7.4.1 线程管理

Netty线程模型的卓越性能取决于对于当前执行的Thread的身份的确定 2,也就是说,确定它是支撑 EventLoop 的那一个线程。(回想一下EventLoop将负责处理一个Channel的整个生命周期内的所有事件。)

如果是,那么所提交的代码块将会被(直接)执行。否则,EventLoop 将调度该任务放入到内部队列中,以便稍后执行。当 EventLoop 处理事件时,它会执行队列中的那些任务/事件。这也就解释了任何的 Thread 是如何 与 Channel 直接交互而无需在 ChannelHandler 中进行额外同步的。

注意,每个 EventLoop 都有它自已的任务队列,独立于任何其他的 EventLoop。图 7-3 展示了 EventLoop 用于调度任务的执行逻辑。这是 Netty 线程模型的关键组成部分。

EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。在当前实现中, 使用顺序循环(round-robin)的方式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel。(这一点在将来的版本中可能会改变。)

一旦一个 Channel 被分配给一个 EventLoop,它将在它的整个生命周期中都使用这个 EventLoop(以及相关联的 Thread)。请牢记这一点,因为它可以使你从担忧你的 ChannelHandler 实现中的线程安全和同步问题中解脱出来。

另外,需要注意的是,EventLoop 的分配方式对 ThreadLocal 使用的影响。因为一个 EventLoop 通常会被用于支撑多个 Channel,所以对于所有相关联的 Channel 来说, ThreadLocal 都将是一样的。这使得它对于实现状态追踪等功能来说是个糟糕的选择。然而, 在一些无状态的上下文中,它仍然可以被用于在多个 Channel 之间共享一些重度的或者代价昂贵的对象,甚至是事件。

2.阻塞传输

用于像 OIO(旧的阻塞 I/O)这样的其他传输的设计略有不同。这里每一个 Channel 都将被分配给一个 EventLoop,以及它的 Thread。如果你开发的 应用程序使用过 java.io 包中的阻塞 I/O 实现,你可能就遇到过这种模型,如图 7-5 所示。

但是,正如同之前一样,得到的保证是每个 Channel 的 I/O 事件都将只会被一个 Thread 处理。这也是另一个 Netty 设计一致性的例子,这种设计对 Netty 的可靠性和易用性做出了巨大贡献。

第8章 引导

在深入地学习了 ChannelPipeline、ChannelHandler 和 EventLoop 之后,你接下来 的问题可能是:“如何将这些部分组织起来,成为一个可实际运行的应用程序呢?”

答案是?“引导”(Bootstrapping)。简单来说,引导一个应用程序是指对它进行配置,并使它运 行起来的过程。

本章主要内容

 引导客户端和服务器

 从Channel内引导客户端

 添加ChannelHandler

 使用ChannelOption和属性

8.1 Bootstrap 类

引导类的层次结构包括一个抽象的父类和两个具体的引导子类,如图 8-1 所示。

我们分别来看作用于服务器和客户端的引导的目标。

服务器致力于使用一个父 Channel 来接受 来自客户端的连接,并创建子 Channel 以用于它们之间的通信;

而客户端将最可能只需要一个 单独的、没有父 Channel 的 Channel 来用于所有的网络交互。(正如同我们将要看到的,这也 适用于无连接的传输协议,如 UDP,因为它们并不是每个连接都需要一个单独的 Channel。)

两种应用程序类型之间通用的引导步骤由 AbstractBootstrap 处理,而特定于客户端或者服务器的引导步骤则分别由 Bootstrap 或 ServerBootstrap 处理。

AbstractBootstrap 类的完整声明是:

public abstract class AbstractBootstrap

, C extends Channel>

其子类的声明如下:

public class Bootstrap

extends AbstractBootstrap<bootstrap, channel="">

public class ServerBootstrap

extends AbstractBootstrap<serverbootstrap, serverchannel="">

在AbstractBootstrap签名中,子类型 B 是其父类型的一个类型参数,因此可以返回到运行时实例的引用以

支持方法的链式调用(也就是所谓的流式语法)。

为什么引导类是 Cloneable 的

你有时可能会需要创建多个具有类似配置或者完全相同配置的Channel。为了支持这种模式而又不

需要为每个Channel都创建并配置一个新的引导类实例,AbstractBootstrap被标记为了 Cloneable1。在一个已经配置完成的引导类实例上调用clone()方法将返回另一个可以立即使用的引 导类实例。

注意,这种方式只会创建引导类实例的EventLoopGroup的一个浅拷贝,所以,被浅拷贝的EventLoopGroup 将在所有克 隆的Channel实例之间共享。这是可以接受的,因为通常这些克隆的Channel的生命周期都很短暂,一 个典型的场景是——创建一个Channel以进行一次HTTP请求。

8.2 引导客户端和无连接协议

Bootstrap 类被用于客户端或者使用了无连接协议的应用程序中。表 8-1 提供了该类的一个概览,其中许多方法都继承自 AbstractBootstrap 类。

8.2.1 引导客户端

Bootstrap 类负责为客户端和使用无连接协议的应用程序创建 Channel,如图 8-2 所示。

代码清单 8-1 中的代码引导了一个使用 NIO TCP 传输的客户端。

EventLoopGroup group = new NioEventLoopGroup(); // 创建一个Bootstrap类的实例 以创建和 连接新的 客户端 Channe Bootstrap bootstrap = new Bootstrap(); // 设置 EventLoopGroup, 提供用于处理 Channel 事件的 EventLoop bootstrap.group(group) // 指定要使用的 Channel 实现 .channel(NioSocketChannel.class) // 设置用于 Channel 事件和数 据的 ChannelInboundHandler .handler(new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channeRead0( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { System.out.println("Received data"); } } ); // 连接到远程 主机 ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("Connection established"); } else { System.err.println("Connection attempt failed"); } } } );

这个示例使用了前面提到的流式语法;这些方法(除了 connect()方法以外)将通过每次 方法调用所返回的对 Bootstrap 实例的引用链接在一起。

8.2.2 Channel 和 EventLoopGroup 的兼容性

代码清单 8-2 所示的目录清单来自 io.netty.channel 包。你可以从包名以及与其相对应 的类名的前缀看到,对于 NIO 以及 OIO 传输两者来说,都有相关的 EventLoopGroup 和 Channel 实现。

必须保持这种兼容性,不能混用具有不同前缀的组件,如 NioEventLoopGroup 和 OioSocketChannel。否则将会导致 IllegalStateException,因为它混用了不兼容的传输。

Exception in thread "main" java.lang.IllegalStateException: incompatible event loop type: io.netty.channel.nio.NioEventLoop at io.netty.channel.AbstractChannel$AbstractUnsafe.register( AbstractChannel.java:571)

关于 IllegalStateException 的更多讨论

在引导的过程中,在调用 bind()或者 connect()方法之前,必须调用以下方法来设置所需的组件:

 group();

 channel()或者 channelFactory();

 handler()。

如果不这样做,则将会导致 IllegalStateException。对 handler()方法的调用尤其重要,因为它需要配置好 ChannelPipeline。

8.3 引导服务器

我们将从 ServerBootstrap API 的概要视图开始我们对服务器引导过程的概述。然后, 我们将会探讨引导服务器过程中所涉及的几个步骤,以及几个相关的主题,包含从一个 ServerChannel 的子 Channel 中引导一个客户端这样的特殊情况。

8.3.1 ServerBootstrap 类

表 8-2 列出了 ServerBootstrap 类的方法。

8.3.2 引导服务器

你可能已经注意到了,表 8-2 中列出了一些在表 8-1 中不存在的方法:childHandler()、 childAttr()和 childOption()。这些调用支持特别用于服务器应用程序的操作。具体来说, ServerChannel 的实现负责创建子 Channel,这些子 Channel 代表了已被接受的连接。

因此,负责引导 ServerChannel 的 ServerBootstrap 提供了这些方法,以简化将设置应用到 已被接受的子 Channel 的 ChannelConfig 的任务。

图 8-3 展示了 ServerBootstrap 在 bind()方法被调用时创建了一个 ServerChannel, 并且该 ServerChannel 管理了多个子 Channel。

代码清单 8-4 中的代码实现了图 8-3 中所展示的服务器的引导过程。

/** * 代码清单 8-4 引导服务器 创建 ServerBootstrap */ NioEventLoopGroup group = new NioEventLoopGroup(); // 创建 ServerBootstrap ServerBootstrap bootstrap = new ServerBootstrap(); // 设置 EventLoopGroup,其提供了用 于处理 Channel 事件的 EventLoop bootstrap.group(group) // 指定要使用的 Channel 实现 .channel(NioServerSocketChannel.class) // 设置用于处理已被接受 的子 Channel 的 I/O 及数据的 ChannelInbound- Handler .childHandler(new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0(ChannelHandlerContext ctx,ByteBuf byteBuf) throws Exception { System.out.println("Received data"); } } ); // 通过配置好的ServerBootstrap的实例绑定该Channel ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("Server bound"); } else { System.err.println("Bound attempt failed"); channelFuture.cause().printStackTrace(); } } } );

8.4 从 Channel 引导客户端

假设你的服务器正在处理一个客户端的请求,这个请求需要它充当第三方系统的客户端。在这种情况下,将需要从已经被接受的子 Channel 中引导一个客户 端 Channel。

你可以按照 8.2.1 节中所描述的方式创建新的 Bootstrap 实例,但是这并不是最高效的解 决方案,因为它将要求你为每个新创建的客户端 Channel 定义另一个 EventLoop。这会产生 额外的线程,以及在已被接受的子 Channel 和客户端 Channel 之间交换数据时不可避免的上 下文切换。

一个更好的解决方案是:通过将已被接受的子 Channel 的 EventLoop 传递给 Bootstrap 的 group()方法来共享该 EventLoop。因为分配给 EventLoop 的所有 Channel 都使用同一 个线程,所以这避免了额外的线程创建,以及前面所提到的相关的上下文切换。这个共享的解决 方案如图 8-4 所示。

实现 EventLoop 共享涉及通过调用 group()方法来设置 EventLoop,如代码清单 8-5 所示。

/** * 代码清单 8-5 引导服务器 */ // 创建 ServerBootstrap 以创建 ServerSocketChannel,并绑定它 ServerBootstrap bootstrap = new ServerBootstrap(); // 设置 EventLoopGroup,其将提供用 以处理 Channel 事件的 EventLoop bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) // 指定要使用的 Channel 实现 .channel(NioServerSocketChannel.class) // 设置用于处理已被接受的 子 Channel 的 I/O 和数据的 ChannelInboundHandler .childHandler(new SimpleChannelInboundHandler<ByteBuf>() { ChannelFuture connectFuture; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 创建一个 Bootstrap 类的实例以连接 到远程主机 Bootstrap bootstrap = new Bootstrap(); // 指定 Channel 的实现 bootstrap.channel(NioSocketChannel.class).handler( // 为入站 I/O 设置 ChannelInboundHandler new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception { System.out.println("Received data"); } }); // 使用与分配给 已被接受的子 Channel 相同的 EventLoop bootstrap.group(ctx.channel().eventLoop()); // 连接到 远程节点 connectFuture = bootstrap.connect( new InetSocketAddress("www.manning.com", 80)); } @Override protected void channelRead0( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { if (connectFuture.isDone()) { // 当连接完成时,执行一 些数据操作(如代理) // do something with the data } } }); // 通过配置好的ServerBootstrap 绑定该 Server- SocketChannel ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("Server bound"); } else { System.err.println("Bind attempt failed"); channelFuture.cause().printStackTrace(); } } });

我们在这一节中所讨论的主题以及所提出的解决方案都反映了编写 Netty 应用程序的一个一 般准则:尽可能地重用 EventLoop,以减少线程创建所带来的开销。

8.5 在引导过程中添加多个 ChannelHandler

在所有我们展示过的代码示例中,我们都在引导的过程中调用了 handler()或者 child-Handler()方法来添加单个的 ChannelHandler。

这对于简单的应用程序来说可能已经足够了,但是它不能满足更加复杂的需求。例如,一个必须要支持多种协议的应用程序将会有很多的ChannelHandler,而不会是一个庞大而又笨重的类。

正如你经常所看到的一样,你可以根据需要,通过在 ChannelPipeline 中将它们链接在一起来 部署尽可能多的ChannelHandler。但是,如果在引导的过程中你只能设置一个 ChannelHandler, 那么你应该怎么做到这一点呢?

正是针对于这个用例,Netty 提供了一个特殊的 ChannelInboundHandlerAdapter 子类:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter

它定义了下面的方法:

protected abstract void initChannel(C ch) throws Exception;

这个方法提供了一种将多个 ChannelHandler 添加到一个 ChannelPipeline 中的简便 方法。你只需要简单地向 Bootstrap 或 ServerBootstrap 的实例提供你的 Channel- Initializer 实现即可,并且一旦 Channel 被注册到了它的 EventLoop 之后,就会调用你的 initChannel()版本。在该方法返回之后,ChannelInitializer 的实例将会从 Channel- Pipeline 中移除它自己。

代 码 清 单 8-6 定 义 了 ChannelInitializerImpl 类 , 并 通 过 ServerBootstrap 的 childHandler()方法注册它 1。你可以看到,这个看似复杂的操作实际上是相当简单直接的。

/** * 引导和使用 ChannelInitializer * @throws Exception */ public void start2() throws Exception { // 创建 ServerBootstrap 以创 建和绑定新的 Channel ServerBootstrap bootstrap = new ServerBootstrap(); // 设置 EventLoopGroup,其将提供用 以处理 Channel 事件的 EventLoop bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) // 指定 Channel 的 实现 .channel(NioServerSocketChannel.class) // 注册一个 ChannelInitializerImpl 的 实例来设置 ChannelPipeline .childHandler(new ChannelInitializerImpl()); ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080)); future.sync(); } // 用以设置 ChannelPipeline 的自 定义 ChannelInitializerImpl 实现 final class ChannelInitializerImpl extends ChannelInitializer<Channel> { // 将所需的 ChannelHandler 添加到 ChannelPipeline @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpClientCodec()); pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); } }

如果你的应用程序使用了多个 ChannelHandler,请定义你自己的 ChannelInitializer 实现来将它们安装到 ChannelPipeline 中。

8.6 使用 Netty 的 ChannelOption 和属性

在每个 Channel 创建时都手动配置它可能会变得相当乏味。幸运的是,你不必这样做。相 反,你可以使用 option()方法来将 ChannelOption 应用到引导。你所提供的值将会被自动 应用到引导所创建的所有 Channel。可用的 ChannelOption 包括了底层连接的详细信息,如 keep-alive 或者超时属性以及缓冲区设置。

Netty 应用程序通常与组织的专有软件集成在一起,而像 Channel 这样的组件可能甚至会在 正常的 Netty 生命周期之外被使用。在某些常用的属性和数据不可用时,Netty 提供了 AttributeMap 抽象(一个由 Channel 和引导类提供的集合)以及 AttributeKey(一 个用于插入和获取属性值的泛型类)。使用这些工具,便可以安全地将任何类型的数据项与客户 端和服务器 Channel(包含 ServerChannel 的子 Channel)相关联了。

代码清单 8-7 展示了可以如何使用 ChannelOption 来配置 Channel,以及如果使用属性 来存储整型值。

// 创建一个 AttributeKey 以标识该属性 final AttributeKey<Integer> id = new AttributeKey<Integer>("ID"); Bootstrap bootstrap = new Bootstrap(); // 设置 EventLoopGroup,其 提供了用以处理 Channel 事件的 EventLoop bootstrap.group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler( new SimpleChannelInboundHandler<ByteBuf>() { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { // 使用 AttributeKey 检索 属性以及它的值 Integer idValue = ctx.channel().attr(id).get(); // do something with the idValue } @Override protected void channelRead0( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { System.out.println("Received data"); } } ); // 设置 ChannelOption, 其将在 connect()或者 bind()方法被调用时 被设置到已经创建的 Channel 上 bootstrap.option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); // 存储该 id 属性 bootstrap.attr(id, 123456); ChannelFuture future = bootstrap.connect( new InetSocketAddress("www.manning.com", 80)); future.syncUninterruptibly();

8.7 引导 DatagramChannel

前面的引导代码示例使用的都是基于 TCP 协议的 SocketChannel,但是 Bootstrap 类 也可以被用于无连接的协议。为此,Netty 提供了各种 DatagramChannel 的实现。唯一区别就 是,不再调用 connect()方法,而是只调用 bind()方法,如代码清单 8-8 所示。

Bootstrap bootstrap = new Bootstrap(); bootstrap.group(new OioEventLoopGroup()) // 指定Channel 的实现 .channel(OioDatagramChannel.class) .handler(new SimpleChannelInboundHandler<DatagramPacket>() { @Override public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { // Do something with the packet } } ); // 调用 bind()方 法,因为该协 议是无连接的 ChannelFuture future = bootstrap.bind(new InetSocketAddress(0)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("Channel bound"); } else { System.err.println("Bind attempt failed"); channelFuture.cause().printStackTrace(); } } });

8.8 关闭

引导使你的应用程序启动并且运行起来,但是迟早你都需要优雅地将它关闭。当然,你也可 以让 JVM 在退出时处理好一切,但是这不符合优雅的定义,优雅是指干净地释放资源。

最重要的是,你需要关闭 EventLoopGroup,它将处理任何挂起的事件和任务,并且随后 释放所有活动的线程。这就是调用 EventLoopGroup.shutdownGracefully()方法的作用。 这个方法调用将会返回一个 Future,这个 Future 将在关闭完成时接收到通知

需要注意的是, shutdownGracefully()方法也是一个异步的操作,所以你需要阻塞等待直到它完成,或者向 所返回的 Future 注册一个监听器以在关闭完成时获得通知。代码清单 8-9 符合优雅关闭的定义。

/** * 代码清单 8-9 优雅关闭 */ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class); ... // shutdownGracefully()方法将释放 所有的资源,并且关闭所有的当 前正在使用中的 Channel Future<?> future = group.shutdownGracefully(); // block until the group has shutdown future.syncUninterruptibly();

或者,你也可以在调用 EventLoopGroup.shutdownGracefully()方法之前,显式地 在所有活动的 Channel 上调用 Channel.close()方法。但是在任何情况下,都请记得关闭 EventLoopGroup 本身。