共计 8079 个字符,预计需要花费 21 分钟才能阅读完成。
前 言
三此君看了好几本书,看了很多遍源码整顿的 一张图进阶 RocketMQ 图片,对于 RocketMQ 你只须要记住这张图!感觉不错的话,记得点赞关注哦。
【重要】视频在 B 站同步更新,欢送围观,轻轻松松涨姿态。一张图进阶 RocketMQ- 通信机制(视频版)
https://www.bilibili.com/vide…
本文是“一张图进阶 RocketMQ”第 4 篇,对 RocketMQ 不理解的同学能够先看看后面三期:
- 一张图进阶 RocketMQ- 整体架构
- 一张图进阶 RocketMQ – NameServer
- 一张图进阶 RocketMQ – 音讯发送
上一期分享了 RocketMQ 生产者启动流程及同步音讯发送流程,咱们晓得了在通信层是基于 Netty 将消息传递给 Broker 进行存储的。如果对 Netty 齐全不理解咱们就很难真正了解 RocketMQ,所以明天咱们简略的聊一聊 Netty 根本流程,而后剖析 RocketMQ 的通信机制,最初通过异步音讯发送来串联 RocketMQ 通信机制。
Netty 介绍
Netty 有很多概念,等介绍完概念大家都困了,咱们就不过多介绍了,间接联合示例来看看 Netty 的根底流程,可能帮忙咱们更好的了解 RocketMQ 即可。
- Netty 服务端启动初始化两个线程组 BossGroup & WorkerGroup,别离用于解决 客户端连贯及网络读写。
- Netty 客户端启动初始化一个线程组,用于解决申请及返回后果。
- 客户端 connect 到 Netty 服务端,创立用于 传输数据的 Channel。
- Netty 服务端的 BossGroup 解决客户端的连贯申请,而后把剩下的工作交给 WorkerGroup。
- 连贯建设好了,客户端就能够利用这个连贯发送数据给 Netty 服务端。
- Netty WorkerGroup 中的线程应用 Pipeline(蕴含多个处理器 Handler) 对数据进行解决。
- Netty 服务端的解决完申请后,返回后果也通过 Pipeline 解决。
- Netty 服务端通过 Channel 将数据返回给客户端。
-
客户端通过 Channel 接管到数据,也通过 Pipeline 进行解决。
Netty 示例
咱们先用 Netty 实现一个简略的 服务端 / 客户端 通信示例,咱们是这样应用的,那 RocketMQ 基于 Netty 的通信也应该是这样应用的,不过是在这个根底上封装了一层。次要关注以下几个点:服务端什么时候初始化的,服务端实现的 Handler 做了什么事?客户端什么时候初始化的,客户端实现的 Handler 做了什么事?
Netty 服务端初始化:初始化的代码很要害,咱们要从源码上了解 RocketMQ 的通信机制,那必定会看到相似的代码。依据下面的流程来看,首先是实例化 bossGroup 和 workerGroup,而后初始化 Channel,从代码能够看出咱们是在 Pipeline 中增加了本人实现的 Handler,这个 Handler 就是业务本人的逻辑了,那 RocketMQ 要解决数据应该也须要实现相应的 Handler。public class MyServer {public static void main(String[] args) throws Exception { // 创立两个线程组 boosGroup、workerGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创立服务端的启动对象,设置参数 ServerBootstrap bootstrap = new ServerBootstrap(); // 设置两个线程组 boosGroup 和 workerGroup bootstrap.group(bossGroup, workerGroup) // 设置服务端通道实现类型 .channel(NioServerSocketChannel.class) // 应用匿名外部类的模式初始化 Channel 对象 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 给 pipeline 管道增加处理器 socketChannel.pipeline().addLast(new MyServerHandler()); } });// 给 workerGroup 的 EventLoop 对应的管道设置处理器 // 绑定端口号,启动服务端 ChannelFuture channelFuture = bootstrap.bind(6666).sync(); // 对敞开通道进行监听 channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();} } }
实现自定义的服务端处理器 Handler:自定义的 Handler 须要实现 Netty 定义的 HandlerAdapter,当有可读事件时就会调用这里的 channelRead() 办法。等下咱们看 RocketMQ 通信机制的时候注意 RocketMQ 自定义了哪些 Handler,这些 Handler 有做了什么事。
/** * 自定义的 Handler 须要继承 Netty 规定好的 HandlerAdapter 能力被 Netty 框架所关联,有点相似 SpringMVC 的适配器模式 **/ public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 获取客户端发送过去的音讯 ByteBuf byteBuf = (ByteBuf) msg; System.out.println("收到" + ctx.channel().remoteAddress() + "发送的音讯:" + byteBuf.toString(CharsetUtil.UTF_8)); // 发送音讯给客户端 ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到音讯,记得关注三此君,记得三连", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 产生异样,敞开通道 ctx.close();} }
Netty 客户端初始化:Netty 客户端,在 RocketMQ 中对应了 Producer/Consumer。在 Producer 启动中有一步是启动通信模块服务,其实就是初始化 Netty 客户端。客户端也须要先实例化一个 NioEventLoopGroup,而后将自定义的 handler 增加到 Pipeline,还有很重要的一步是咱们须要 connect 连贯到 Netty 服务端。
public class MyClient {public static void main(String[] args) throws Exception {NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); try { // 创立 bootstrap 启动疏导对象,配置参数 Bootstrap bootstrap = new Bootstrap(); // 设置线程组 bootstrap.group(eventExecutors) // 设置客户端的 Channel 实现类型 .channel(NioSocketChannel.class) // 应用匿名外部类初始化 Pipeline .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 增加客户端 Channel 的处理器 ch.pipeline().addLast(new MyClientHandler()); } }) //connect 连贯服务端 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync(); // 对 Channel 敞开进行监听 channelFuture.channel().closeFuture().sync();} finally { // 敞开线程组 eventExecutors.shutdownGracefully();} } }
实现自定义的客户端处理器 Handler:客户端处理器也继承自 Netty 定义的 HandlerAdapter,当 Channel 变得可读的时候(服务端数据返回)会调用咱们本人实现的 channelRead()。
public class MyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 发送音讯到服务端 ctx.writeAndFlush(Unpooled.copiedBuffer("三此君,我正在看 RocketMQ 生产者发送音讯~", CharsetUtil.UTF_8)); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 接管服务端发送过去的音讯 ByteBuf byteBuf = (ByteBuf) msg; System.out.println("收到三此君的音讯,我肯定会三连的" + ctx.channel().remoteAddress() + byteBuf.toString(CharsetUtil.UTF_8)); } }
RocketMQ 通信流程
RocketMQ 通信模块基于 Netty 实现,总体代码量不多。次要是 NettyRemotingServer 和 NettyRemotingClient,别离对应通信的服务端和客户端。依据后面的 Netty 示例,咱们要了解 RocketMQ 如何基于 Netty 通信,只须要晓得 4 个中央:NettyRemotingServer 如何初始化,NettyRemotingClient 初始化,如何基于 NettyRemotingClient 发送音讯,无论是客户端还是服务端收到数据后都须要 Handler 来解决。
- Broker/NameServer 须要启动 Netty 服务端。Broker 咱们前面会进一步剖析,只须要晓得 Broker 启动的时候会调用 NettyRemotingServer.start() 办法初始化 Netty 服务器。次要做了 4 件事:配置 BossGroup/WorkerGroup NioEventLoopGroup 线程组,配置 Channel,增加 NettyServerHandler,调用 serverBootstrap.bind() 监听端口期待客户端连贯。
- Producer/Consumer 须要启动 Netty 客户端,在生产者启动流程中 MQClientInstantce 启动通信服务模块,其实就是调用 NettyRemotingClient.start() 初始化 Netty 客户端。次要做了 3 件事:配置客户端 NioEventLoopGroup 线程组,配置 Channel,增加 NettyClientHandler。
- 客户端配置了 Channel,然而 Channel 还没有创立,因为 Channel 必定要和具体的 Server IP Addr 关联。在同步音讯发送流程中,调用 NettyRemoteClient.invokeSync(),从 channelTables 缓存中获取或者创立一个新的 Channel,其实就是调用 bootstrap.connect() 连贯到 NettyServer,创立用于通信的 Channel。
- 有了 Channel 后,Producer 调用 Channel.writeAndFlush() 将数据发送给服务器。NettyRemotingServer WorkerGroup 解决可读事件,调用 NettyServerHandler 解决数据。
- NettyServerHandler 调用 processMessageReceived 办法。processMessageReceived 办法做了什么呢?通过传入的申请码 RequestCode 区别不同的申请,不同的申请定义了不同的 Processor。例如,是生产者存入音讯应用 SendMessageProcessor,查问音讯应用 QueryMessageProcessor,拉取音讯应用 PullMessageProcessor。这些 Processor 在服务端初始化的时候,以 RequestCode 为 Key 增加到 Processor 缓存中。processMessageReceived 就是依据 RequeseCode 获取不同的 Processor,解决完后把后果返回给 NettyRemotingClient。
-
NettyRemotingClient 收到可读事件,调用 NettyClientHandler 解决返回后果。NettyClientHandler 也调用 processMessageReceived 解决返回后果。processMessageReceived 从以 opaque 为 key ResponseTables 缓存冲取出 ResponseFuture,将返回后果设置到 ResponseFuture。同步音讯则执行 responseFuture.putResponse(),异步调用执行回调。
异步发送
除了同步音讯发送,RocketMQ 还反对异步发送。咱们只须要在后面是示例中稍作批改就会失去一个异步发送示例,最大的不同在于发送的时候传入 SendCallback 接管异步返回后果回调。
public class AsyncProducer {public static void main(String[] args) throws Exception { // 实例化音讯生产者 Producer DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 设置 NameServer 的地址 producer.setNamesrvAddr("localhost:9876"); // 启动 Producer 实例 producer.start(); // 创立音讯,并指定 Topic,Tag 和音讯体 Message msg = new Message("Topic1","Tag", "Key", "Hello world".getBytes("UTF-8")); // SendCallback 接管异步返回后果的回调 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) {System.out.printf("关注呀!!!%-10d OK %s %n", index,sendResult.getMsgId()); } @Override public void onException(Throwable e) {System.out.printf("三连呀!!!%-10d Exception %s %n", index, e); e.printStackTrace();} }); // 如果不再发送音讯,敞开 Producer 实例。producer.shutdown();} }
同步发送个异步发送次要的过程都是一样的,不同点在于同步音讯调用 Netty Channel.writeAndFlush 之后是 waitResponse 期待 Broker 返回,而异步音讯是调用事后定义好的回调函数。
异步音讯和同步音讯整体差不多,能够说在基于 Netty 实现异步音讯比同步音讯还要简略一下,咱们这里次要来看一些不同点:
- 调用 DefaultMQProducer 异步发送接口须要咱们定义 SendCallback 回调函数,在执行胜利或者执行失败后回调。
- DefaultMQProducerImp 中的 send 办法会将异步发送申请封装成 Runable 提交到线程池,而后业务线程就间接返回了。
- sendDefaultImpl 计算重试同步和异步音讯有区别,异步音讯在这里不会重试,而是在前面后果返回的时候通过递归重试。
- 跟着调用链到 sendMessageAsync 办法,须要留神的是这里构建了 InvokeCallback 实例,ResponseFuture 会持有该实例,Netty 后果返回后调用该实例的办法。
- 上面就是失常的 Netty 数据发送流程,直到 Broker 解决完申请,返回后果。NettyRemotingClient 解决可读事件,NettyClientHandler 解决返回后果,调用 ResponseFuture.executeInokeCallback,进而调用 InvokeCallback.operationComplete.
- 如果 Broker 返回后果是胜利的,则封装返回后果 SendResult,并回调业务实现的 SendCallback.onSucess 办法,更新容错项。
-
如果 Broker 返回失败,或呈现任何异样则执行重试,重试超过 retryTimesWhenSendFailed 次则回调业务定义的 SendCallback.onException 办法。
总结
以上就是 RocketMQ 音讯发送的次要内容,咱们简略的总结下:
- Netty:BossGroup 解决客户端连贯申请,生成 ServerSocketChannel 注册到 WorkerGroup,WorkerGroup 解决网络读写申请,调用 Channel 对应的 Pipeline 解决申请,Pipeline 中有很多 ChannelHandler 对申请进行解决。
- 通信机制:基于 Netty 实现,只须要注意 NettyRemotingServer/NettyRemotingClient 的初始化,并且在通道变得可读 / 可写时,会调用 NettyServerHandler/NettyClienthandler 进行解决。
- 同步异步:同步和异步音讯大同小异,只是同步音讯通过 Netty 发送申请后会执行 ResponseFuture.waitResponse() 阻塞期待,而异步音讯发送申请后不会期待,申请返回回调用 SendCallback 相应的办法。
以上就是明天全副的内容,如果感觉本期的内容对你有用的话记得点赞、关注、转发珍藏,这将是对我最大的反对。如果你须要 RocketMQ 相干的所有材料,能够评论区留言,或者关注三此君的公众号,回复 mq 即可。
音讯曾经发送给了 Broker,下一期咱们未来看看 Broker 是如何存储音讯的,RocketMQ 如何反对百万级的吞吐量?感激观看,咱们下期再见
参考文献
- RocketMQ 官网文档
- RocketMQ 源码
- 丁威, 周继锋. RocketMQ 技术底细:RocketMQ 架构设计与实现原理. 机械工业出版社, 2019-01.
- 李伟. RocketMQ 分布式消息中间件:外围原理与最佳实际. 电子工业出版社, 2020-08.
- 杨开元. RocketMQ 实战与原理解析. 机械工业出版社, 2018-06.