关于netty:跟闪电侠学Netty阅读笔记-聊天系统实现

10次阅读

共计 24442 个字符,预计需要花费 62 分钟才能阅读完成。

引言

本局部整合聊天零碎无关的章节,内容次要是介绍要害性能的实现逻辑和局部代码实现,倡议读者先看看作者的博客我的项目,切换到不同的分支看看各个细节性能如何实现。这里仅仅记录一些集体学习过程的重点局部。

思维导图

https://www.mubu.com/doc/1dunN_7Luzl

我的项目代码

作者的仓库代码地址:https://github.com/lightningMan/flash-netty5

通信协议设计和自定义编解码实现

什么是通信协议?

基于 TCP 通信均为二进制协定,底层都是通过字节进行传输的。在通信协议当中规定数据传输的每一个字节含意。

通信过程

  1. 客户端转换数据为二进制。
  2. 网络传输给服务端。
  3. 服务端依据协定规定读取二进制数据。
  4. 服务端解决数据返回响应后果给客户端。

聊天零碎的通信协议数据对象设计

在聊天零碎当中通信协议的设计如下。

4 字节魔数

比方 Java 的字节码CafeBabe,用于疾速辨认是否自定义协定,也能够不便疾速提取数据。

public static final int MAGIC_NUMBER = 0x12345678;

1 字节版本号

相似 TCP 的 IPV4 还是 IPV6。

/**  
 * 协定版本  
 */  
@JSONField(deserialize = false, serialize = false)  
private Byte version = 1;

1 字节序列化算法

应用 1 个字节来标识算法。

/**  
 * 序列化算法定义  
 */  
public interface SerializerAlgorithm {  
    /**  
     * json 序列化  
     */  
    byte JSON = 1;  
}

1 字节指令

一个字节最多示意 256 种指令。留神在设计上指令和版本号进行绑定关联,实现不同版本之间的指令兼容,进步程序的健壮性。

@Data  
public abstract class Packet {  
    /**  
     * 协定版本  
     */  
    @JSONField(deserialize = false, serialize = false)  
    private Byte version = 1;  
  
  
    @JSONField(serialize = false)  
    public abstract Byte getCommand();}

4 字节数据长度

数据长度是必要的,次要用于字节流这种连续不断的数据模式进行切割。

byteBuf.writeInt(bytes.length);

int 根本数据类型在 Java 中默认占 4 个字节,这 4 个字节用来存储字节数组的长度。

N 字节数据

数据局部。

如何实现 JAVA 对象二进制互相转化?

所谓互转对应了网络 Socket IO 的 input/output 中的数据转化局部,实体数据转为字节流这个过程咱们通常叫做 编码,反之则是解码。

无论是编码还是解码,都是依赖 Netty 自定义的 MessageToMessageCodec实现。聊天零碎的编码和解码工作都是依赖 PacketCodecHandler 实现的。

@ChannelHandler.Sharable  
public class PacketCodecHandler extends MessageToMessageCodec<ByteBuf, Packet> {public static final PacketCodecHandler INSTANCE = new PacketCodecHandler();  
  
    private PacketCodecHandler() {}  
  
    @Override  
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) {out.add(PacketCodec.INSTANCE.decode(byteBuf));  
    }  
  
    @Override  
    protected void encode(ChannelHandlerContext ctx, Packet packet, List<Object> out) {ByteBuf byteBuf = ctx.channel().alloc().ioBuffer();  
        PacketCodec.INSTANCE.encode(byteBuf, packet);  
        out.add(byteBuf);  
    }  
}

自定义逻辑处理器,在 Netty Server 中须要注册到 pipeline 当中。

public static void main(String[] args) {NioEventLoopGroup boosGroup = new NioEventLoopGroup();  
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();  
  
    final ServerBootstrap serverBootstrap = new ServerBootstrap();  
    serverBootstrap  
            .group(boosGroup, workerGroup)  
            .channel(NioServerSocketChannel.class)  
            .option(ChannelOption.SO_BACKLOG, 1024)  
            .childOption(ChannelOption.SO_KEEPALIVE, true)  
            .childOption(ChannelOption.TCP_NODELAY, true)  
            .childHandler(new ChannelInitializer<NioSocketChannel>() {protected void initChannel(NioSocketChannel ch) {  
                    //......
                    ch.pipeline().addLast(PacketCodecHandler.INSTANCE); 
                    // ......
                }  
            });  
  
  
    bind(serverBootstrap, PORT);  
}

这里解释下为什么 PacketCodecHandler 要被注解标记为“Sharable”,因为编码和解码可能在多个 handler 中用到,为了提高效率,这里通过共享缩小实例的创立。

下文也会介绍这个单例模式的优化点。

带着疑难咱们再看看 @ChannelHandler.Sharable 这个注解的源码解释。

Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipelines multiple times without a race condition.

If this annotation is not specified, you have to create a new handler instance every time you add it to a pipeline because it has unshared state such as member variables.
This annotation is provided for documentation purpose, just like the JCIP annotations 

下面的内容翻译过去就是:

