共计 24442 个字符,预计需要花费 62 分钟才能阅读完成。
引言
本局部整合聊天零碎无关的章节,内容次要是介绍要害性能的实现逻辑和局部代码实现,倡议读者先看看作者的博客我的项目,切换到不同的分支看看各个细节性能如何实现。这里仅仅记录一些集体学习过程的重点局部。
思维导图
https://www.mubu.com/doc/1dunN_7Luzl
我的项目代码
作者的仓库代码地址:https://github.com/lightningMan/flash-netty5
通信协议设计和自定义编解码实现
什么是通信协议?
基于 TCP 通信均为二进制协定,底层都是通过字节进行传输的。在通信协议当中规定数据传输的每一个字节含意。
通信过程
- 客户端转换数据为二进制。
- 网络传输给服务端。
- 服务端依据协定规定读取二进制数据。
- 服务端解决数据返回响应后果给客户端。
聊天零碎的通信协议数据对象设计
在聊天零碎当中通信协议的设计如下。
4 字节魔数
比方 Java 的字节码CafeBabe
,用于疾速辨认是否自定义协定,也能够不便疾速提取数据。
public static final int MAGIC_NUMBER = 0x12345678;
1 字节版本号
相似 TCP 的 IPV4 还是 IPV6。
/**
* 协定版本
*/
@JSONField(deserialize = false, serialize = false)
private Byte version = 1;
1 字节序列化算法
应用 1 个字节来标识算法。
/**
* 序列化算法定义
*/
public interface SerializerAlgorithm {
/**
* json 序列化
*/
byte JSON = 1;
}
1 字节指令
一个字节最多示意 256 种指令。留神在设计上指令和版本号进行绑定关联,实现不同版本之间的指令兼容,进步程序的健壮性。
@Data
public abstract class Packet {
/**
* 协定版本
*/
@JSONField(deserialize = false, serialize = false)
private Byte version = 1;
@JSONField(serialize = false)
public abstract Byte getCommand();}
4 字节数据长度
数据长度是必要的,次要用于字节流这种连续不断的数据模式进行切割。
byteBuf.writeInt(bytes.length);
int 根本数据类型在 Java 中默认占 4 个字节,这 4 个字节用来存储字节数组的长度。
N 字节数据
数据局部。
如何实现 JAVA 对象二进制互相转化?
所谓互转对应了网络 Socket IO 的 input/output
中的数据转化局部,实体数据转为字节流这个过程咱们通常叫做 编码,反之则是解码。
无论是编码还是解码,都是依赖 Netty 自定义的 MessageToMessageCodec实现。聊天零碎的编码和解码工作都是依赖 PacketCodecHandler 实现的。
@ChannelHandler.Sharable
public class PacketCodecHandler extends MessageToMessageCodec<ByteBuf, Packet> {public static final PacketCodecHandler INSTANCE = new PacketCodecHandler();
private PacketCodecHandler() {}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) {out.add(PacketCodec.INSTANCE.decode(byteBuf));
}
@Override
protected void encode(ChannelHandlerContext ctx, Packet packet, List<Object> out) {ByteBuf byteBuf = ctx.channel().alloc().ioBuffer();
PacketCodec.INSTANCE.encode(byteBuf, packet);
out.add(byteBuf);
}
}
自定义逻辑处理器,在 Netty Server 中须要注册到 pipeline 当中。
public static void main(String[] args) {NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {protected void initChannel(NioSocketChannel ch) {
//......
ch.pipeline().addLast(PacketCodecHandler.INSTANCE);
// ......
}
});
bind(serverBootstrap, PORT);
}
这里解释下为什么 PacketCodecHandler
要被注解标记为“Sharable”,因为编码和解码可能在多个 handler
中用到,为了提高效率,这里通过共享缩小实例的创立。
下文也会介绍这个单例模式的优化点。
带着疑难咱们再看看 @ChannelHandler.Sharable
这个注解的源码解释。
Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipelines multiple times without a race condition.
If this annotation is not specified, you have to create a new handler instance every time you add it to a pipeline because it has unshared state such as member variables.
This annotation is provided for documentation purpose, just like the JCIP annotations
下面的内容翻译过去就是:
被注解的 Sharable
的同一个 ChannelHandler 实例,能够被屡次增加到一个或多个 ChannelPipeline
中,并且能够确保不会呈现竞争状况。如果没有指定这个注解,那么每次就创立新的 Channel 都须要应用新的 Handler 实例。在有不共享的状态,如成员变量时候,就不能用这个注解。
简略来说 @ChannelHandler.Sharable
实现了 Netty 中的 ”Bean” 单例和共享。
实战局部
数据编码过程(思路)
上面是数据解码的根本编写思路。
- 增加编码器。
ch.pipeline().addLast(new PacketEncoder());
- 往
ByteBuf
一一写字段,实现编码过程。
public class PacketEncoder extends MessageToByteEncoder<Packet> {
@Override
protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) {PacketCodec.INSTANCE.encode(out, packet);
}
}
- 残缺的自定义协定:PacketCodec#encode。
public void encode(ByteBuf byteBuf, Packet packet) {
// 1. 序列化 java 对象
byte[] bytes = Serializer.DEFAULT.serialize(packet);
// 2. 理论编码过程
byteBuf.writeInt(MAGIC_NUMBER);
byteBuf.writeByte(packet.getVersion());
byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm());
byteBuf.writeByte(packet.getCommand());
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
}
解码数据过程(思路)
上面是数据解码的根本编写思路:
- 在 handler 当中增加自定义逻辑处理器。
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {ch.pipeline().addLast(new PacketDecoder());
}
});
- 定义解码逻辑处理器。
public class PacketDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {out.add(PacketCodec.INSTANCE.decode(in));
}
}
上面定义具体的解码过程:
- 跳过魔数。
- 跳过协定版本号。
- 读取序列化算法。
- 读取指令,数据包,算法标识等自定义协定的根本内容。
- 依据数据长度。
- 取出数据。
PacketCodec#decode
public Packet decode(ByteBuf byteBuf) {// 跳过 magic number byteBuf.skipBytes(4);
// 跳过版本号
byteBuf.skipBytes(1);
// 序列化算法
byte serializeAlgorithm = byteBuf.readByte();
// 指令
byte command = byteBuf.readByte();
// 数据包长度
int length = byteBuf.readInt();
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes);
Class<? extends Packet> requestType = getRequestType(command);
Serializer serializer = getSerializer(serializeAlgorithm);
if (requestType != null && serializer != null) {return serializer.deserialize(requestType, bytes);
}
return null;
}
思考
JSON 序列化形式之外其余序列化形式如何实现?
Java 原生序列化
- 类实现 Serializable 接口
-
具体底层由 ObjectOutputStream 和 ObjectInputStream 实现
Hessian
- Hessian 是动静类型、二进制、紧凑的,并且可跨语言移植的一种序列化框架
-
Hessian 协定要比 JDK、JSON 更加紧凑,性能上要比 JDK、JSON 序列化高效很多,而且生成的字节数也更小
Protobuf
- 谷歌实现的混合语言数据规范
- 轻便、高效的结构化数据存储格局
- 反对 Java、Python、C++、Go 等语言
-
要求定义 IDL(Interface description language),并且应用对应语言的 IDL 生成序列化工具类
Thrift
- Facebook 于 2007 年开发的跨语言的 rpc 服框架
- 通过 Thrift 的编译环境生成各种语言类型的接口文件
序列化和编码都是 JAVA 对象封装二进制过程,两者的分割和区别
总结起来就是一句话:序列化是指标,编码是办法。网上有一张图十分直观的展现了两者的区别。
两者的分割和区别
编码:信息从一种模式或格局转换为另一种模式的过程,目标是不便传输协定通信。
序列化:“序列化”其实自身也是“信息从一种模式或格局转换为另一种模式的过程”,只不过这个表现形式直观具体,序列化也经常用于表白一个对象的状态。
聊天零碎的 Netty 细节优化
优化局部是聊天零碎的精华,也是应用 Netty 实际十分有价值的领导和参考,所以优先把优化局部放到后面介绍。
1. 应用共享 Handler
问题剖析
在旧版本代码中,每个新连贯每次通过 ChannelInitializer 调用,都会产生 9 个指令对象都被 new 一遍操作,然而能够看到其实很多处理器外部是没有任何 “ 状态 ” 的,对于无状态的业务处理器就能够应用单例模式封装。
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {protected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new Spliter());
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new AuthHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new CreateGroupRequestHandler());
ch.pipeline().addLast(new JoinGroupRequestHandler());
ch.pipeline().addLast(new QuitGroupRequestHandler());
ch.pipeline().addLast(new ListGroupMembersRequestHandler());
ch.pipeline().addLast(new GroupMessageRequestHandler());
ch.pipeline().addLast(new LogoutRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});
优化伎俩
- 通过退出注解
@ChannelHandler.Shareble
,示意这个 handler 是反对多个 channel 共享的,否则会报错。 - 公布动态 final 的不可变对象 来实现单例,编译器优化。
- 最初还能够压缩 Handler,把编码和解码过程放到一个 Handler 和专用的 Handler 放到一个 Handller 解决(比方申请指令散发解析解决)。
注意事项
次要的注意事项如下:
- 并不是所有的 Handler 都能够单例
- Spliter 不是单例的,因为它须要对每个数据做拆包解决。
2. 缩短事件流传门路
问题剖析
- 首先,指令的 decode 必须要在最后面后面,因为波及前面的命令解析,所以这个 Handler 是无奈“压缩”的。
- 然而如果把每个命令 decode 之后再流传到每个命令事件,然而对应的事件又不做任何解决,那么会节约很屡次多余的命令判断。
- 基本目标:缩短事件流传链条,事件流传链尽可能短。
优化伎俩
优化伎俩实际上也很简略,那就是 应用对立 Handler。
通常的做法如下:
- 该 Handler 只做判断,不做任何状态存储,应用单例优化。
public static final IMHandler INSTANCE = new IMHandler();
- 聊天零碎中利用 HashMap 存储所有的命令解决 Handler,这里集体顺带指定下初始化大小优化一下。
private IMHandler() {handlerMap = new HashMap<>(7);
handlerMap.put(MESSAGE_REQUEST, MessageRequestHandler.INSTANCE);
handlerMap.put(CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE);
handlerMap.put(JOIN_GROUP_REQUEST, JoinGroupRequestHandler.INSTANCE);
handlerMap.put(QUIT_GROUP_REQUEST, QuitGroupRequestHandler.INSTANCE);
handlerMap.put(LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestHandler.INSTANCE);
handlerMap.put(GROUP_MESSAGE_REQUEST, GroupMessageRequestHandler.INSTANCE);
handlerMap.put(LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE);
}
- 回调
channelRead0
实际上就是委托给 map 中的元素对应的指令处理器解决。
@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception {handlerMap.get(packet.getCommand()).channelRead(ctx, packet);
}
通过一个对立的处理器包含多个动态单例处理器,无效缩小 JVM 内存开销,单例也能够缩小对象实例化的开销。
3. 事件流传源调整
关键点
如果你的 outBound 类型的 handler 较多,在写数据的时候能用 ctx.writeAndFlush()
就用这个办法,不要用 ctx.channel().writeAndFlush()
。
起因
究其原因是 ctx.writeAndFlush() 会绕过所有不须要解决的其余 Outbound 类型。ctx.writeAndFlush()
是从 pipeline 链中的 以后节点开始往前找到第一个 outBound 类型向前流传 的,如果这个对象不须要其余 outBound 的 handler 解决就能够用这个办法。
而 ctx.channel().writeAndFlush() 体现则不同,它是从 pipeline 链中的 最初一个 outBound 类型的 handler 开始,把对象往前进行流传,从图中就能够看到,outBound 的处理器越多,就会产生越多“无用”操作。
当然如果确定前面的 outBound 须要如此解决,那么就能够用这个办法。
相干问题
- writeAndFlush 为什么能够缩短事件流传门路?
- 它是如何实现 OutBound 类型的事件流传缩短的?
4. 缩小阻塞主线程的操作【重要】
Netty 中容易被忽视,却是十分重要的一个概念。那就是 一个 Channel 的其中一个 Handler 阻塞,会导致所有其余绑定的 Channel 一起被拖慢。
比方只有有一个 channel
的一个 handler
中的 channelRead0()
办法阻塞了 NIO 线程,最终都会拖慢绑定在该 NIO 线程上的其余所有的 channel
List<Channel> channelList = 已有数据可读的 channel
for (Channel channel in channelist) {for (ChannelHandler handler in channel.pipeline()) {handler.channelRead0();
}
}
下面的操作如果 for 循环某次呈现卡顿,这不仅仅拖慢一个客户端,而是拖慢所有客户端。
所以 Netty 进行客户端解决的时候个别设计为非阻塞模式,或者会应用 业务线程池 去预防这种状况。业务线程池的实现形式更为常见,也就是 Netty 中一套线程池,理论处理过程中再委派给自定义的业务线程池单开线程解决。这样就实现了非阻塞异步执行工作的目标。
须要留神引入业务线程池会减少零碎复杂度,也会减少线上调试难度,所以做好链路追踪非常重要。
5. 如何精确统计时长?
错误做法:在一个线程的头尾退出时间差计算得出执行时长后果。
正确做法:应用writeAndFlush+addListener 的形式判断 futrue.isDone
之后才计算
起因:writeAndFlush 在非 NIO 线程中它是一个异步操作,其余操作由第一个工作队列异步执行。
关键点:writeAndFlush 真正执行实现才算是实现解决,监听它实现解决的回调动作能力算出精确执行时长。
优化小结
- 如果 Handler 多例然而无状态,齐全能够改为单例模式。
- 尽可能减少 Handler 的臃肿,避免调用链路过长。
- Handler 的耗时操作要交给线程池开启新线程解决,一个耗时操作不只影响单个 Channel。倡议业务线程池独自开新线程形式优化,然而须要留神和线程绑定的相干参数解决问题。
- 耗时统计,writeAndFlush 属于异步工作。
实现登录
解决流程图
实现思路
- 指标客户端和服务端别离启动 Netty 服务。
- 客户端发送登录申请指令,服务端解码之后依据传输后果校验,依据校验后果构建登录申请响应指令 LoginResponsePacket。
-
通过 ctx.writeAndFlush(loginResponsePacket); 回送响应后果给客户端。
- 登录校验胜利,通过 SessionUtil 增加 session 信息
- 客户端登录胜利之后,构建申请指令对象,设置参数,通过 Netty 发送到服务端。
- 服务端收到申请进行验证,并且构建绝对应的响应指令后果对象。
实现步骤
上面是大抵的实现步骤:
- 增加 LoginRequestHandler 登录逻辑处理器在 Server 端。
ch.pipeline().addLast(LoginRequestHandler.INSTANCE);
@ChannelHandler.Sharable
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {public static final LoginRequestHandler INSTANCE = new LoginRequestHandler();
protected LoginRequestHandler() {}
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
loginResponsePacket.setVersion(loginRequestPacket.getVersion());
loginResponsePacket.setUserName(loginRequestPacket.getUserName());
if (valid(loginRequestPacket)) {loginResponsePacket.setSuccess(true);
String userId = IDUtil.randomId();
loginResponsePacket.setUserId(userId);
System.out.println("[" + loginRequestPacket.getUserName() + "]登录胜利");
SessionUtil.bindSession(new Session(userId, loginRequestPacket.getUserName()), ctx.channel());
} else {loginResponsePacket.setReason("账号密码校验失败");
loginResponsePacket.setSuccess(false);
System.out.println(new Date() + ": 登录失败!");
}
// 登录响应
ctx.writeAndFlush(loginResponsePacket);
}
private boolean valid(LoginRequestPacket loginRequestPacket) {return true;}
@Override
public void channelInactive(ChannelHandlerContext ctx) {SessionUtil.unBindSession(ctx.channel());
}
}
- 在客户端同样增加Handler,
LoginResponseHandler
,LoginResponseHandler 的解决逻辑如下。
ch.pipeline().addLast(LoginResponseHandler.INSTANCE);
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) {String userId = loginResponsePacket.getUserId();
String userName = loginResponsePacket.getUserName();
if (loginResponsePacket.isSuccess()) {System.out.println("[" + userName + "]登录胜利,userId 为:" + loginResponsePacket.getUserId());
SessionUtil.bindSession(new Session(userId, userName), ctx.channel());
} else {System.out.println("[" + userName + "]登录失败,起因:" + loginResponsePacket.getReason());
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {System.out.println("客户端连贯被敞开!");
}
}
客户端登录胜利或者失败,如何把失败或者胜利标识绑定在客户端连贯?服务端如何高效断定客户端从新登录?
在聊天零碎中实现比较简单粗犷。服务端高效判断的办法是在ConcurrentHashMap
,Map 当中存储用户的 ID,如果登录胜利则存储到此 Map 中,服务端也只须要判断 Map 元素即可高效判断是否登录。
private static final Map<String, Channel> userIdChannelMap = new ConcurrentHashMap<>();
热插拔客户端是否登录验证
首先校验是否登录局部封装到工具类当中,实现比较简单。
SessionUtil
public static boolean hasLogin(Channel channel) {return getSession(channel) != null;
}
public static Session getSession(Channel channel) {return channel.attr(Attributes.SESSION).get();}
// AttributeKey<Session> SESSION = AttributeKey.newInstance("session");
AuthHandler
实现热插拔的思路是 判断是否登录,对立通过该调用链条实现 ,AuthHandler 自身作为 独自处理器 封装判断登录校验逻辑。
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (!SessionUtil.hasLogin(ctx.channel())) {ctx.channel().close();} else {ctx.pipeline().remove(this);
super.channelRead(ctx, msg);
}
}
实现双端收发音讯
客户端解决
客户端胜利登录之后,下一步是实现客户端和服务端相互发送数据。客户端收音讯处理器如下:
// 收音讯处理器
ch.pipeline().addLast(new MessageResponseHandler());
MessageResponseHandler
public class MessageResponseHandler extends SimpleChannelInboundHandler<MessageResponsePacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageResponsePacket messageResponsePacket) {String fromUserId = messageResponsePacket.getFromUserId();
String fromUserName = messageResponsePacket.getFromUserName();
System.out.println(fromUserId + ":" + fromUserName + "->" + messageResponsePacket
.getMessage());
}
}
服务端解决
因为是通用组件,服务端这里封装到 IMHandler 通用组件当中。
handlerMap.put(MESSAGE_REQUEST, MessageRequestHandler.INSTANCE);
MessageRequestHandler
@ChannelHandler.Sharable
public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> {public static final MessageRequestHandler INSTANCE = new MessageRequestHandler();
private MessageRequestHandler() {}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacket messageRequestPacket) {long begin = System.currentTimeMillis();
// 1. 拿到音讯发送方的会话信息
Session session = SessionUtil.getSession(ctx.channel());
// 2. 通过音讯发送方的会话信息结构要发送的音讯
MessageResponsePacket messageResponsePacket = new MessageResponsePacket();
messageResponsePacket.setFromUserId(session.getUserId());
messageResponsePacket.setFromUserName(session.getUserName());
messageResponsePacket.setMessage(messageRequestPacket.getMessage());
// 3. 拿到音讯接管方的 channel Channel toUserChannel = SessionUtil.getChannel(messageRequestPacket.getToUserId());
// 4. 将音讯发送给音讯接管方
if (toUserChannel != null && SessionUtil.hasLogin(toUserChannel)) {toUserChannel.writeAndFlush(messageResponsePacket).addListener(future -> {if (future.isDone()) {}});
} else {System.err.println("[" + session.getUserId() + "] 不在线,发送失败!");
}
}
}
小结
实现双端收发音讯小结内容如下:
- 定义收发音讯 Java 对象,对于音讯进行收发。
- 学习
Channel
的attr
的理论用法,能够给 Channel 绑定属性并且设置某些状态,外部理论也是通过 Map 保护的,所以不须要用户内部本人在自定义去保护。 - 如何在控制台当中获取音讯并且发送到服务端。
- 服务端回传音讯给客户端。
ChannelPipleline 和 ChannelHandler 概念
本局部是补充局部。次要介绍 Pipeline 和 ChannelHanlder 形成和一些根底概念。了解这一点之前须要先了解 Channel 这个概念。
ChannelPipleline 和 ChannelHandler 形成图
Channel 概念了解
一个客户端连贯对应一个 Channel,这个 Channel 能够类比 BIO 当中的传统概念 Socket 套接字。
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind.
一个网络套接字的节点或一个可能进行(网络)I/ O 操作的组件,如读、写、连贯和绑定。
ChannelPipeline
源码对于 ChannelPipeline 的定义如下:
A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel. ChannelPipeline implements an advanced form of the Intercepting Filter pattern to give a user full control over how an event is handled and how the ChannelHandlers in a pipeline interact with each other.
源码中还有一个直观的设计图。
下图形容了 I / O 事件在 ChannelPipeline 中是如何被 ChannelHandlers 解决的。一个 I / O 事件由 ChannelInboundHandler 或 ChannelOutboundHandler 解决,并通过调用 ChannelHandlerContext 中定义的事件流传办法,如 ChannelHandlerContext.fireChannelRead(Object)和 ChannelHandlerContext.write(Object),转发给其最靠近的处理程序。
ChannelPipeline
的外围如下:
- 解决或拦挡一个 Channel 的入站事件和出站操作的链表。
- 通过责任链模式的设计,能够齐全自定义解决逻辑和
ChannelHandler
之间相互通信的逻辑。
ChannelContext
ChannelHandler 与 Channel 和 ChannelPipeline 之间的映射关系 ,由ChannelHandlerContext
进⾏保护,依据其名称 Context 也能够看到存储更为丰盛的信息。
Enables a ChannelHandler to interact with its ChannelPipeline and other handlers.
使得 ChannelHandler 可能与它的 ChannelPipeline 和其余处理程序互动。
ChannelContext
能够获取整个Channel
的信息。- 获取所有的上下文。
- 逻辑处理器 ChannelHandler 定义解决逻辑。
ChannelHanlder
ChannelHanlder
蕴含两种了解。
第一种:能够了解为 socket 连贯,客户端和服务端连贯的时候会创立一个 channel。负责根本的 IO 操作,例如:bind()
、connect()
、read()
、write()
。
第二种:Netty 的 Channel 接口所提供的 API,大大减少了 Socket 类复杂性。
因为 Channel 连贯过程中存在双端 input/output
,所以 ChannelHandler
也分类为 ChannelInboundHandler
和 ChannelOutboundHandler
。
ChannelInboundHandler
- 读取的逻辑形象。
channelRead
是最重要的办法。- 配合
ByteBuf
应用进行buf.read
推动读指针挪动。
ChannelOutboundHandler
- 对应写出的逻辑形象。
- 外围办法是
write
,writeAndFlush
。
适配器
在应用过程中还存在对应的适配器。
ChannelOutboundHandlerAdapter
(留神解决程序和增加 addLast 的程序相同)ChannelInboundHandlerAdapter
客户端和服务端的 SimpleChannelInboundHandler/ChannelInboundHandlerAdapter 简化
整个聊天零碎大部分的指令判断逻辑是反复的,上面介绍如何通过 SImpleChannelInboundHandler/ChannelInboundHandlerAdapter 简化指令的解决逻辑。
ChannelInboundHandlerAdapter
which allows to explicit only handle a specific type of messages. For example here is an implementation which only handleString
messages.
ChannelInboundHandlerAdapter 容许明确地只解决特定类型的音讯。而 SimpleChannelInboundHandler
提供了一个模板,作用是把解决逻辑不变的内容写好在 channelRead(ctx,msg)
中,并且在外面调用 channelRead0
,这样解决之后就能够通过形象办法实现传递到子类中去了。
区别
SimpleChannelInboundHandler
和 ChannelInboundHandlerAdapter
这两个类应用上不太好辨别,上面再补充介绍一下如何正确对待应用两者。
ChannelInboundHandlerAdapter
须要笼罩的办法是 channelRead,特点是 不会主动开释音讯 ,须要调用ctx.fireChannelRead(msg) 向后续链条处理器传递音讯,也就是须要手动通过责任链的形式传递给下位处理器。
SimpleChannelInboundHandler
是 ChannelInboundHandlerAdapter
的子类, 做了额定的解决,会主动开释音讯 ,如果还须要持续传递音讯,需调用一次 ReferenceCountUtil.retain(msg)。需注意SimpleChannelInboundHandler
也须要调用 ctx.fireChannelRead(msg)
来触发链条中下一处理器解决。
ChannelInboundHandlerAdapter
通常用于处于链条两头的某些环节解决,对数据进行某些解决,如数据验证,须要将音讯持续传递。
SimpleChannelInboundHandler
则比拟适宜链条最初一个环节,该环节解决完后,后续不再须要该音讯,因而能够主动开释。
利用
在聊天零碎中对立解决的 Handler 继承了 SimpleChannelInboundHandler,重写 channelRead0
办法,次要对于解码之后的操作指令和通用 Map 进行匹配,如果匹配则散发到具体的逻辑处理器。
@ChannelHandler.Sharable
public class IMHandler extends SimpleChannelInboundHandler<Packet> {public static final IMHandler INSTANCE = new IMHandler();
private Map<Byte, SimpleChannelInboundHandler<? extends Packet>> handlerMap;
private IMHandler() {handlerMap = new HashMap<>(7);
handlerMap.put(MESSAGE_REQUEST, MessageRequestHandler.INSTANCE);
handlerMap.put(CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE);
handlerMap.put(JOIN_GROUP_REQUEST, JoinGroupRequestHandler.INSTANCE);
handlerMap.put(QUIT_GROUP_REQUEST, QuitGroupRequestHandler.INSTANCE);
handlerMap.put(LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestHandler.INSTANCE);
handlerMap.put(GROUP_MESSAGE_REQUEST, GroupMessageRequestHandler.INSTANCE);
handlerMap.put(LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception {handlerMap.get(packet.getCommand()).channelRead(ctx, packet);
}
}
客户端和服务端单聊
指标
- 输出用户名,服务端随机调配 ID,这里省去通过账号和明码注册过程。
- 多个客户端登录,用 userId 空格 音讯的形式单聊。
实现过程
-
应用工具类把 UserId 和 Channel 绑定为 Session。
- Session 的信息蕴含用户 ID 以及名称,后续能够扩大更多的字段。
-
应用
SessionUtil
工具类操作 Session,通过 Session 储存以后会话信息。- 这里用的 ConcurrentHashMap 实现并发平安
- ConcurrentHashMap 为 userId -> Channel 的映射 Map。
- 用户登录的时候,须要把 Session 塞入 Map。
- 当用户断开
Channel
连贯退出的时候,须要移除 Session 信息
-
服务端承受音讯并且转发(这里 Netty 相似转发手机信号的基站)
- 获取会话信息。
- 结构发给客户端的对象
MessageResponse
。 - 音讯接管方标识获取对应
Channel
。 - 如果指标用户登录则发送音讯,如果对方不在线,则控制台打印正告信息。
具体的代码在后面的收发音讯中有提到过,这里反复展现一遍。
MessageResponseHandler
public class MessageResponseHandler extends SimpleChannelInboundHandler<MessageResponsePacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageResponsePacket messageResponsePacket) {String fromUserId = messageResponsePacket.getFromUserId();
String fromUserName = messageResponsePacket.getFromUserName();
System.out.println(fromUserId + ":" + fromUserName + "->" + messageResponsePacket
.getMessage());
}
}
MessageRequestHandler
@ChannelHandler.Sharable
public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> {public static final MessageRequestHandler INSTANCE = new MessageRequestHandler();
private MessageRequestHandler() {}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacket messageRequestPacket) {long begin = System.currentTimeMillis();
// 1. 拿到音讯发送方的会话信息
Session session = SessionUtil.getSession(ctx.channel());
// 2. 通过音讯发送方的会话信息结构要发送的音讯
MessageResponsePacket messageResponsePacket = new MessageResponsePacket();
messageResponsePacket.setFromUserId(session.getUserId());
messageResponsePacket.setFromUserName(session.getUserName());
messageResponsePacket.setMessage(messageRequestPacket.getMessage());
// 3. 拿到音讯接管方的 channel Channel toUserChannel = SessionUtil.getChannel(messageRequestPacket.getToUserId());
// 4. 将音讯发送给音讯接管方
if (toUserChannel != null && SessionUtil.hasLogin(toUserChannel)) {toUserChannel.writeAndFlush(messageResponsePacket).addListener(future -> {if (future.isDone()) {}});
} else {System.err.println("[" + session.getUserId() + "] 不在线,发送失败!");
}
}
}
群聊发动和告诉
上面两个大节围绕群聊实现介绍。整个群聊和单聊实现相似,都是通过标识获取 Channel,为了方面多个成员治理,设计 ChannelGroup
实现 Channel
的批量操作。
预期成果
- 三位用户顺次登录。
- 控制台输出 createGroup 指令,提醒创立群聊须要 userId 列表,之后以英文逗号分隔 userId。
- 群聊创立胜利之后,所有群聊成员收到退出胜利音讯。
创立群聊实现
次要逻辑如下:
- 创立一个 channel 分组。
- 筛选出待退出群聊的用户的 channel 和 userName。
- 创立群聊创立后果的响应。
- 给每个客户端发送拉群告诉
- 保留群组相干的信息。
其中存储群的相干信息利用了 ConcurrentHashMap
实现,和 Session 的会话信息存储形式相似,ChannelGroup对象负责封装多个 Channel 的信息,模仿群聊中的“群”。
@ChannelHandler.Sharable
public class CreateGroupRequestHandler extends SimpleChannelInboundHandler<CreateGroupRequestPacket> {public static final CreateGroupRequestHandler INSTANCE = new CreateGroupRequestHandler();
private CreateGroupRequestHandler() {}
@Override
protected void channelRead0(ChannelHandlerContext ctx, CreateGroupRequestPacket createGroupRequestPacket) {List<String> userIdList = createGroupRequestPacket.getUserIdList();
List<String> userNameList = new ArrayList<>();
// 1. 创立一个 channel 分组
ChannelGroup channelGroup = new DefaultChannelGroup(ctx.executor());
// 2. 筛选出待退出群聊的用户的 channel 和 userName for (String userId : userIdList) {Channel channel = SessionUtil.getChannel(userId);
if (channel != null) {channelGroup.add(channel);
userNameList.add(SessionUtil.getSession(channel).getUserName());
}
}
// 3. 创立群聊创立后果的响应
String groupId = IDUtil.randomId();
CreateGroupResponsePacket createGroupResponsePacket = new CreateGroupResponsePacket();
createGroupResponsePacket.setSuccess(true);
createGroupResponsePacket.setGroupId(groupId);
createGroupResponsePacket.setUserNameList(userNameList);
// 4. 给每个客户端发送拉群告诉
channelGroup.writeAndFlush(createGroupResponsePacket);
System.out.print("群创立胜利,id 为" + createGroupResponsePacket.getGroupId() + ",");
System.out.println("群外面有:" + createGroupResponsePacket.getUserNameList());
// 5. 保留群组相干的信息
SessionUtil.bindChannelGroup(groupId, channelGroup);
}
}
客户端解决局部则是简略的打印创立群聊胜利的信息,实现比较简单这里不再贴出相干代码。
群聊成员治理实现
设计流程和实现思路
设计流程
- 退出群聊,控制台输入创立胜利音讯。
- 控制台输出 joinGroup 之后输出群 ID,退出群聊,控制台显示退出群胜利。
- 控制台输出 listGroupMembers 而后输出群 ID,展现群成员。
- quitGroup 输出群 ID,进行退群
- 控制台输出 joinGroup 之后输出群 ID 显示对应成员不在,则退群胜利。
实现思路
- 在控制台中退出群退出的命令处理器。
- 服务端解决群聊申请。
- 客户端解决加群响应.
- 群聊退出实现。
在控制台中退出群退出的命令处理器
JoinGroupConsoleCommand
public class JoinGroupConsoleCommand implements ConsoleCommand {
@Override
public void exec(Scanner scanner, Channel channel) {JoinGroupRequestPacket joinGroupRequestPacket = new JoinGroupRequestPacket();
System.out.print("输出 groupId,退出群聊:");
String groupId = scanner.next();
joinGroupRequestPacket.setGroupId(groupId);
channel.writeAndFlush(joinGroupRequestPacket);
}
}
服务端解决群聊申请
服务端解决群聊申请:
- 构建 Channel 分区,把处在同一个分组的 Channel 放到一个 List 当中存储
- 如果群聊构建胜利,则构建创立胜利响应后果。
@ChannelHandler.Sharable
public class JoinGroupRequestHandler extends SimpleChannelInboundHandler<JoinGroupRequestPacket> {public static final JoinGroupRequestHandler INSTANCE = new JoinGroupRequestHandler();
private JoinGroupRequestHandler() {}
@Override
protected void channelRead0(ChannelHandlerContext ctx, JoinGroupRequestPacket requestPacket) {
// 1. 获取群对应的 channelGroup,而后将以后用户的 channel 增加进去
String groupId = requestPacket.getGroupId();
ChannelGroup channelGroup = SessionUtil.getChannelGroup(groupId);
channelGroup.add(ctx.channel());
// 2. 结构加群响应发送给客户端
JoinGroupResponsePacket responsePacket = new JoinGroupResponsePacket();
responsePacket.setSuccess(true);
responsePacket.setGroupId(groupId);
ctx.writeAndFlush(responsePacket);
}
}
客户端解决加群响应
简略打印加群的响应音讯。
public class JoinGroupResponseHandler extends SimpleChannelInboundHandler<JoinGroupResponsePacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, JoinGroupResponsePacket responsePacket) {if (responsePacket.isSuccess()) {System.out.println("退出群 [" + responsePacket.getGroupId() + "] 胜利!");
} else {System.err.println("退出群 [" + responsePacket.getGroupId() + "] 失败,起因为:" + responsePacket.getReason());
}
}
}
群聊退出实现
群聊退出次要是获取群对应的 channelGroup,而后将以后用户的 channel 移除,之后构建退群的响应信息回传客户端即可。
QuitGroupRequestHandler
@ChannelHandler.Sharable
public class QuitGroupRequestHandler extends SimpleChannelInboundHandler<QuitGroupRequestPacket> {public static final QuitGroupRequestHandler INSTANCE = new QuitGroupRequestHandler();
private QuitGroupRequestHandler() {}
@Override
protected void channelRead0(ChannelHandlerContext ctx, QuitGroupRequestPacket requestPacket) {
// 1. 获取群对应的 channelGroup,而后将以后用户的 channel 移除
String groupId = requestPacket.getGroupId();
ChannelGroup channelGroup = SessionUtil.getChannelGroup(groupId);
channelGroup.remove(ctx.channel());
// 2. 结构退群响应发送给客户端
QuitGroupResponsePacket responsePacket = new QuitGroupResponsePacket();
responsePacket.setGroupId(requestPacket.getGroupId());
responsePacket.setSuccess(true);
ctx.writeAndFlush(responsePacket);
}
}
心跳检测
网络问题
假死
TCP 层面来看,服务端收到 4 次握手包或者 RST 包才算真正断开连接,如果中途应用程序并没有捕捉到,此时是认为这条连贯存在的。
假死引发问题
- 客户端发送数据超时无响应,影响体验。
-
节约 CPU 和内存资源,性能下滑。
假死起因
- 公网丢包,网络抖动。
- 应用程序阻塞无奈读写。
- 客户端或者服务端设别故障,网卡,机房故障。
为了解决下面的问题,通常会应用心跳检测机制定期检测每个 Channel
连贯是否存活。
服务端心跳检测实现
- 通过
IdleStateHandler
自带Handler
实现 - 继承类,而后开启定时工作
- 触发假死该
Handler
回调channelIdle
办法
客户端预判和进攻假死
- 新建
Handler
。 - 开启定时线程。
- 组装心跳包。
- 发送心跳。
- 服务端简略开发承受和辨认心跳包的 Handler,之后回送收到心跳包音讯即可。
注意事项
- 心跳检测 Handler 插入到整个 Pipeline 最后面,因为如果连贯理论曾经断开后续的所有解决均无意义。
- 假死不肯定“死”,避免服务端误判,客户端也须要措施避免假死和预判假死,这就是客户端预判的含意。
思考
- IdleHandler 可否单例?
- 断开链接之后从新连贯登录
IdleHandler 可否单例?
答案是 不能。因为它并不是无状态的,并且每个 Channel 都有各自的连贯状态。
断开链接之后从新连贯登录
通过额定的线程定时轮循所有的连贯的活跃性,如果发现其中有死连贯,则执行重连。
写在最初
相熟聊天零碎对于后续的源码剖析非常有意义,我的项目的整体构建比较简单,集体在笔记中将重点局部做了一个梳理。
文章参考
https://juejin.cn/book/m/6844733738119593991/section/6844733738291576840?suid=2040300414187416