简单找了下发现网上没有关于 Netty3 比较完整的源码解析的文章,于是我就去读官方文档,为了加强记忆,翻译成了中文,有适当的简化。
原文档地址:Netty3 文档
Chapter 1 开始
1、开始之前
运行 demo 的前提有两个:最新版本的 Netty3 和 JDK1.5 以上
2、写一个 Discard Server
最简单的协议就是 Discard 协议——忽略所有接收到的数据并且不作任何响应。我们从 Netty 处理 I / O 事件的 handler 实现开始:
public class DiscardServerHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getCause().printStackTrace();
Channel ch = e.getChannel();
ch.close();
}
}
DiscardServerHandler 继承 SimpleChannelHandler——ChannelHandler 的一个实现;
messageReceived 方法接收 MessageEvent 类型的参数,它包含接收的客户端数据;
exceptionCaught 方法在出现 I / O 错误或者处理事件时抛出错误时被调用,通常包含记录错误信息和关闭通道的动作;
接下来写一个 main 方法来开启使用 DiscardServerHandler 的服务:
public class DiscardServer {
public static void main(String[] args) throws Exception {
ChannelFactory factory =
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ServerBootstrap bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(new DiscardServerHandler());
}
});
bootstrap.setOption(“child.tcpNoDelay”, true);
bootstrap.setOption(“child.keepAlive”, true);
bootstrap.bind(new InetSocketAddress(8080));
}
}
ChannelFactory 是创建和管理 Channel 及其关联资源的工厂,它负责处理所有 I / O 请求并且执行 I / O 生成 ChannelEvent。但是它不是自己创建 I / O 线程,而是从调用构造方法时指定的线程池中获取线程。服务端应用使用 NioServerSocketChannelFactory;
ServerBootstrap 是一个设置服务端的帮助类;
当服务端接收到一个新的连接,指定的 ChannelPipelineFactory 就会创建一个新的 ChannelPipeline,这个新的 Pipeline 包含一个 DiscardServerHandler 对象;
你可以给 Channel 实现设置具体的参数,选项带 ”child.” 前缀代表应用在接收到的 Channel 上而不是服务端本身的 ServerSocketChannel;
剩下的就是绑定端口启动服务,可以绑定多个不同的端口。
3、研究接收到的数据
我们可以通过 ”telnet localhost 8080″ 命令去测试服务,但因为是 Discard 服务,我们都不知道服务是否正常工作。所以我们修改下服务,让它打印出接收到的数据。
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
while(buf.readable()) {
System.out.println((char) buf.readByte());
System.out.flush();
}
}
ChannelBuffer 是 Netty 基本的存储字节的数据结构,跟 NIO 的 ByteBuffer 类似,但是更容易使用更灵活。比如 Netty 允许你在尽量少的内存复制次数的情况下把多个 ChannelBuffer 组合成一个。
4、写一个 Echo 服务
一个服务通常对请求是有响应的。接下来我们尝试写一个实现 Echo 协议——将接收的数据原路返回给客户端的服务:
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
Channel ch = e.getChannel();
ch.write(e.getMessage());
}
MessageEvent 继承了 ChannnelEvent,一个 ChannnelEvent 持有它相关的 Channel 的引用。我们可以获取这个 Channel 然后调用写方法写入数据返回给客户端。
5、写一个时间服务
这次我们实现一个时间协议——在不需要任何请求数据的情况下返回一个 32 位整型数字并且在发送之后关闭连接。因为我们忽略请求数据,只需要在连接建立的发送消息,所以这次不能使用 messageReceived 方法而是重写 channelConnected 方法:
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
Channel ch = e.getChannel();
ChannelBuffer time = ChannelBuffers.buffer(4);
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
ChannelFuture f = ch.write(time);
f.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
Channel ch = future.getChannel();
ch.close();
}
});
}
channelConnected 方法在连接建立的时间被调用,然后我们写入一个 32 位整型数字代表以秒为单位的当前时间;
我们使用 ChannelBuffers 工具类分配了一个容量为 4 字节的 ChannelBuffer 来存放这个 32 位整型数字;
然后我们把 ChannelBuffer 写入 Channel… 等一下,flip 方法哪里去了?在 NIO 中我们不是要在写入通道前调用 ByteBuffer 的 flip 方法的吗?ChannelBuffer 没有这个方法,因为它有两个指针,一个用于读操作一个用于写操作。当数据写入 ChannelBuffer 时写索引增加而读索引不变。读索引和写索引相互独立。对比之下,Netty 的 ChannelBuffer 比 NIO 的 buffer 更容易使用。
另外需要注意的一点是 ChannelBuffer 的 write 方法返回的是一个 ChannelFuture 对象。它表示一个还未发生的 I / O 操作,因为 Netty 中所有操作都是异步的。所以我们必须在 ChannelFuture 收到操作完成的通知之后才能关闭 Channel。哦,对了,close 方法也是返回 ChannelFuture…
那么问题来了,我们如何得到操作完成的通知呢?只需要简单得向返回的 ChannelFuture 对象中添加一个 ChannelFutureListener,这里我们创建了一个 ChannelFutureListener 的匿名内部类,它在操作完成的时候会关闭 Channel。
6、写一个时间客户端
我们还需要一个遵守时间协议,即能把整型数字翻译成日期的客户端。Netty 服务端和客户端唯一的区别就是要求不同的 Bootstrap 和 ChannelFactory:
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
ChannelFactory factory =
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(new TimeClientHandler());
}
});
bootstrap.setOption(“tcpNoDelay”, true);
bootstrap.setOption(“keepAlive”, true);
bootstrap.connect(new InetSocketAddress(host, port));
}
NioClientSocketChannelFactory,用来创建一个客户端 Channel;
ClientBootstrap 是 ServerBootStrap 在客户端的对应部分;
需要注意的是设置参数时不需要 ”child.” 前缀,客户端 SocketChannel 没有父 Channel;
对应服务端的 bind 方法,这里我们需要调用 connect 方法。
另外我们需要一个 ChannelHandler 实现,负责把接收到服务端返回的 32 位整型数字翻译成日期并打印出来,然后断开连接:
public class TimeClientHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
long currentTimeMillis = buf.readInt() * 1000L;
System.out.println(new Date(currentTimeMillis));
e.getChannel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getCause().printStackTrace();
e.getChannel().close();
}
}
看上去很简单是吧?但是实际运行过程中这个 handler 有时会抛出一个 IndexOutOfBoundsException。下一节我们会讨论为什么会这样。
7、处理基于流的传输
7.1、一个关于 Socket Buffer 的小警告
在像 TCP/IP 那样基于流的传输中,接收数据保存在一个 socket 接收缓存中。但是这个缓存不是一个以包为单位的队列,而是一个以字节为单位的队列。这就意味着,即使发送两个独立的消息,操作系统会把他们视为一个字节串。因此,不能保证你读到的和另一端写入的一样。所以,不管是客户端还是服务端,对于接收到的数据都需要整理成符合应用程序逻辑的结构。
7.2、第一种解决方式
回到前面的时间客户端的问题,32 位整型数字很小,但是它也是可以拆分的,特别是当流量上升的时候,被拆分的可能性也随之上升。一个简单的处理方式就是内部创建一个累计的缓存,直到接收满 4 个字节才进行处理。
private final ChannelBuffer buf = dynamicBuffer();
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer m = (ChannelBuffer) e.getMessage();
buf.writeBytes(m);
if (buf.readableBytes() >= 4) {
long currentTimeMillis = buf.readInt() * 1000L;
System.out.println(new Date(currentTimeMillis));
e.getChannel().close();
}
}
ChannelBuffers.dynamicBuffer() 返回一个自动扩容的 ChannelBuffer;
所有接收的数据都累积到这个动态缓存中;
handler 需要检查缓存是否满 4 个字节,是的话才能继续业务逻辑;否则,Netty 会在数据继续到达之后持续调用 messageReceive。
7.3、第二种解决方案
第一种方案有很多问题,比如一个复杂的协议,由多个可变长度的域组成,这种情况下第一种方案的 handler 就无法支持了。你会发现你可以添加多个 ChannelHandler 到 ChannelPipeline 中,利用这个特性,你可以把一个臃肿的 ChannelHandler 拆分到多个模块化的 ChannelHandler 中,这样可以降低应用程序的复杂度。比如,你可以把 TimeClientHandler 拆分成两个 handler:
TimeDecoder,负责分段问题;
最初那个简版的 TimeClientHandler.
Netty 提供了可扩展的类帮助你实现 TimeDecoder:
public class TimeDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {
if (buffer.readableBytes() < 4) {
return null;
}
return buffer.readBytes(4);
}
}
FrameDecoder 是 ChannelHandler 的一种实现,专门用来处理分段问题;
FrameDecoder 在每次接收到新的数据时调用 decode 方法,携带一个内部维持的累积缓存;
如果返回 null,说明目前数据接收的还不够,当数据量足够时 FrameDecoder 会再次调用方法;
如果返回非 null 对象,代表解码成功,FrameDecoder 会丢弃累积缓存中剩余的数据。你无需提供批量解码,FrameDecoder 会继续调用 decode 方法直到返回 null。
拆分之后,我们需要修改 TimeClient 的 ChannelPipelineFactory 实现:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(
new TimeDecoder(),
new TimeClientHandler());
}
});
Netty 还提供了进一步简化解码的 ReplayingDecoder:
public class TimeDecoder extends ReplayingDecoder<VoidEnum> {
@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer, VoidEnum state) {
return buffer.readBytes(4);
}
}
此外,Netty 提供了一批开箱即用的解码器,让你可以简单得实现大多数协议:
org.jboss.netty.example.factorial 用于二进制协议;
org.jboss.netty.example.telnet 用于基于行的文本协议.
8、用 POJO 替代 ChannelBuffer
上面的 demo 我们都是用 ChannelBuffer 作为协议化消息的基本数据结构,这一节我们用 POJO 替代 ChannelBuffer。将从 ChannelBuffer 提取信息的代码跟 handler 分离开,会使 handler 变得更加可维护的和可重用的。从上面的 demo 里不容易看出这个优势,但是实际应用中分离很有必要。首先,我们定义一个类型 UnixTime:
public class UnixTime {
private final int value;
public UnixTime(int value) {
this.value = value;
}
public int getValue() {
return value;
}
@Override
public String toString() {
return new Date(value * 1000L).toString();
}
}
现在我们可以修改 TimeDecoder 让它返回一个 UnixTime 而不是 ChannelBuffer:
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {
if (buffer.readableBytes() < 4) {
return null;
}
return new UnixTime(buffer.readInt());
}
编码器改了,那么相应的 TimeClientHandler 就不会继续使用 ChannelBuffer:
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
UnixTime m = (UnixTime) e.getMessage();
System.out.println(m);
e.getChannel().close();
}
同样的技术也可以应用到服务端的 TimeServerHandler 上:
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
UnixTime time = new UnixTime((int)(System.currentTimeMillis() / 1000));
ChannelFuture f = e.getChannel().write(time);
f.addListener(ChannelFutureListener.CLOSE);
}
能这样运用的前提是有一个编码器,可以把 UnixTime 对象翻译成 ChannelBuffer:
public class TimeEncoder extends SimpleChannelHandler {
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
UnixTime time = (UnixTime) e.getMessage();
ChannelBuffer buf = buffer(4);
buf.writeInt(time.getValue());
Channels.write(ctx, e.getFuture(), buf);
}
}
一个编码器重写 writeRequested 方法拦截一个写请求。这里需要注意的一点是,尽管这里的 writeRequested 方法参数里也有一个 MessageEvent 对象,客户端 TimeClientHandler 的 messageReceived 的参数里也有一个,但是它们的解读是完全不同的。一个 ChannelEvent 可以是 upstream 也可以是 downstream 事件,这取决于事件的流向。messageReceived 方法里的 MessageEvent 是一个 upstream 事件,而 writeRequested 方法里的是 downstream 事件。
当把 POJO 类转化为 ChannelBuffer 后,你需要把 ChannelBuffer 转发到之前在 ChannelPipeline 内的 ChannelDownstreamHandler,也就是 TimeServerHandler。Channels 提供了多个帮助方法创建和发送 ChanenlEvent。
同样,TimeEncoder 也需要加入到服务端的 ChannelPipeline 中:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(
new TimeServerHandler(),
new TimeEncoder());
}
});
9、关闭你的应用程序
为了关闭 I / O 线程让应用程序优雅得退出,我们需要释放 ChannelFactory 分配的资源。一个典型网络应用程序的关闭过程分为三步:
关闭所有服务端 socket 连接;
关闭所有非服务端 socket 连接(包括客户端 socket 和服务端接收到的 socket);
释放 ChannelFactory 使用的所有资源。
应用到 TimeClient 上:
ChannelFuture future = bootstrap.connect(…);
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.getCause().printStackTrace();
}
future.getChannel().getCloseFuture().awaitUninterruptibly();
factory.releaseExternalResources();
CilentBootStrap 的 connect 方法返回一个 ChannelFuture,当连接尝试成功或者失败时会通知到 ChannelFuture。它还持有连接尝试关联的 Channel 的引用;
ChannelFuture.awaitUninterruptibly() 等待 ChannelFuture 确定连接是否尝试成功;
如果连接失败,我们打印出失败的原因。ChannelFuture.getCause() 会在连接即没有成功也没有取消的情况下返回失败的原因;
连接尝试的情况处理之后,我们还需要等待连接关闭。每个 Channel 有它自己的 closeFuture,用来通知你连接关闭然后你可以针对关闭做一些动作。即使连接尝试失败了,closeFuture 仍然会被通知,因为 Channel 会在连接失败后自动关闭;
所有连接关闭之后,剩下的就是释放 ChannelFactory 使用的资源了。释放过程很简单,调用它的 releaseExternalResources 方法,所有相关的 NIO Selector 和线程池将会自动关闭。
关闭一个客户端很简单,那服务端呢?你需要从端口解绑并关闭所有接收到的连接。前提是你需要一个保持跟踪活跃连接的数据结构,Netty 提供了 ChannelGroup。
ChannelGroup 是 Java 集合 API 一个特殊的的扩展,它代表一组打开的 Channel。如果一个 Channel 被添加到 ChannelGroup,然后这个 Channel 被关闭了,它会从 ChannelGroup 中自动移除。你可以对同一 ChannelGroup 中的 Channel 做批量操作,比如在关闭服务的时候关闭所有 Channel。
要跟踪打开的 socket,你需要修改 TimeServerHandler,把新打开的 Channel 添加到全局的 ChannelGroup 变量中。ChannelGroup 是线程安全的。
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
TimeServer.allChannels.add(e.getChannel());
}
现在我们自动维持了一个包含所有活跃 Channel 的列表,关闭服务端就像关闭客户端一样容易了。
public class TimeServer {
static final ChannelGroup allChannels = new DefaultChannelGroup(“time-server”);
public static void main(String[] args) throws Exception {
…
ChannelFactory factory = …;
…
ServerBootstrap bootstrap = …;
…
Channel channel = bootstrap.bind(new InetSocketAddress(8080));
allChannels.add(channel);
waitForShutdownCommand();
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
factory.releaseExternalResources();
}
}
DefaultChannelGroup 构造方法接收组名为参数,组名是它的唯一标识;
ServerBootstrap 的 bind 方法返回一个服务端的绑定指定本地地址的 Channel,调用 Channel 的 close 方法将会使它与本地地址解绑;
所有 Channel 类型都可以被添加到 ChannelGroup 中,不管是客户端、服务端或是服务端接收的。因为你可以在服务器关闭时同时关闭绑定的 Channel 和接收到的 Channel;
waitForShutdownCommand() 是一个等待关闭信号的虚构方法。
我们可以对 ChannelGroup 中的 Channel 进行统一操作,这里我们调用 close 方法,相当于解绑服务端 Channel 并且异步关闭所有接收到的 Channel。close 方法返回一个功能和 ChannelFuture 相近的 ChannelGroupFuture,在所有连接都成功关闭通知我们。
10、总结
这一节我们快速浏览了 Netty,示范了如何用 Netty 写一个能正常工作的网络应用。下一节将介绍 Netty 的更多细节。