被注解的 Sharable 的同一个 ChannelHandler 实例,能够被屡次增加到一个或多个 ChannelPipeline 中,并且能够确保不会呈现竞争状况。如果没有指定这个注解,那么每次就创立新的 Channel 都须要应用新的 Handler 实例。在有不共享的状态,如成员变量时候,就不能用这个注解。

简略来说 @ChannelHandler.Sharable 实现了 Netty 中的 ”Bean” 单例和共享。

实战局部

数据编码过程(思路)

上面是数据解码的根本编写思路。

  1. 增加编码器。
ch.pipeline().addLast(new PacketEncoder());
  1. ByteBuf 一一写字段,实现编码过程。
public class PacketEncoder extends MessageToByteEncoder<Packet> {  
  
    @Override  
    protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) {PacketCodec.INSTANCE.encode(out, packet);  
    }  
}
  1. 残缺的自定义协定:PacketCodec#encode
public void encode(ByteBuf byteBuf, Packet packet) {  
    // 1. 序列化 java 对象  
    byte[] bytes = Serializer.DEFAULT.serialize(packet);  
  
    // 2. 理论编码过程  
    byteBuf.writeInt(MAGIC_NUMBER);  
    byteBuf.writeByte(packet.getVersion());  
    byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm());  
    byteBuf.writeByte(packet.getCommand());  
    byteBuf.writeInt(bytes.length);  
    byteBuf.writeBytes(bytes);  
}

解码数据过程(思路)

上面是数据解码的根本编写思路:

  1. 在 handler 当中增加自定义逻辑处理器。
.handler(new ChannelInitializer<SocketChannel>() {  
    @Override  
    public void initChannel(SocketChannel ch) {ch.pipeline().addLast(new PacketDecoder());
    }  
});
  1. 定义解码逻辑处理器。
public class PacketDecoder extends MessageToMessageDecoder<ByteBuf> {  
  
    @Override  
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {out.add(PacketCodec.INSTANCE.decode(in));  
    }  
}

上面定义具体的解码过程:

  • 跳过魔数。
  • 跳过协定版本号。
  • 读取序列化算法。
  • 读取指令,数据包,算法标识等自定义协定的根本内容。
  • 依据数据长度。
  • 取出数据。

PacketCodec#decode

public Packet decode(ByteBuf byteBuf) {// 跳过 magic number    byteBuf.skipBytes(4);  
  
    // 跳过版本号  
    byteBuf.skipBytes(1);  
  
    // 序列化算法  
    byte serializeAlgorithm = byteBuf.readByte();  
  
    // 指令  
    byte command = byteBuf.readByte();  
  
    // 数据包长度  
    int length = byteBuf.readInt();  
  
    byte[] bytes = new byte[length];  
    byteBuf.readBytes(bytes);  
  
    Class<? extends Packet> requestType = getRequestType(command);  
    Serializer serializer = getSerializer(serializeAlgorithm);  
  
    if (requestType != null && serializer != null) {return serializer.deserialize(requestType, bytes);  
    }  
  
    return null;  
}

思考

JSON 序列化形式之外其余序列化形式如何实现?

Java 原生序列化
  • 类实现 Serializable 接口
  • 具体底层由 ObjectOutputStream 和 ObjectInputStream 实现

Hessian
  • Hessian 是动静类型、二进制、紧凑的,并且可跨语言移植的一种序列化框架
  • Hessian 协定要比 JDK、JSON 更加紧凑,性能上要比 JDK、JSON 序列化高效很多,而且生成的字节数也更小

Protobuf
  • 谷歌实现的混合语言数据规范
  • 轻便、高效的结构化数据存储格局
  • 反对 Java、Python、C++、Go 等语言
  • 要求定义 IDL(Interface description language),并且应用对应语言的 IDL 生成序列化工具类

Thrift
  • Facebook 于 2007 年开发的跨语言的 rpc 服框架
  • 通过 Thrift 的编译环境生成各种语言类型的接口文件

序列化和编码都是 JAVA 对象封装二进制过程,两者的分割和区别

总结起来就是一句话:序列化是指标,编码是办法。网上有一张图十分直观的展现了两者的区别。

两者的分割和区别

编码:信息从一种模式或格局转换为另一种模式的过程,目标是不便传输协定通信。

序列化:“序列化”其实自身也是“信息从一种模式或格局转换为另一种模式的过程”,只不过这个表现形式直观具体,序列化也经常用于表白一个对象的状态。

聊天零碎的 Netty 细节优化

优化局部是聊天零碎的精华,也是应用 Netty 实际十分有价值的领导和参考,所以优先把优化局部放到后面介绍。

1. 应用共享 Handler

问题剖析

在旧版本代码中,每个新连贯每次通过 ChannelInitializer 调用,都会产生 9 个指令对象都被 new 一遍操作,然而能够看到其实很多处理器外部是没有任何 “ 状态 ” 的,对于无状态的业务处理器就能够应用单例模式封装

