简介
之前的系列文章中咱们学到了 netty 的根本构造和工作原理,各位小伙伴肯定按捺不住心中的喜悦,想要开始手写代码来体验这神奇的 netty 框架了,刚好最近东京奥运会,咱们写一个 netty 的客户端和服务器为中国加油可好?
场景布局
那么咱们明天要搭建什么样的零碎呢?
首先要搭建一个 server 服务器,用来解决所有的 netty 客户的连贯,并对客户端发送到服务器的音讯进行解决。
还要搭建一个客户端,这个客户端负责和 server 服务器建设连贯,并发送音讯给 server 服务器。在明天的例子中,客户端在建设连贯过后,会首先发送一个“中国”音讯给服务器,而后服务器收到音讯之后再返回一个”加油!“音讯给客户端,而后客户端收到音讯之后再发送一个“中国”音讯给服务器 …. 以此往后,循环反复直到奥运完结!
咱们晓得客户端和服务器端进行音讯解决都是通过 handler 来进行的,在 handler 外面,咱们能够重写 channelRead 办法,这样在读取 channel 中的音讯之后,就能够对音讯进行解决了,而后将客户端和服务器端的 handler 配置在 Bootstrap 中启动就能够了,是不是很简略?一起来做一下吧。
启动 Server
假如 server 端的 handler 叫做 CheerUpServerHandler,咱们应用 ServerBootstrap 构建两个 EventLoopGroup 来启动 server,有看过本系列最后面文章的小伙伴可能晓得,对于 server 端须要启动两个 EventLoopGroup,一个 bossGroup,一个 workerGroup,这两个 group 是父子关系,bossGroup 负责解决连贯的相干问题,而 workerGroup 负责解决 channel 中的具体音讯。
启动服务的代码千篇一律,如下所示:
// Server 配置
//boss loop
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//worker loop
EventLoopGroup workerGroup = new NioEventLoopGroup();
final CheerUpServerHandler serverHandler = new CheerUpServerHandler();
try {ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// tcp/ip 协定 listen 函数中的 backlog 参数, 期待连接池的大小
.option(ChannelOption.SO_BACKLOG, 100)
// 日志处理器
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
// 初始化 channel,增加 handler
public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
// 日志处理器
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// 启动服务器
ChannelFuture f = b.bind(PORT).sync();
// 期待 channel 敞开
f.channel().closeFuture().sync();
不同的服务,启动服务器的代码根本都是一样的,这里咱们须要留神这几点。
在 ServerBootstrap 中,咱们退出了一个选项:ChannelOption.SO_BACKLOG,ChannelOption.SO_BACKLOG 对应的是 tcp/ip 协定 listen(int socketfd,int backlog) 函数中的 backlog 参数,用来初始化服务端可连贯队列,backlog 参数指定了这个队列的大小。因为对于一个连贯来说,解决客户端连贯申请是程序解决的,所以同一时间只能解决一个客户端连贯,多个客户端来的时候,服务端将不能解决的客户端连贯申请放在队列中期待解决,
另外咱们还增加了两个 LoggingHandler,一个是给 handler 增加的,一个是给 childHandler 增加的。LoggingHandler 次要监控 channel 中的各种事件,而后输入对应的音讯,十分好用。
比方在服务器启动的时候会输入上面的日志:
[nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0xd9b41ea4] REGISTERED
[nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0xd9b41ea4] BIND: 0.0.0.0/0.0.0.0:8007
[nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0xd9b41ea4, L:/0:0:0:0:0:0:0:0:8007] ACTIVE
这个日志是第一个 LoggingHandler 输入的,别离代表了服务器端的 REGISTERED、BIND 和 ACTIVE 事件。从输入咱们能够看到,服务器自身绑定的是 0.0.0.0:8007。
在客户端启动和服务器端建设连贯的时候会输入上面的日志:
[nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x37a4ba9f, L:/0:0:0:0:0:0:0:0:8007] READ: [id: 0x6dcbae9c, L:/127.0.0.1:8007 - R:/127.0.0.1:54566]
[nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x37a4ba9f, L:/0:0:0:0:0:0:0:0:8007] READ COMPLETE
下面日志示意 READ 和 READ COMPLETE 两个事件,其中 L:/127.0.0.1:8007 – R:/127.0.0.1:54566 代表本地服务器的 8007 端口连贯了客户端的 54566 端口。
对于第二个 LoggingHandler 来说,会输入一些具体的音讯解决相干的音讯。比方 REGISTERED、ACTIVE、READ、WRITE、FLUSH、READ COMPLETE 等事件,这外面就不一一列举了。
启动客户端
同样的,假如客户端的 handler 名称叫做 ChinaClientHandler,那么能够相似启动 server 一样启动客户端,如下:
// 客户端的 eventLoop
EventLoopGroup group = new NioEventLoopGroup();
try {Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
// 增加日志处理器
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new ChinaClientHandler());
}
});
// 启动客户端
ChannelFuture f = b.connect(HOST, PORT).sync();
客户端启动应用的是 Bootstrap,咱们同样为他配置了一个 LoggingHandler,并增加了自定义的 ChinaClientHandler。
音讯解决
咱们晓得有两种 handler,一种是 inboundHandler, 一种是 outboundHandler, 这里咱们是要监控从 socket 读取数据的事件,所以这里客户端和服务器端的 handler 都继承自 ChannelInboundHandlerAdapter 即可。
音讯解决的流程是客户端和服务器建设连贯之后,会首先发送一个”中国“的音讯给服务器。
客户端和服务器建设连贯之后,会触发 channelActive 事件,所以在客户端的 handler 中就能够发送音讯了:
public void channelActive(ChannelHandlerContext ctx) {ctx.writeAndFlush("中国");
}
服务器端在从 channel 中读取音讯的时候会触发 channelRead 事件,所以服务器端的 handler 能够重写 channelRead 办法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {log.info("收到音讯:{}",msg);
ctx.writeAndFlush("加油!");
}
而后客户端从 channel 中读取到 ” 加油!” 之后,再将”中国“写到 channel 中,所以客户端也须要重写办法 channelRead:
public void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.writeAndFlush("中国");
}
这样是不是就能够周而复始的进行上来了呢?
音讯解决中的陷阱
事实上,当你执行下面代码你会发现,客户端的确将”中国“音讯写入了 channel,然而服务器端的 channelRead 并没有被触发。为什么呢?
钻研发下,如果写入的对象是一个 String,程序外部会有这样的谬误,然而这个谬误是暗藏的,你并不会在运行的程序输入中看到,所以对老手小伙伴还是很不敌对的。这个谬误就是:
DefaultChannelPromise@57f5c075(failure: java.lang.UnsupportedOperationException: unsupported message type: String (expected: ByteBuf, FileRegion))
从谬误的信息能够看出,目前反对的音讯类型有两种,别离是 ByteBuf 和 FileRegion。
好了,咱们将下面的音讯类型改成 ByteBuf 试一试:
message = Unpooled.buffer(ChinaClient.SIZE);
message.writeBytes("中国".getBytes(StandardCharsets.UTF_8));
public void channelActive(ChannelHandlerContext ctx) {log.info("可读字节:{},index:{}",message.readableBytes(),message.readerIndex());
log.info("可写字节:{},index:{}",message.writableBytes(),message.writerIndex());
ctx.writeAndFlush(message);
}
下面咱们定义了一个 ByteBuf 的全局 message 对象,并将其发送给 server,而后在 server 端读取到音讯之后,再发送一个 ByteBuf 的全局 message 对象给 client,如此周而复始。
然而当你运行下面的程序之后会发现,服务器端的确收到了”中国“,客户端也的确收到了”加油!“,然而客户端后续发送的”中国“音讯服务器端却收不到了,怎么回事呢?
咱们晓得 ByteBuf 有 readableBytes、readerIndex、writableBytes、writerIndex、capacity 和 refCnt 等属性,咱们将这些属性在 message 发送前和发送之后进行比照:
在音讯发送之前:
可读字节:6,readerIndex:0
可写字节:14,writerIndex:6
capacity:20,refCnt:1
在音讯发送之后:
可读字节:6,readerIndex:0
可写字节:-6,writerIndex:6
capacity:0,refCnt:0
于是问题找到了,因为 ByteBuf 在解决过一次之后,refCnt 变成了 0,所以无奈持续再次反复写入,怎么解决呢?
简略的方法就是每次发送的时候再从新 new 一个 ByteBuf,这样就没有问题了。
然而每次都新建一个对象如同有点节约空间,怎么办呢?既然 refCnt 变成了 0,那么咱们调用 ByteBuf 中的 retain() 办法减少 refCnt 不就行了?
答案就是这样,然而要留神,须要在发送之前调用 retain() 办法,如果是在音讯被解决过后调用 retain() 会报异样。
总结
好了,运行下面的程序就能够始终给中国加油了,YYDS!
本文的例子能够参考:learn-netty4
本文已收录于 http://www.flydean.com/06-netty-cheerup-china/
最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!
欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!