引言
本局部整合聊天零碎无关的章节,内容次要是介绍要害性能的实现逻辑和局部代码实现,倡议读者先看看作者的博客我的项目,切换到不同的分支看看各个细节性能如何实现。这里仅仅记录一些集体学习过程的重点局部。
思维导图
https://www.mubu.com/doc/1dunN_7Luzl
我的项目代码
作者的仓库代码地址:https://github.com/lightningMan/flash-netty5
通信协议设计和自定义编解码实现
什么是通信协议?
基于TCP通信均为二进制协定,底层都是通过字节进行传输的。在通信协议当中规定数据传输的每一个字节含意。
通信过程
- 客户端转换数据为二进制。
- 网络传输给服务端。
- 服务端依据协定规定读取二进制数据。
- 服务端解决数据返回响应后果给客户端。
聊天零碎的通信协议数据对象设计
在聊天零碎当中通信协议的设计如下。
4字节魔数
比方Java的字节码CafeBabe
,用于疾速辨认是否自定义协定,也能够不便疾速提取数据。
public static final int MAGIC_NUMBER = 0x12345678;
1 字节版本号
相似TCP的IPV4还是IPV6。
/** * 协定版本 */ @JSONField(deserialize = false, serialize = false) private Byte version = 1;
1 字节序列化算法
应用1个字节来标识算法。
/** * 序列化算法定义 */ public interface SerializerAlgorithm { /** * json 序列化 */ byte JSON = 1; }
1 字节指令
一个字节最多示意256种指令。留神在设计上指令和版本号进行绑定关联,实现不同版本之间的指令兼容,进步程序的健壮性。
@Data public abstract class Packet { /** * 协定版本 */ @JSONField(deserialize = false, serialize = false) private Byte version = 1; @JSONField(serialize = false) public abstract Byte getCommand(); }
4字节数据长度
数据长度是必要的,次要用于字节流这种连续不断的数据模式进行切割。
byteBuf.writeInt(bytes.length);
int 根本数据类型在Java中默认占4个字节,这4个字节用来存储字节数组的长度。
N字节数据
数据局部。
如何实现JAVA对象二进制互相转化?
所谓互转对应了网络 Socket IO 的input/output
中的数据转化局部,实体数据转为字节流这个过程咱们通常叫做编码,反之则是解码。
无论是编码还是解码,都是依赖Netty自定义的 MessageToMessageCodec实现。聊天零碎的编码和解码工作都是依赖 PacketCodecHandler 实现的。
@ChannelHandler.Sharable public class PacketCodecHandler extends MessageToMessageCodec<ByteBuf, Packet> { public static final PacketCodecHandler INSTANCE = new PacketCodecHandler(); private PacketCodecHandler() { } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) { out.add(PacketCodec.INSTANCE.decode(byteBuf)); } @Override protected void encode(ChannelHandlerContext ctx, Packet packet, List<Object> out) { ByteBuf byteBuf = ctx.channel().alloc().ioBuffer(); PacketCodec.INSTANCE.encode(byteBuf, packet); out.add(byteBuf); } }
自定义逻辑处理器,在 Netty Server 中须要注册到 pipeline 当中。
public static void main(String[] args) { NioEventLoopGroup boosGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); final ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap .group(boosGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { //...... ch.pipeline().addLast(PacketCodecHandler.INSTANCE); // ...... } }); bind(serverBootstrap, PORT); }
这里解释下为什么PacketCodecHandler
要被注解标记为“Sharable”,因为编码和解码可能在多个handler
中用到,为了提高效率,这里通过共享缩小实例的创立。
下文也会介绍这个单例模式的优化点。
带着疑难咱们再看看@ChannelHandler.Sharable
这个注解的源码解释。
Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipelines multiple times without a race condition.
If this annotation is not specified, you have to create a new handler instance every time you add it to a pipeline because it has unshared state such as member variables.This annotation is provided for documentation purpose, just like the JCIP annotations
下面的内容翻译过去就是:
被注解的Sharable
的同一个ChannelHandler实例,能够被屡次增加到一个或多个ChannelPipeline
中,并且能够确保不会呈现竞争状况。如果没有指定这个注解,那么每次就创立新的Channel都须要应用新的Handler实例。在有不共享的状态,如成员变量时候,就不能用这个注解。
简略来说@ChannelHandler.Sharable
实现了Netty中的"Bean"单例和共享。
实战局部
数据编码过程(思路)
上面是数据解码的根本编写思路。
- 增加编码器。
ch.pipeline().addLast(new PacketEncoder());
- 往
ByteBuf
一一写字段,实现编码过程。
public class PacketEncoder extends MessageToByteEncoder<Packet> { @Override protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) { PacketCodec.INSTANCE.encode(out, packet); } }
- 残缺的自定义协定:PacketCodec#encode。
public void encode(ByteBuf byteBuf, Packet packet) { // 1. 序列化 java 对象 byte[] bytes = Serializer.DEFAULT.serialize(packet); // 2. 理论编码过程 byteBuf.writeInt(MAGIC_NUMBER); byteBuf.writeByte(packet.getVersion()); byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm()); byteBuf.writeByte(packet.getCommand()); byteBuf.writeInt(bytes.length); byteBuf.writeBytes(bytes); }
解码数据过程(思路)
上面是数据解码的根本编写思路:
- 在handler当中增加自定义逻辑处理器。
.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new PacketDecoder()); } });
- 定义解码逻辑处理器。
public class PacketDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { out.add(PacketCodec.INSTANCE.decode(in)); } }
上面定义具体的解码过程:
- 跳过魔数。
- 跳过协定版本号 。
- 读取序列化算法。
- 读取指令,数据包,算法标识等自定义协定的根本内容。
- 依据数据长度。
- 取出数据。
PacketCodec#decode
public Packet decode(ByteBuf byteBuf) { // 跳过 magic number byteBuf.skipBytes(4); // 跳过版本号 byteBuf.skipBytes(1); // 序列化算法 byte serializeAlgorithm = byteBuf.readByte(); // 指令 byte command = byteBuf.readByte(); // 数据包长度 int length = byteBuf.readInt(); byte[] bytes = new byte[length]; byteBuf.readBytes(bytes); Class<? extends Packet> requestType = getRequestType(command); Serializer serializer = getSerializer(serializeAlgorithm); if (requestType != null && serializer != null) { return serializer.deserialize(requestType, bytes); } return null; }
思考
JSON序列化形式之外其余序列化形式如何实现?
Java原生序列化
- 类实现 Serializable 接口
具体底层由ObjectOutputStream和ObjectInputStream实现
Hessian
- Hessian 是动静类型、二进制、紧凑的,并且可跨语言移植的一种序列化框架
Hessian 协定要比 JDK、JSON 更加紧凑,性能上要比 JDK、JSON 序列化高效很多,而且生成的字节数也更小
Protobuf
- 谷歌实现的混合语言数据规范
- 轻便、高效的结构化数据存储格局
- 反对 Java、Python、C++、Go 等语言
要求定义 IDL(Interface description language),并且应用对应语言的IDL生成序列化工具类
Thrift
- Facebook于2007年开发的跨语言的rpc服框架
- 通过Thrift的编译环境生成各种语言类型的接口文件
序列化和编码都是JAVA对象封装二进制过程,两者的分割和区别
总结起来就是一句话:序列化是指标,编码是办法。网上有一张图十分直观的展现了两者的区别。
两者的分割和区别
编码:信息从一种模式或格局转换为另一种模式的过程,目标是不便传输协定通信。
序列化:“序列化”其实自身也是“信息从一种模式或格局转换为另一种模式的过程”,只不过这个表现形式直观具体,序列化也经常用于表白一个对象的状态。
聊天零碎的Netty细节优化
优化局部是聊天零碎的精华,也是应用Netty实际十分有价值的领导和参考,所以优先把优化局部放到后面介绍。
1. 应用共享Handler
问题剖析
在旧版本代码中,每个新连贯每次通过 ChannelInitializer 调用,都会产生9个指令对象都被new一遍操作,然而能够看到其实很多处理器外部是没有任何 "状态"的,对于无状态的业务处理器就能够应用单例模式封装。
serverBootstrap .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new Spliter()); ch.pipeline().addLast(new PacketDecoder()); ch.pipeline().addLast(new LoginRequestHandler()); ch.pipeline().addLast(new AuthHandler()); ch.pipeline().addLast(new MessageRequestHandler()); ch.pipeline().addLast(new CreateGroupRequestHandler()); ch.pipeline().addLast(new JoinGroupRequestHandler()); ch.pipeline().addLast(new QuitGroupRequestHandler()); ch.pipeline().addLast(new ListGroupMembersRequestHandler()); ch.pipeline().addLast(new GroupMessageRequestHandler()); ch.pipeline().addLast(new LogoutRequestHandler()); ch.pipeline().addLast(new PacketEncoder()); } });
优化伎俩
- 通过退出注解
@ChannelHandler.Shareble
,示意这个 handler 是反对多个 channel 共享的,否则会报错。 - 公布动态final的不可变对象来实现单例,编译器优化。
- 最初还能够压缩Handler,把编码和解码过程放到一个Handler和专用的Handler放到一个Handller解决(比方申请指令散发解析解决)。
注意事项
次要的注意事项如下:
- 并不是所有的Handler都能够单例
- Spliter 不是单例的,因为它须要对每个数据做拆包解决。
2. 缩短事件流传门路
问题剖析
- 首先,指令的decode必须要在最后面后面,因为波及前面的命令解析,所以这个Handler是无奈“压缩”的。
- 然而如果把每个命令decode之后再流传到每个命令事件,然而对应的事件又不做任何解决,那么会节约很屡次多余的命令判断。
- 基本目标:缩短事件流传链条,事件流传链尽可能短。
优化伎俩
优化伎俩实际上也很简略,那就是 应用对立Handler。
通常的做法如下:
- 该Handler只做判断,不做任何状态存储,应用单例优化。
public static final IMHandler INSTANCE = new IMHandler();
- 聊天零碎中利用HashMap存储所有的命令解决Handler,这里集体顺带指定下初始化大小优化一下。
private IMHandler() { handlerMap = new HashMap<>(7); handlerMap.put(MESSAGE_REQUEST, MessageRequestHandler.INSTANCE); handlerMap.put(CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE); handlerMap.put(JOIN_GROUP_REQUEST, JoinGroupRequestHandler.INSTANCE); handlerMap.put(QUIT_GROUP_REQUEST, QuitGroupRequestHandler.INSTANCE); handlerMap.put(LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestHandler.INSTANCE); handlerMap.put(GROUP_MESSAGE_REQUEST, GroupMessageRequestHandler.INSTANCE); handlerMap.put(LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE); }
- 回调
channelRead0
实际上就是委托给map中的元素对应的指令处理器解决。
@Override protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception { handlerMap.get(packet.getCommand()).channelRead(ctx, packet); }
通过一个对立的处理器包含多个动态单例处理器,无效缩小JVM内存开销,单例也能够缩小对象实例化的开销。
3. 事件流传源调整
关键点
如果你的 outBound 类型的 handler 较多,在写数据的时候能用 ctx.writeAndFlush()
就用这个办法, 不要用 ctx.channel().writeAndFlush()
。
起因
究其原因是ctx.writeAndFlush() 会绕过所有不须要解决的其余Outbound类型。ctx.writeAndFlush()
是从 pipeline 链中的以后节点开始往前找到第一个 outBound 类型向前流传的,如果这个对象不须要其余outBound的handler解决就能够用这个办法。
而ctx.channel().writeAndFlush() 体现则不同,它是从pipeline 链中的最初一个 outBound 类型的 handler 开始,把对象往前进行流传,从图中就能够看到, outBound 的处理器越多,就会产生越多“无用”操作。
当然如果确定前面的 outBound 须要如此解决,那么就能够用这个办法。
相干问题
- writeAndFlush为什么能够缩短事件流传门路?
- 它是如何实现OutBound类型的事件流传缩短的?
4. 缩小阻塞主线程的操作【重要】
Netty中容易被忽视,却是十分重要的一个概念。那就是 一个Channel的其中一个Handler阻塞,会导致所有其余绑定的Channel一起被拖慢。
比方只有有一个 channel
的一个 handler
中的 channelRead0()
办法阻塞了 NIO 线程,最终都会拖慢绑定在该 NIO 线程上的其余所有的 channel
List<Channel> channelList = 已有数据可读的 channelfor (Channel channel in channelist) { for (ChannelHandler handler in channel.pipeline()) { handler.channelRead0(); } }
下面的操作如果for循环某次呈现卡顿,这不仅仅拖慢一个客户端,而是拖慢所有客户端。
所以Netty进行客户端解决的时候个别设计为非阻塞模式,或者会应用 业务线程池 去预防这种状况。业务线程池的实现形式更为常见,也就是Netty中一套线程池,理论处理过程中再委派给自定义的业务线程池单开线程解决。这样就实现了非阻塞异步执行工作的目标。
须要留神引入业务线程池会减少零碎复杂度,也会减少线上调试难度,所以做好链路追踪非常重要。
5. 如何精确统计时长?
错误做法:在一个线程的头尾退出时间差计算得出执行时长后果。
正确做法:应用writeAndFlush+addListener 的形式判断 futrue.isDone
之后才计算
起因:writeAndFlush 在非NIO线程中它是一个异步操作,其余操作由第一个工作队列异步执行。
关键点:writeAndFlush 真正执行实现才算是实现解决,监听它实现解决的回调动作能力算出精确执行时长。
优化小结
- 如果Handler多例然而无状态,齐全能够改为单例模式 。
- 尽可能减少Handler的臃肿,避免调用链路过长。
- Handler的耗时操作要交给线程池开启新线程解决,一个耗时操作不只影响单个Channel。倡议业务线程池独自开新线程形式优化,然而须要留神和线程绑定的相干参数解决问题 。
- 耗时统计,writeAndFlush属于异步工作。
实现登录
解决流程图
实现思路
- 指标客户端和服务端别离启动Netty服务。
- 客户端发送登录申请指令,服务端解码之后依据传输后果校验,依据校验后果构建登录申请响应指令LoginResponsePacket。
通过ctx.writeAndFlush(loginResponsePacket); 回送响应后果给客户端。
- 登录校验胜利,通过SessionUtil增加session信息
- 客户端登录胜利之后,构建申请指令对象,设置参数,通过Netty发送到服务端 。
- 服务端收到申请进行验证,并且构建绝对应的响应指令后果对象。
实现步骤
上面是大抵的实现步骤:
- 增加 LoginRequestHandler 登录逻辑处理器在Server端。
ch.pipeline().addLast(LoginRequestHandler.INSTANCE);
@ChannelHandler.Sharable public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> { public static final LoginRequestHandler INSTANCE = new LoginRequestHandler(); protected LoginRequestHandler() { } @Override protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) { LoginResponsePacket loginResponsePacket = new LoginResponsePacket(); loginResponsePacket.setVersion(loginRequestPacket.getVersion()); loginResponsePacket.setUserName(loginRequestPacket.getUserName()); if (valid(loginRequestPacket)) { loginResponsePacket.setSuccess(true); String userId = IDUtil.randomId(); loginResponsePacket.setUserId(userId); System.out.println("[" + loginRequestPacket.getUserName() + "]登录胜利"); SessionUtil.bindSession(new Session(userId, loginRequestPacket.getUserName()), ctx.channel()); } else { loginResponsePacket.setReason("账号密码校验失败"); loginResponsePacket.setSuccess(false); System.out.println(new Date() + ": 登录失败!"); } // 登录响应 ctx.writeAndFlush(loginResponsePacket); } private boolean valid(LoginRequestPacket loginRequestPacket) { return true; } @Override public void channelInactive(ChannelHandlerContext ctx) { SessionUtil.unBindSession(ctx.channel()); } }
- 在客户端同样增加Handler,
LoginResponseHandler
,LoginResponseHandler的解决逻辑如下。
ch.pipeline().addLast(LoginResponseHandler.INSTANCE);
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) { String userId = loginResponsePacket.getUserId(); String userName = loginResponsePacket.getUserName(); if (loginResponsePacket.isSuccess()) { System.out.println("[" + userName + "]登录胜利,userId 为: " + loginResponsePacket.getUserId()); SessionUtil.bindSession(new Session(userId, userName), ctx.channel()); } else { System.out.println("[" + userName + "]登录失败,起因:" + loginResponsePacket.getReason()); } } @Override public void channelInactive(ChannelHandlerContext ctx) { System.out.println("客户端连贯被敞开!"); } }
客户端登录胜利或者失败,如何把失败或者胜利标识绑定在客户端连贯? 服务端如何高效断定客户端从新登录?
在聊天零碎中实现比较简单粗犷。服务端高效判断的办法是在ConcurrentHashMap
,Map当中存储用户的ID,如果登录胜利则存储到此Map中,服务端也只须要判断Map元素即可高效判断是否登录。
private static final Map<String, Channel> userIdChannelMap = new ConcurrentHashMap<>();
热插拔客户端是否登录验证
首先校验是否登录局部封装到工具类当中,实现比较简单。
SessionUtil
public static boolean hasLogin(Channel channel) { return getSession(channel) != null; } public static Session getSession(Channel channel) { return channel.attr(Attributes.SESSION).get(); }// AttributeKey<Session> SESSION = AttributeKey.newInstance("session");
AuthHandler
实现热插拔的思路是判断是否登录,对立通过该调用链条实现,AuthHandler自身作为独自处理器封装判断登录校验逻辑。
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (!SessionUtil.hasLogin(ctx.channel())) { ctx.channel().close(); } else { ctx.pipeline().remove(this); super.channelRead(ctx, msg); }}
实现双端收发音讯
客户端解决
客户端胜利登录之后,下一步是实现客户端和服务端相互发送数据。客户端收音讯处理器如下:
// 收音讯处理器 ch.pipeline().addLast(new MessageResponseHandler());
MessageResponseHandler
public class MessageResponseHandler extends SimpleChannelInboundHandler<MessageResponsePacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, MessageResponsePacket messageResponsePacket) { String fromUserId = messageResponsePacket.getFromUserId(); String fromUserName = messageResponsePacket.getFromUserName(); System.out.println(fromUserId + ":" + fromUserName + " -> " + messageResponsePacket .getMessage()); } }
服务端解决
因为是通用组件,服务端这里封装到 IMHandler 通用组件当中。
handlerMap.put(MESSAGE_REQUEST, MessageRequestHandler.INSTANCE);
MessageRequestHandler
@ChannelHandler.Sharable public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> { public static final MessageRequestHandler INSTANCE = new MessageRequestHandler(); private MessageRequestHandler() { } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacket messageRequestPacket) { long begin = System.currentTimeMillis(); // 1.拿到音讯发送方的会话信息 Session session = SessionUtil.getSession(ctx.channel()); // 2.通过音讯发送方的会话信息结构要发送的音讯 MessageResponsePacket messageResponsePacket = new MessageResponsePacket(); messageResponsePacket.setFromUserId(session.getUserId()); messageResponsePacket.setFromUserName(session.getUserName()); messageResponsePacket.setMessage(messageRequestPacket.getMessage()); // 3.拿到音讯接管方的 channel Channel toUserChannel = SessionUtil.getChannel(messageRequestPacket.getToUserId()); // 4.将音讯发送给音讯接管方 if (toUserChannel != null && SessionUtil.hasLogin(toUserChannel)) { toUserChannel.writeAndFlush(messageResponsePacket).addListener(future -> { if (future.isDone()) { } }); } else { System.err.println("[" + session.getUserId() + "] 不在线,发送失败!"); } } }
小结
实现双端收发音讯小结内容如下:
- 定义收发音讯Java对象,对于音讯进行收发。
- 学习
Channel
的attr
的理论用法,能够给Channel绑定属性并且设置某些状态,外部理论也是通过Map保护的,所以不须要用户内部本人在自定义去保护。 - 如何在控制台当中获取音讯并且发送到服务端。
- 服务端回传音讯给客户端。
ChannelPipleline 和 ChannelHandler 概念
本局部是补充局部。次要介绍 Pipeline 和ChannelHanlder形成和一些根底概念。了解这一点之前须要先了解Channel这个概念。
ChannelPipleline 和 ChannelHandler 形成图
Channel 概念了解
一个客户端连贯对应一个Channel,这个Channel能够类比BIO当中的传统概念Socket套接字。
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind.
一个网络套接字的节点或一个可能进行(网络)I/O操作的组件,如读、写、连贯和绑定。
ChannelPipeline
源码对于 ChannelPipeline 的定义如下:
A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel. ChannelPipeline implements an advanced form of the Intercepting Filter pattern to give a user full control over how an event is handled and how the ChannelHandlers in a pipeline interact with each other.
源码中还有一个直观的设计图。
下图形容了I/O事件在ChannelPipeline中是如何被ChannelHandlers解决的。一个I/O事件由 ChannelInboundHandler 或 ChannelOutboundHandler解决,并通过调用ChannelHandlerContext中定义的事件流传办法,如ChannelHandlerContext.fireChannelRead(Object)和ChannelHandlerContext.write(Object),转发给其最靠近的处理程序。
ChannelPipeline
的外围如下:
- 解决或拦挡一个Channel的入站事件和出站操作的链表。
- 通过责任链模式的设计,能够齐全自定义解决逻辑和
ChannelHandler
之间相互通信的逻辑。
ChannelContext
ChannelHandler与Channel和ChannelPipeline之间的映射关系,由ChannelHandlerContext
进⾏保护,依据其名称Context也能够看到存储更为丰盛的信息。
Enables a ChannelHandler to interact with its ChannelPipeline and other handlers.
使得ChannelHandler可能与它的ChannelPipeline和其余处理程序互动。
ChannelContext
能够获取整个Channel
的信息。- 获取所有的上下文。
- 逻辑处理器ChannelHandler定义解决逻辑。
ChannelHanlder
ChannelHanlder
蕴含两种了解。
第一种:能够了解为socket连贯,客户端和服务端连贯的时候会创立一个channel。 负责根本的IO操作,例如:bind()
、connect()
、read()
、write()
。
第二种:Netty的Channel接口所提供的API,大大减少了Socket类复杂性。
因为Channel连贯过程中存在双端 input/output
,所以 ChannelHandler
也分类为 ChannelInboundHandler
和 ChannelOutboundHandler
。
ChannelInboundHandler
- 读取的逻辑形象 。
channelRead
是最重要的办法 。- 配合
ByteBuf
应用进行buf.read
推动读指针挪动 。
ChannelOutboundHandler
- 对应写出的逻辑形象 。
- 外围办法是
write
,writeAndFlush
。
适配器
在应用过程中还存在对应的适配器。
ChannelOutboundHandlerAdapter
(留神解决程序和增加addLast的程序相同)ChannelInboundHandlerAdapter
客户端和服务端的 SimpleChannelInboundHandler/ChannelInboundHandlerAdapter 简化
整个聊天零碎大部分的指令判断逻辑是反复的,上面介绍如何通过 SImpleChannelInboundHandler/ChannelInboundHandlerAdapter 简化指令的解决逻辑。
ChannelInboundHandlerAdapter
which allows to explicit only handle a specific type of messages. For example here is an implementation which only handleString
messages.
ChannelInboundHandlerAdapter 容许明确地只解决特定类型的音讯。而SimpleChannelInboundHandler
提供了一个模板,作用是把解决逻辑不变的内容写好在 channelRead(ctx,msg)
中,并且在外面调用 channelRead0
,这样解决之后就能够通过形象办法实现传递到子类中去了。
区别
SimpleChannelInboundHandler
和ChannelInboundHandlerAdapter
这两个类应用上不太好辨别,上面再补充介绍一下如何正确对待应用两者。
ChannelInboundHandlerAdapter
须要笼罩的办法是channelRead,特点是不会主动开释音讯,须要调用ctx.fireChannelRead(msg) 向后续链条处理器传递音讯,也就是须要手动通过责任链的形式传递给下位处理器。SimpleChannelInboundHandler
是 ChannelInboundHandlerAdapter
的子类,做了额定的解决,会主动开释音讯,如果还须要持续传递音讯,需调用一次 ReferenceCountUtil.retain(msg)。需注意SimpleChannelInboundHandler
也须要调用ctx.fireChannelRead(msg)
来触发链条中下一处理器解决。
ChannelInboundHandlerAdapter
通常用于处于链条两头的某些环节解决,对数据进行某些解决,如数据验证,须要将音讯持续传递。SimpleChannelInboundHandler
则比拟适宜链条最初一个环节,该环节解决完后,后续不再须要该音讯,因而能够主动开释。
利用
在聊天零碎中对立解决的Handler继承了SimpleChannelInboundHandler,重写channelRead0
办法,次要对于解码之后的操作指令和通用Map进行匹配,如果匹配则散发到具体的逻辑处理器。
@ChannelHandler.Sharable public class IMHandler extends SimpleChannelInboundHandler<Packet> { public static final IMHandler INSTANCE = new IMHandler(); private Map<Byte, SimpleChannelInboundHandler<? extends Packet>> handlerMap; private IMHandler() { handlerMap = new HashMap<>(7); handlerMap.put(MESSAGE_REQUEST, MessageRequestHandler.INSTANCE); handlerMap.put(CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE); handlerMap.put(JOIN_GROUP_REQUEST, JoinGroupRequestHandler.INSTANCE); handlerMap.put(QUIT_GROUP_REQUEST, QuitGroupRequestHandler.INSTANCE); handlerMap.put(LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestHandler.INSTANCE); handlerMap.put(GROUP_MESSAGE_REQUEST, GroupMessageRequestHandler.INSTANCE); handlerMap.put(LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE); } @Override protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception { handlerMap.get(packet.getCommand()).channelRead(ctx, packet); } }
客户端和服务端单聊
指标
- 输出用户名,服务端随机调配ID,这里省去通过账号和明码注册过程 。
- 多个客户端登录,用 userId 空格 音讯的形式单聊。
实现过程
应用工具类把UserId和Channel绑定为Session。
- Session的信息蕴含用户ID以及名称 ,后续能够扩大更多的字段。
应用
SessionUtil
工具类操作Session,通过Session储存以后会话信息。- 这里用的ConcurrentHashMap实现并发平安
- ConcurrentHashMap为userId -> Channel的映射Map。
- 用户登录的时候,须要把Session塞入Map。
- 当用户断开
Channel
连贯退出的时候,须要移除Session信息
服务端承受音讯并且转发(这里Netty相似转发手机信号的基站)
- 获取会话信息。
- 结构发给客户端的对象
MessageResponse
。 - 音讯接管方标识获取对应
Channel
。 - 如果指标用户登录则发送音讯,如果对方不在线,则控制台打印正告信息。
具体的代码在后面的收发音讯中有提到过,这里反复展现一遍。
MessageResponseHandler
public class MessageResponseHandler extends SimpleChannelInboundHandler<MessageResponsePacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, MessageResponsePacket messageResponsePacket) { String fromUserId = messageResponsePacket.getFromUserId(); String fromUserName = messageResponsePacket.getFromUserName(); System.out.println(fromUserId + ":" + fromUserName + " -> " + messageResponsePacket .getMessage()); } }
MessageRequestHandler
@ChannelHandler.Sharable public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> { public static final MessageRequestHandler INSTANCE = new MessageRequestHandler(); private MessageRequestHandler() { } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacket messageRequestPacket) { long begin = System.currentTimeMillis(); // 1.拿到音讯发送方的会话信息 Session session = SessionUtil.getSession(ctx.channel()); // 2.通过音讯发送方的会话信息结构要发送的音讯 MessageResponsePacket messageResponsePacket = new MessageResponsePacket(); messageResponsePacket.setFromUserId(session.getUserId()); messageResponsePacket.setFromUserName(session.getUserName()); messageResponsePacket.setMessage(messageRequestPacket.getMessage()); // 3.拿到音讯接管方的 channel Channel toUserChannel = SessionUtil.getChannel(messageRequestPacket.getToUserId()); // 4.将音讯发送给音讯接管方 if (toUserChannel != null && SessionUtil.hasLogin(toUserChannel)) { toUserChannel.writeAndFlush(messageResponsePacket).addListener(future -> { if (future.isDone()) { } }); } else { System.err.println("[" + session.getUserId() + "] 不在线,发送失败!"); } } }
群聊发动和告诉
上面两个大节围绕群聊实现介绍。整个群聊和单聊实现相似,都是通过标识获取Channel,为了方面多个成员治理,设计 ChannelGroup
实现Channel
的批量操作。
预期成果
- 三位用户顺次登录。
- 控制台输出 createGroup 指令,提醒创立群聊须要 userId 列表,之后以英文逗号分隔userId。
- 群聊创立胜利之后,所有群聊成员收到退出胜利音讯。
创立群聊实现
次要逻辑如下:
- 创立一个 channel 分组。
- 筛选出待退出群聊的用户的 channel 和 userName。
- 创立群聊创立后果的响应。
- 给每个客户端发送拉群告诉
- 保留群组相干的信息。
其中存储群的相干信息利用了ConcurrentHashMap
实现,和Session的会话信息存储形式相似,ChannelGroup对象负责封装多个Channel的信息,模仿群聊中的“群”。
@ChannelHandler.Sharable public class CreateGroupRequestHandler extends SimpleChannelInboundHandler<CreateGroupRequestPacket> { public static final CreateGroupRequestHandler INSTANCE = new CreateGroupRequestHandler(); private CreateGroupRequestHandler() { } @Override protected void channelRead0(ChannelHandlerContext ctx, CreateGroupRequestPacket createGroupRequestPacket) { List<String> userIdList = createGroupRequestPacket.getUserIdList(); List<String> userNameList = new ArrayList<>(); // 1. 创立一个 channel 分组 ChannelGroup channelGroup = new DefaultChannelGroup(ctx.executor()); // 2. 筛选出待退出群聊的用户的 channel 和 userName for (String userId : userIdList) { Channel channel = SessionUtil.getChannel(userId); if (channel != null) { channelGroup.add(channel); userNameList.add(SessionUtil.getSession(channel).getUserName()); } } // 3. 创立群聊创立后果的响应 String groupId = IDUtil.randomId(); CreateGroupResponsePacket createGroupResponsePacket = new CreateGroupResponsePacket(); createGroupResponsePacket.setSuccess(true); createGroupResponsePacket.setGroupId(groupId); createGroupResponsePacket.setUserNameList(userNameList); // 4. 给每个客户端发送拉群告诉 channelGroup.writeAndFlush(createGroupResponsePacket); System.out.print("群创立胜利,id 为 " + createGroupResponsePacket.getGroupId() + ", "); System.out.println("群外面有:" + createGroupResponsePacket.getUserNameList()); // 5. 保留群组相干的信息 SessionUtil.bindChannelGroup(groupId, channelGroup); } }
客户端解决局部则是简略的打印创立群聊胜利的信息,实现比较简单这里不再贴出相干代码。
群聊成员治理实现
设计流程和实现思路
设计流程
- 退出群聊,控制台输入创立胜利音讯。
- 控制台输出joinGroup 之后输出群ID,退出群聊,控制台显示退出群胜利。
- 控制台输出 listGroupMembers 而后输出群ID,展现群成员。
- quitGroup 输出群ID,进行退群
- 控制台输出joinGroup 之后输出群ID显示对应成员不在,则退群胜利。
实现思路
- 在控制台中退出群退出的命令处理器。
- 服务端解决群聊申请。
- 客户端解决加群响应.
- 群聊退出实现。
在控制台中退出群退出的命令处理器
JoinGroupConsoleCommand
public class JoinGroupConsoleCommand implements ConsoleCommand { @Override public void exec(Scanner scanner, Channel channel) { JoinGroupRequestPacket joinGroupRequestPacket = new JoinGroupRequestPacket(); System.out.print("输出 groupId,退出群聊:"); String groupId = scanner.next(); joinGroupRequestPacket.setGroupId(groupId); channel.writeAndFlush(joinGroupRequestPacket); } }
服务端解决群聊申请
服务端解决群聊申请:
- 构建Channel分区,把处在同一个分组的Channel放到一个List当中存储
- 如果群聊构建胜利,则构建创立胜利响应后果 。
@ChannelHandler.Sharable public class JoinGroupRequestHandler extends SimpleChannelInboundHandler<JoinGroupRequestPacket> { public static final JoinGroupRequestHandler INSTANCE = new JoinGroupRequestHandler(); private JoinGroupRequestHandler() { } @Override protected void channelRead0(ChannelHandlerContext ctx, JoinGroupRequestPacket requestPacket) { // 1. 获取群对应的 channelGroup,而后将以后用户的 channel 增加进去 String groupId = requestPacket.getGroupId(); ChannelGroup channelGroup = SessionUtil.getChannelGroup(groupId); channelGroup.add(ctx.channel()); // 2. 结构加群响应发送给客户端 JoinGroupResponsePacket responsePacket = new JoinGroupResponsePacket(); responsePacket.setSuccess(true); responsePacket.setGroupId(groupId); ctx.writeAndFlush(responsePacket); } }
客户端解决加群响应
简略打印加群的响应音讯。
public class JoinGroupResponseHandler extends SimpleChannelInboundHandler<JoinGroupResponsePacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, JoinGroupResponsePacket responsePacket) { if (responsePacket.isSuccess()) { System.out.println("退出群[" + responsePacket.getGroupId() + "]胜利!"); } else { System.err.println("退出群[" + responsePacket.getGroupId() + "]失败,起因为:" + responsePacket.getReason()); } } }
群聊退出实现
群聊退出次要是获取群对应的 channelGroup,而后将以后用户的 channel 移除,之后构建退群的响应信息回传客户端即可。
QuitGroupRequestHandler
@ChannelHandler.Sharable public class QuitGroupRequestHandler extends SimpleChannelInboundHandler<QuitGroupRequestPacket> { public static final QuitGroupRequestHandler INSTANCE = new QuitGroupRequestHandler(); private QuitGroupRequestHandler() { } @Override protected void channelRead0(ChannelHandlerContext ctx, QuitGroupRequestPacket requestPacket) { // 1. 获取群对应的 channelGroup,而后将以后用户的 channel 移除 String groupId = requestPacket.getGroupId(); ChannelGroup channelGroup = SessionUtil.getChannelGroup(groupId); channelGroup.remove(ctx.channel()); // 2. 结构退群响应发送给客户端 QuitGroupResponsePacket responsePacket = new QuitGroupResponsePacket(); responsePacket.setGroupId(requestPacket.getGroupId()); responsePacket.setSuccess(true); ctx.writeAndFlush(responsePacket); } }
心跳检测
网络问题
假死
TCP层面来看,服务端收到4次握手包或者RST包才算真正断开连接,如果中途应用程序并没有捕捉到,此时是认为这条连贯存在的。
假死引发问题
- 客户端发送数据超时无响应,影响体验。
节约CPU和内存资源,性能下滑。
假死起因
- 公网丢包,网络抖动 。
- 应用程序阻塞无奈读写 。
- 客户端或者服务端设别故障,网卡,机房故障。
为了解决下面的问题,通常会应用心跳检测机制定期检测每个Channel
连贯是否存活。
服务端心跳检测实现
- 通过
IdleStateHandler
自带Handler
实现 - 继承类,而后开启定时工作
- 触发假死该
Handler
回调channelIdle
办法
客户端预判和进攻假死
- 新建
Handler
。 - 开启定时线程。
- 组装心跳包。
- 发送心跳。
- 服务端简略开发承受和辨认心跳包的Handler,之后回送收到心跳包音讯即可。
注意事项
- 心跳检测Handler插入到整个Pipeline最后面,因为如果连贯理论曾经断开后续的所有解决均无意义。
- 假死不肯定“死”,避免服务端误判,客户端也须要措施避免假死和预判假死,这就是客户端预判的含意。
思考
- IdleHandler 可否单例?
- 断开链接之后从新连贯登录
IdleHandler 可否单例?
答案是不能。因为它并不是无状态的,并且每个Channel都有各自的连贯状态。
断开链接之后从新连贯登录
通过额定的线程定时轮循所有的连贯的活跃性,如果发现其中有死连贯,则执行重连。
写在最初
相熟聊天零碎对于后续的源码剖析非常有意义,我的项目的整体构建比较简单,集体在笔记中将重点局部做了一个梳理。
文章参考
https://juejin.cn/book/m/6844733738119593991/section/6844733738291576840?suid=2040300414187416