serverBootstrap
                .childHandler(new ChannelInitializer<NioSocketChannel>() {protected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new Spliter());
                        ch.pipeline().addLast(new PacketDecoder());
                        ch.pipeline().addLast(new LoginRequestHandler());
                        ch.pipeline().addLast(new AuthHandler());
                        ch.pipeline().addLast(new MessageRequestHandler());
                        ch.pipeline().addLast(new CreateGroupRequestHandler());
                        ch.pipeline().addLast(new JoinGroupRequestHandler());
                        ch.pipeline().addLast(new QuitGroupRequestHandler());
                        ch.pipeline().addLast(new ListGroupMembersRequestHandler());
                        ch.pipeline().addLast(new GroupMessageRequestHandler());
                        ch.pipeline().addLast(new LogoutRequestHandler());
                        ch.pipeline().addLast(new PacketEncoder());
                    }
                });

优化伎俩

  • 通过退出注解 @ChannelHandler.Shareble,示意这个 handler 是反对多个 channel 共享的,否则会报错。
  • 公布动态 final不可变对象 来实现单例,编译器优化。
  • 最初还能够压缩 Handler,把编码和解码过程放到一个 Handler 和专用的 Handler 放到一个 Handller 解决(比方申请指令散发解析解决)。

注意事项

次要的注意事项如下:

  • 并不是所有的 Handler 都能够单例
  • Spliter 不是单例的,因为它须要对每个数据做拆包解决。

2. 缩短事件流传门路

问题剖析

  • 首先,指令的 decode 必须要在最后面后面,因为波及前面的命令解析,所以这个 Handler 是无奈“压缩”的。
  • 然而如果把每个命令 decode 之后再流传到每个命令事件,然而对应的事件又不做任何解决,那么会节约很屡次多余的命令判断。
  • 基本目标:缩短事件流传链条,事件流传链尽可能短。

优化伎俩

优化伎俩实际上也很简略,那就是 应用对立 Handler

通常的做法如下:

  1. 该 Handler 只做判断,不做任何状态存储,应用单例优化。
public static final IMHandler INSTANCE = new IMHandler();
  1. 聊天零碎中利用 HashMap 存储所有的命令解决 Handler,这里集体顺带指定下初始化大小优化一下。
private IMHandler() {handlerMap = new HashMap<>(7);  
  
    handlerMap.put(MESSAGE_REQUEST, MessageRequestHandler.INSTANCE);  
    handlerMap.put(CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE);  
    handlerMap.put(JOIN_GROUP_REQUEST, JoinGroupRequestHandler.INSTANCE);  
    handlerMap.put(QUIT_GROUP_REQUEST, QuitGroupRequestHandler.INSTANCE);  
    handlerMap.put(LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestHandler.INSTANCE);  
    handlerMap.put(GROUP_MESSAGE_REQUEST, GroupMessageRequestHandler.INSTANCE);  
    handlerMap.put(LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE);  
}
  1. 回调channelRead0 实际上就是委托给 map 中的元素对应的指令处理器解决。
@Override  
protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception {handlerMap.get(packet.getCommand()).channelRead(ctx, packet);  
}

通过一个对立的处理器包含多个动态单例处理器,无效缩小 JVM 内存开销,单例也能够缩小对象实例化的开销。

3. 事件流传源调整

关键点

如果你的 outBound 类型的 handler 较多,在写数据的时候能用 ctx.writeAndFlush() 就用这个办法,不要用 ctx.channel().writeAndFlush()

起因

究其原因是 ctx.writeAndFlush() 会绕过所有不须要解决的其余 Outbound 类型。ctx.writeAndFlush() 是从 pipeline 链中的 以后节点开始往前找到第一个 outBound 类型向前流传 的,如果这个对象不须要其余 outBound 的 handler 解决就能够用这个办法。

ctx.channel().writeAndFlush() 体现则不同,它是从 pipeline 链中的 最初一个 outBound 类型的 handler 开始,把对象往前进行流传,从图中就能够看到,outBound 的处理器越多,就会产生越多“无用”操作。

当然如果确定前面的 outBound 须要如此解决,那么就能够用这个办法。

相干问题

  • writeAndFlush 为什么能够缩短事件流传门路?
  • 它是如何实现 OutBound 类型的事件流传缩短的?

4. 缩小阻塞主线程的操作【重要】

Netty 中容易被忽视,却是十分重要的一个概念。那就是 一个 Channel 的其中一个 Handler 阻塞,会导致所有其余绑定的 Channel 一起被拖慢

比方只有有一个 channel 的一个 handler 中的 channelRead0() 办法阻塞了 NIO 线程,最终都会拖慢绑定在该 NIO 线程上的其余所有的 channel

List<Channel> channelList = 已有数据可读的 channel
for (Channel channel in channelist) {for (ChannelHandler handler in channel.pipeline()) {handler.channelRead0();
   }   
}

下面的操作如果 for 循环某次呈现卡顿,这不仅仅拖慢一个客户端,而是拖慢所有客户端。

所以 Netty 进行客户端解决的时候个别设计为非阻塞模式,或者会应用 业务线程池 去预防这种状况。业务线程池的实现形式更为常见,也就是 Netty 中一套线程池,理论处理过程中再委派给自定义的业务线程池单开线程解决。这样就实现了非阻塞异步执行工作的目标。

须要留神引入业务线程池会减少零碎复杂度,也会减少线上调试难度,所以做好链路追踪非常重要。

5. 如何精确统计时长?

错误做法:在一个线程的头尾退出时间差计算得出执行时长后果。

正确做法:应用writeAndFlush+addListener 的形式判断 futrue.isDone 之后才计算

起因:writeAndFlush 在非 NIO 线程中它是一个异步操作,其余操作由第一个工作队列异步执行。

关键点:writeAndFlush 真正执行实现才算是实现解决,监听它实现解决的回调动作能力算出精确执行时长。

优化小结

  • 如果 Handler 多例然而无状态,齐全能够改为单例模式。
  • 尽可能减少 Handler 的臃肿,避免调用链路过长。
  • Handler 的耗时操作要交给线程池开启新线程解决,一个耗时操作不只影响单个 Channel。倡议业务线程池独自开新线程形式优化,然而须要留神和线程绑定的相干参数解决问题。
  • 耗时统计,writeAndFlush 属于异步工作。

实现登录

解决流程图

实现思路

  • 指标客户端和服务端别离启动 Netty 服务。
  • 客户端发送登录申请指令,服务端解码之后依据传输后果校验,依据校验后果构建登录申请响应指令 LoginResponsePacket。
  • 通过 ctx.writeAndFlush(loginResponsePacket); 回送响应后果给客户端。

    • 登录校验胜利,通过 SessionUtil 增加 session 信息
  • 客户端登录胜利之后,构建申请指令对象,设置参数,通过 Netty 发送到服务端。
  • 服务端收到申请进行验证,并且构建绝对应的响应指令后果对象。

实现步骤

上面是大抵的实现步骤:

  1. 增加 LoginRequestHandler 登录逻辑处理器在 Server 端。
ch.pipeline().addLast(LoginRequestHandler.INSTANCE);
  
@ChannelHandler.Sharable  
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {public static final LoginRequestHandler INSTANCE = new LoginRequestHandler();  
  
    protected LoginRequestHandler() {}  
  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {LoginResponsePacket loginResponsePacket = new LoginResponsePacket();  
        loginResponsePacket.setVersion(loginRequestPacket.getVersion());  
        loginResponsePacket.setUserName(loginRequestPacket.getUserName());  
  
        if (valid(loginRequestPacket)) {loginResponsePacket.setSuccess(true);  
            String userId = IDUtil.randomId();  
            loginResponsePacket.setUserId(userId);  
            System.out.println("[" + loginRequestPacket.getUserName() + "]登录胜利");  
            SessionUtil.bindSession(new Session(userId, loginRequestPacket.getUserName()), ctx.channel());  
        } else {loginResponsePacket.setReason("账号密码校验失败");  
            loginResponsePacket.setSuccess(false);  
            System.out.println(new Date() + ": 登录失败!");  
        }  
  
        // 登录响应  
        ctx.writeAndFlush(loginResponsePacket);  
    }  
  
    private boolean valid(LoginRequestPacket loginRequestPacket) {return true;}  
  
    @Override  
    public void channelInactive(ChannelHandlerContext ctx) {SessionUtil.unBindSession(ctx.channel());  
    }  
}
  1. 在客户端同样增加HandlerLoginResponseHandler,LoginResponseHandler 的解决逻辑如下。
ch.pipeline().addLast(LoginResponseHandler.INSTANCE);
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {  
  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) {String userId = loginResponsePacket.getUserId();  
        String userName = loginResponsePacket.getUserName();  
  
        if (loginResponsePacket.isSuccess()) {System.out.println("[" + userName + "]登录胜利,userId 为:" + loginResponsePacket.getUserId());  
            SessionUtil.bindSession(new Session(userId, userName), ctx.channel());  
        } else {System.out.println("[" + userName + "]登录失败,起因:" + loginResponsePacket.getReason());  
        }  
    }  
  
    @Override  
    public void channelInactive(ChannelHandlerContext ctx) {System.out.println("客户端连贯被敞开!");  
    }  
}

客户端登录胜利或者失败,如何把失败或者胜利标识绑定在客户端连贯?服务端如何高效断定客户端从新登录?

在聊天零碎中实现比较简单粗犷。服务端高效判断的办法是在ConcurrentHashMap,Map 当中存储用户的 ID,如果登录胜利则存储到此 Map 中,服务端也只须要判断 Map 元素即可高效判断是否登录。

private static final Map<String, Channel> userIdChannelMap = new ConcurrentHashMap<>();

热插拔客户端是否登录验证

首先校验是否登录局部封装到工具类当中,实现比较简单。

SessionUtil

public static boolean hasLogin(Channel channel) {return getSession(channel) != null;  
}  
  
public static Session getSession(Channel channel) {return channel.attr(Attributes.SESSION).get();}

// AttributeKey<Session> SESSION = AttributeKey.newInstance("session");

AuthHandler

实现热插拔的思路是 判断是否登录,对立通过该调用链条实现 ,AuthHandler 自身作为 独自处理器 封装判断登录校验逻辑。

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (!SessionUtil.hasLogin(ctx.channel())) {ctx.channel().close();} else {ctx.pipeline().remove(this);        
        super.channelRead(ctx, msg);    
    }
}

实现双端收发音讯

客户端解决

客户端胜利登录之后,下一步是实现客户端和服务端相互发送数据。客户端收音讯处理器如下:

// 收音讯处理器  
ch.pipeline().addLast(new MessageResponseHandler());

MessageResponseHandler

public class MessageResponseHandler extends SimpleChannelInboundHandler<MessageResponsePacket> {  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, MessageResponsePacket messageResponsePacket) {String fromUserId = messageResponsePacket.getFromUserId();  
        String fromUserName = messageResponsePacket.getFromUserName();  
        System.out.println(fromUserId + ":" + fromUserName + "->" + messageResponsePacket  
                .getMessage());  
    }  
}

服务端解决

因为是通用组件,服务端这里封装到 IMHandler 通用组件当中。

handlerMap.put(MESSAGE_REQUEST, MessageRequestHandler.INSTANCE);

MessageRequestHandler

@ChannelHandler.Sharable  
public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> {public static final MessageRequestHandler INSTANCE = new MessageRequestHandler();  
  
    private MessageRequestHandler() {}  
  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacket messageRequestPacket) {long begin = System.currentTimeMillis();  
  
  
        // 1. 拿到音讯发送方的会话信息  
        Session session = SessionUtil.getSession(ctx.channel());  
  
        // 2. 通过音讯发送方的会话信息结构要发送的音讯  
        MessageResponsePacket messageResponsePacket = new MessageResponsePacket();  
        messageResponsePacket.setFromUserId(session.getUserId());  
        messageResponsePacket.setFromUserName(session.getUserName());  
        messageResponsePacket.setMessage(messageRequestPacket.getMessage());  
  
        // 3. 拿到音讯接管方的 channel        Channel toUserChannel = SessionUtil.getChannel(messageRequestPacket.getToUserId());  
  
        // 4. 将音讯发送给音讯接管方  
        if (toUserChannel != null && SessionUtil.hasLogin(toUserChannel)) {toUserChannel.writeAndFlush(messageResponsePacket).addListener(future -> {if (future.isDone()) {}});  
        } else {System.err.println("[" + session.getUserId() + "] 不在线,发送失败!");  
  
        }  
    }  
}

小结

实现双端收发音讯小结内容如下:

  1. 定义收发音讯 Java 对象,对于音讯进行收发。
  2. 学习 Channelattr 的理论用法,能够给 Channel 绑定属性并且设置某些状态,外部理论也是通过 Map 保护的,所以不须要用户内部本人在自定义去保护。
  3. 如何在控制台当中获取音讯并且发送到服务端。
  4. 服务端回传音讯给客户端。

ChannelPipleline 和 ChannelHandler 概念

本局部是补充局部。次要介绍 Pipeline 和 ChannelHanlder 形成和一些根底概念。了解这一点之前须要先了解 Channel 这个概念。

ChannelPipleline 和 ChannelHandler 形成图

Channel 概念了解

一个客户端连贯对应一个 Channel,这个 Channel 能够类比 BIO 当中的传统概念 Socket 套接字。

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind.

一个网络套接字的节点或一个可能进行(网络)I/ O 操作的组件,如读、写、连贯和绑定。

ChannelPipeline

源码对于 ChannelPipeline 的定义如下:

A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel. ChannelPipeline implements an advanced form of the Intercepting Filter pattern to give a user full control over how an event is handled and how the ChannelHandlers in a pipeline interact with each other.

源码中还有一个直观的设计图。

下图形容了 I / O 事件在 ChannelPipeline 中是如何被 ChannelHandlers 解决的。一个 I / O 事件由 ChannelInboundHandler 或 ChannelOutboundHandler 解决,并通过调用 ChannelHandlerContext 中定义的事件流传办法,如 ChannelHandlerContext.fireChannelRead(Object)和 ChannelHandlerContext.write(Object),转发给其最靠近的处理程序。

ChannelPipeline 的外围如下:

  • 解决或拦挡一个 Channel 的入站事件和出站操作的链表。
  • 通过责任链模式的设计,能够齐全自定义解决逻辑和 ChannelHandler 之间相互通信的逻辑。

ChannelContext

ChannelHandler 与 Channel 和 ChannelPipeline 之间的映射关系 ,由ChannelHandlerContext 进⾏保护,依据其名称 Context 也能够看到存储更为丰盛的信息。

Enables a ChannelHandler to interact with its ChannelPipeline and other handlers.

使得 ChannelHandler 可能与它的 ChannelPipeline 和其余处理程序互动。

  • ChannelContext能够获取整个 Channel 的信息。
  • 获取所有的上下文。
  • 逻辑处理器 ChannelHandler 定义解决逻辑。

ChannelHanlder

ChannelHanlder 蕴含两种了解。

第一种:能够了解为 socket 连贯,客户端和服务端连贯的时候会创立一个 channel。负责根本的 IO 操作,例如:bind()connect()read()write()

第二种:Netty 的 Channel 接口所提供的 API,大大减少了 Socket 类复杂性。

因为 Channel 连贯过程中存在双端 input/output,所以 ChannelHandler 也分类为 ChannelInboundHandlerChannelOutboundHandler

ChannelInboundHandler

  • 读取的逻辑形象。
  • channelRead 是最重要的办法。
  • 配合 ByteBuf 应用进行 buf.read 推动读指针挪动。

ChannelOutboundHandler

  • 对应写出的逻辑形象。
  • 外围办法是 writewriteAndFlush

适配器

在应用过程中还存在对应的适配器。

  • ChannelOutboundHandlerAdapter(留神解决程序和增加 addLast 的程序相同)
  • ChannelInboundHandlerAdapter

客户端和服务端的 SimpleChannelInboundHandler/ChannelInboundHandlerAdapter 简化

整个聊天零碎大部分的指令判断逻辑是反复的,上面介绍如何通过 SImpleChannelInboundHandler/ChannelInboundHandlerAdapter 简化指令的解决逻辑。

ChannelInboundHandlerAdapter which allows to explicit only handle a specific type of messages. For example here is an implementation which only handle String messages.

ChannelInboundHandlerAdapter 容许明确地只解决特定类型的音讯。而 SimpleChannelInboundHandler 提供了一个模板,作用是把解决逻辑不变的内容写好在 channelRead(ctx,msg) 中,并且在外面调用 channelRead0,这样解决之后就能够通过形象办法实现传递到子类中去了。

区别

SimpleChannelInboundHandlerChannelInboundHandlerAdapter 这两个类应用上不太好辨别,上面再补充介绍一下如何正确对待应用两者。

ChannelInboundHandlerAdapter 须要笼罩的办法是 channelRead,特点是 不会主动开释音讯 ,须要调用ctx.fireChannelRead(msg) 向后续链条处理器传递音讯,也就是须要手动通过责任链的形式传递给下位处理器。

SimpleChannelInboundHandlerChannelInboundHandlerAdapter 的子类, 做了额定的解决,会主动开释音讯 ,如果还须要持续传递音讯,需调用一次 ReferenceCountUtil.retain(msg)。需注意SimpleChannelInboundHandler 也须要调用 ctx.fireChannelRead(msg) 来触发链条中下一处理器解决。

ChannelInboundHandlerAdapter通常用于处于链条两头的某些环节解决,对数据进行某些解决,如数据验证,须要将音讯持续传递。

SimpleChannelInboundHandler则比拟适宜链条最初一个环节,该环节解决完后,后续不再须要该音讯,因而能够主动开释。

利用

在聊天零碎中对立解决的 Handler 继承了 SimpleChannelInboundHandler,重写 channelRead0 办法,次要对于解码之后的操作指令和通用 Map 进行匹配,如果匹配则散发到具体的逻辑处理器。

@ChannelHandler.Sharable  
public class IMHandler extends SimpleChannelInboundHandler<Packet> {public static final IMHandler INSTANCE = new IMHandler();  
  
    private Map<Byte, SimpleChannelInboundHandler<? extends Packet>> handlerMap;  
  
    private IMHandler() {handlerMap = new HashMap<>(7);  
  
        handlerMap.put(MESSAGE_REQUEST, MessageRequestHandler.INSTANCE);  
        handlerMap.put(CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE);  
        handlerMap.put(JOIN_GROUP_REQUEST, JoinGroupRequestHandler.INSTANCE);  
        handlerMap.put(QUIT_GROUP_REQUEST, QuitGroupRequestHandler.INSTANCE);  
        handlerMap.put(LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestHandler.INSTANCE);  
        handlerMap.put(GROUP_MESSAGE_REQUEST, GroupMessageRequestHandler.INSTANCE);  
        handlerMap.put(LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE);  
    }  
  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception {handlerMap.get(packet.getCommand()).channelRead(ctx, packet);  
    }  
}

客户端和服务端单聊

指标

  • 输出用户名,服务端随机调配 ID,这里省去通过账号和明码注册过程。
  • 多个客户端登录,用 userId 空格 音讯的形式单聊。

实现过程

  1. 应用工具类把 UserId 和 Channel 绑定为 Session。

    • Session 的信息蕴含用户 ID 以及名称,后续能够扩大更多的字段。
  2. 应用 SessionUtil 工具类操作 Session,通过 Session 储存以后会话信息。

    • 这里用的 ConcurrentHashMap 实现并发平安
    • ConcurrentHashMap 为 userId -> Channel 的映射 Map。
    • 用户登录的时候,须要把 Session 塞入 Map。
    • 当用户断开 Channel 连贯退出的时候,须要移除 Session 信息
  3. 服务端承受音讯并且转发(这里 Netty 相似转发手机信号的基站)

    • 获取会话信息。
    • 结构发给客户端的对象MessageResponse
    • 音讯接管方标识获取对应Channel
    • 如果指标用户登录则发送音讯,如果对方不在线,则控制台打印正告信息。

具体的代码在后面的收发音讯中有提到过,这里反复展现一遍。

MessageResponseHandler

public class MessageResponseHandler extends SimpleChannelInboundHandler<MessageResponsePacket> {  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, MessageResponsePacket messageResponsePacket) {String fromUserId = messageResponsePacket.getFromUserId();  
        String fromUserName = messageResponsePacket.getFromUserName();  
        System.out.println(fromUserId + ":" + fromUserName + "->" + messageResponsePacket  
                .getMessage());  
    }  
}

MessageRequestHandler

@ChannelHandler.Sharable  
public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> {public static final MessageRequestHandler INSTANCE = new MessageRequestHandler();  
  
    private MessageRequestHandler() {}  
  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacket messageRequestPacket) {long begin = System.currentTimeMillis();  
  
  
        // 1. 拿到音讯发送方的会话信息  
        Session session = SessionUtil.getSession(ctx.channel());  
  
        // 2. 通过音讯发送方的会话信息结构要发送的音讯  
        MessageResponsePacket messageResponsePacket = new MessageResponsePacket();  
        messageResponsePacket.setFromUserId(session.getUserId());  
        messageResponsePacket.setFromUserName(session.getUserName());  
        messageResponsePacket.setMessage(messageRequestPacket.getMessage());  
  
        // 3. 拿到音讯接管方的 channel        Channel toUserChannel = SessionUtil.getChannel(messageRequestPacket.getToUserId());  
  
        // 4. 将音讯发送给音讯接管方  
        if (toUserChannel != null && SessionUtil.hasLogin(toUserChannel)) {toUserChannel.writeAndFlush(messageResponsePacket).addListener(future -> {if (future.isDone()) {}});  
        } else {System.err.println("[" + session.getUserId() + "] 不在线,发送失败!");  
  
        }  
    }  
}

群聊发动和告诉

上面两个大节围绕群聊实现介绍。整个群聊和单聊实现相似,都是通过标识获取 Channel,为了方面多个成员治理,设计 ChannelGroup 实现 Channel 的批量操作。

预期成果

  1. 三位用户顺次登录。
  2. 控制台输出 createGroup 指令,提醒创立群聊须要 userId 列表,之后以英文逗号分隔 userId。
  3. 群聊创立胜利之后,所有群聊成员收到退出胜利音讯。

创立群聊实现

次要逻辑如下:

  1. 创立一个 channel 分组。
  2. 筛选出待退出群聊的用户的 channel 和 userName。
  3. 创立群聊创立后果的响应。
  4. 给每个客户端发送拉群告诉
  5. 保留群组相干的信息。

其中存储群的相干信息利用了 ConcurrentHashMap 实现,和 Session 的会话信息存储形式相似,ChannelGroup对象负责封装多个 Channel 的信息,模仿群聊中的“群”。

  
@ChannelHandler.Sharable  
public class CreateGroupRequestHandler extends SimpleChannelInboundHandler<CreateGroupRequestPacket> {public static final CreateGroupRequestHandler INSTANCE = new CreateGroupRequestHandler();  
  
    private CreateGroupRequestHandler() {}  
  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, CreateGroupRequestPacket createGroupRequestPacket) {List<String> userIdList = createGroupRequestPacket.getUserIdList();  
  
        List<String> userNameList = new ArrayList<>();  
        // 1. 创立一个 channel 分组  
        ChannelGroup channelGroup = new DefaultChannelGroup(ctx.executor());  
  
        // 2. 筛选出待退出群聊的用户的 channel 和 userName        for (String userId : userIdList) {Channel channel = SessionUtil.getChannel(userId);  
            if (channel != null) {channelGroup.add(channel);  
                userNameList.add(SessionUtil.getSession(channel).getUserName());  
            }  
        }  
  
        // 3. 创立群聊创立后果的响应  
        String groupId = IDUtil.randomId();  
        CreateGroupResponsePacket createGroupResponsePacket = new CreateGroupResponsePacket();  
        createGroupResponsePacket.setSuccess(true);  
        createGroupResponsePacket.setGroupId(groupId);  
        createGroupResponsePacket.setUserNameList(userNameList);  
  
        // 4. 给每个客户端发送拉群告诉  
        channelGroup.writeAndFlush(createGroupResponsePacket);  
  
        System.out.print("群创立胜利,id 为" + createGroupResponsePacket.getGroupId() + ",");  
        System.out.println("群外面有:" + createGroupResponsePacket.getUserNameList());  
  
        // 5. 保留群组相干的信息  
        SessionUtil.bindChannelGroup(groupId, channelGroup);  
    }  
}

客户端解决局部则是简略的打印创立群聊胜利的信息,实现比较简单这里不再贴出相干代码。

群聊成员治理实现

设计流程和实现思路

设计流程

  1. 退出群聊,控制台输入创立胜利音讯。
  2. 控制台输出 joinGroup 之后输出群 ID,退出群聊,控制台显示退出群胜利。
  3. 控制台输出 listGroupMembers 而后输出群 ID,展现群成员。
  4. quitGroup 输出群 ID,进行退群
  5. 控制台输出 joinGroup 之后输出群 ID 显示对应成员不在,则退群胜利。

实现思路

  1. 在控制台中退出群退出的命令处理器。
  2. 服务端解决群聊申请。
  3. 客户端解决加群响应.
  4. 群聊退出实现。

在控制台中退出群退出的命令处理器

JoinGroupConsoleCommand

public class JoinGroupConsoleCommand implements ConsoleCommand {  
    @Override  
    public void exec(Scanner scanner, Channel channel) {JoinGroupRequestPacket joinGroupRequestPacket = new JoinGroupRequestPacket();  
  
        System.out.print("输出 groupId,退出群聊:");  
        String groupId = scanner.next();  
  
        joinGroupRequestPacket.setGroupId(groupId);  
        channel.writeAndFlush(joinGroupRequestPacket);  
    }  
}

服务端解决群聊申请

服务端解决群聊申请:

  1. 构建 Channel 分区,把处在同一个分组的 Channel 放到一个 List 当中存储
  2. 如果群聊构建胜利,则构建创立胜利响应后果。
@ChannelHandler.Sharable  
public class JoinGroupRequestHandler extends SimpleChannelInboundHandler<JoinGroupRequestPacket> {public static final JoinGroupRequestHandler INSTANCE = new JoinGroupRequestHandler();  
  
    private JoinGroupRequestHandler() {}  
  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, JoinGroupRequestPacket requestPacket) {  
        // 1. 获取群对应的 channelGroup,而后将以后用户的 channel 增加进去  
        String groupId = requestPacket.getGroupId();  
        ChannelGroup channelGroup = SessionUtil.getChannelGroup(groupId);  
        channelGroup.add(ctx.channel());  
  
        // 2. 结构加群响应发送给客户端  
        JoinGroupResponsePacket responsePacket = new JoinGroupResponsePacket();  
  
        responsePacket.setSuccess(true);  
        responsePacket.setGroupId(groupId);  
        ctx.writeAndFlush(responsePacket);  
    }  
}

客户端解决加群响应

简略打印加群的响应音讯。

public class JoinGroupResponseHandler extends SimpleChannelInboundHandler<JoinGroupResponsePacket> {  
  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, JoinGroupResponsePacket responsePacket) {if (responsePacket.isSuccess()) {System.out.println("退出群 [" + responsePacket.getGroupId() + "] 胜利!");  
        } else {System.err.println("退出群 [" + responsePacket.getGroupId() + "] 失败,起因为:" + responsePacket.getReason());  
        }  
    }  
}

群聊退出实现

群聊退出次要是获取群对应的 channelGroup,而后将以后用户的 channel 移除,之后构建退群的响应信息回传客户端即可。

QuitGroupRequestHandler

@ChannelHandler.Sharable  
public class QuitGroupRequestHandler extends SimpleChannelInboundHandler<QuitGroupRequestPacket> {public static final QuitGroupRequestHandler INSTANCE = new QuitGroupRequestHandler();  
  
    private QuitGroupRequestHandler() {}  
  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, QuitGroupRequestPacket requestPacket) {  
        // 1. 获取群对应的 channelGroup,而后将以后用户的 channel 移除  
        String groupId = requestPacket.getGroupId();  
        ChannelGroup channelGroup = SessionUtil.getChannelGroup(groupId);  
        channelGroup.remove(ctx.channel());  
  
        // 2. 结构退群响应发送给客户端  
        QuitGroupResponsePacket responsePacket = new QuitGroupResponsePacket();  
  
        responsePacket.setGroupId(requestPacket.getGroupId());  
        responsePacket.setSuccess(true);  
        ctx.writeAndFlush(responsePacket);  
  
    }  
}

心跳检测

网络问题

假死

TCP 层面来看,服务端收到 4 次握手包或者 RST 包才算真正断开连接,如果中途应用程序并没有捕捉到,此时是认为这条连贯存在的。

假死引发问题

  • 客户端发送数据超时无响应,影响体验。
  • 节约 CPU 和内存资源,性能下滑。

假死起因

  • 公网丢包,网络抖动。
  • 应用程序阻塞无奈读写。
  • 客户端或者服务端设别故障,网卡,机房故障。

为了解决下面的问题,通常会应用心跳检测机制定期检测每个 Channel 连贯是否存活。

服务端心跳检测实现

  1. 通过 IdleStateHandler 自带 Handler 实现
  2. 继承类,而后开启定时工作
  3. 触发假死该 Handler 回调channelIdle 办法

客户端预判和进攻假死

  1. 新建Handler
  2. 开启定时线程。
  3. 组装心跳包。
  4. 发送心跳。
  5. 服务端简略开发承受和辨认心跳包的 Handler,之后回送收到心跳包音讯即可。

注意事项

  • 心跳检测 Handler 插入到整个 Pipeline 最后面,因为如果连贯理论曾经断开后续的所有解决均无意义。
  • 假死不肯定“死”,避免服务端误判,客户端也须要措施避免假死和预判假死,这就是客户端预判的含意。

思考

  1. IdleHandler 可否单例?
  2. 断开链接之后从新连贯登录

IdleHandler 可否单例?

答案是 不能。因为它并不是无状态的,并且每个 Channel 都有各自的连贯状态。

断开链接之后从新连贯登录

通过额定的线程定时轮循所有的连贯的活跃性,如果发现其中有死连贯,则执行重连。

写在最初

相熟聊天零碎对于后续的源码剖析非常有意义,我的项目的整体构建比较简单,集体在笔记中将重点局部做了一个梳理。

文章参考

https://juejin.cn/book/m/6844733738119593991/section/6844733738291576840?suid=2040300414187416

正文完
 0