关于rocketmq:一张图进阶-RocketMQ-通信机制

4次阅读

共计 8079 个字符,预计需要花费 21 分钟才能阅读完成。

前 言

三此君看了好几本书,看了很多遍源码整顿的 一张图进阶 RocketMQ 图片,对于 RocketMQ 你只须要记住这张图!感觉不错的话,记得点赞关注哦。

【重要】视频在 B 站同步更新,欢送围观,轻轻松松涨姿态。一张图进阶 RocketMQ- 通信机制(视频版)

https://www.bilibili.com/vide…

本文是“一张图进阶 RocketMQ”第 4 篇,对 RocketMQ 不理解的同学能够先看看后面三期:

  1. 一张图进阶 RocketMQ- 整体架构
  2. 一张图进阶 RocketMQ – NameServer
  3. 一张图进阶 RocketMQ – 音讯发送

上一期分享了 RocketMQ 生产者启动流程及同步音讯发送流程,咱们晓得了在通信层是基于 Netty 将消息传递给 Broker 进行存储的。如果对 Netty 齐全不理解咱们就很难真正了解 RocketMQ,所以明天咱们简略的聊一聊 Netty 根本流程,而后剖析 RocketMQ 的通信机制,最初通过异步音讯发送来串联 RocketMQ 通信机制。

Netty 介绍

Netty 有很多概念,等介绍完概念大家都困了,咱们就不过多介绍了,间接联合示例来看看 Netty 的根底流程,可能帮忙咱们更好的了解 RocketMQ 即可。

  1. Netty 服务端启动初始化两个线程组 BossGroup & WorkerGroup,别离用于解决 客户端连贯及网络读写
  2. Netty 客户端启动初始化一个线程组,用于解决申请及返回后果。
  3. 客户端 connect 到 Netty 服务端,创立用于 传输数据的 Channel
  4. Netty 服务端的 BossGroup 解决客户端的连贯申请,而后把剩下的工作交给 WorkerGroup。
  5. 连贯建设好了,客户端就能够利用这个连贯发送数据给 Netty 服务端。
  6. Netty WorkerGroup 中的线程应用 Pipeline(蕴含多个处理器 Handler) 对数据进行解决。
  7. Netty 服务端的解决完申请后,返回后果也通过 Pipeline 解决。
  8. Netty 服务端通过 Channel 将数据返回给客户端。
  9. 客户端通过 Channel 接管到数据,也通过 Pipeline 进行解决。

    Netty 示例

    咱们先用 Netty 实现一个简略的 服务端 / 客户端 通信示例,咱们是这样应用的,那 RocketMQ 基于 Netty 的通信也应该是这样应用的,不过是在这个根底上封装了一层。次要关注以下几个点:服务端什么时候初始化的,服务端实现的 Handler 做了什么事?客户端什么时候初始化的,客户端实现的 Handler 做了什么事?
    Netty 服务端初始化:初始化的代码很要害,咱们要从源码上了解 RocketMQ 的通信机制,那必定会看到相似的代码。依据下面的流程来看,首先是实例化 bossGroup 和 workerGroup,而后初始化 Channel,从代码能够看出咱们是在 Pipeline 中增加了本人实现的 Handler,这个 Handler 就是业务本人的逻辑了,那 RocketMQ 要解决数据应该也须要实现相应的 Handler。

    public class MyServer {public static void main(String[] args) throws Exception {
         // 创立两个线程组 boosGroup、workerGroup
         EventLoopGroup bossGroup = new NioEventLoopGroup();
         EventLoopGroup workerGroup = new NioEventLoopGroup();
         try {
             // 创立服务端的启动对象,设置参数
             ServerBootstrap bootstrap = new ServerBootstrap();
             // 设置两个线程组 boosGroup 和 workerGroup
             bootstrap.group(bossGroup, workerGroup)
                 // 设置服务端通道实现类型    
                 .channel(NioServerSocketChannel.class)
                 // 应用匿名外部类的模式初始化 Channel 对象    
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                         @Override
                         protected void initChannel(SocketChannel socketChannel) throws Exception {
                             // 给 pipeline 管道增加处理器
                             socketChannel.pipeline().addLast(new MyServerHandler());
                         }
                     });// 给 workerGroup 的 EventLoop 对应的管道设置处理器
             // 绑定端口号,启动服务端
             ChannelFuture channelFuture = bootstrap.bind(6666).sync();
             // 对敞开通道进行监听
             channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();
             workerGroup.shutdownGracefully();}
     }
    }

    实现自定义的服务端处理器 Handler:自定义的 Handler 须要实现 Netty 定义的 HandlerAdapter,当有可读事件时就会调用这里的 channelRead() 办法。等下咱们看 RocketMQ 通信机制的时候注意 RocketMQ 自定义了哪些 Handler,这些 Handler 有做了什么事。

    /**
     * 自定义的 Handler 须要继承 Netty 规定好的 HandlerAdapter 能力被 Netty 框架所关联,有点相似 SpringMVC 的适配器模式
     **/
    public class MyServerHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         // 获取客户端发送过去的音讯
         ByteBuf byteBuf = (ByteBuf) msg;
         System.out.println("收到" + ctx.channel().remoteAddress() + "发送的音讯:" + byteBuf.toString(CharsetUtil.UTF_8));
         // 发送音讯给客户端
         ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到音讯,记得关注三此君,记得三连", CharsetUtil.UTF_8));
     }
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         // 产生异样,敞开通道
         ctx.close();}
    }

    Netty 客户端初始化:Netty 客户端,在 RocketMQ 中对应了 Producer/Consumer。在 Producer 启动中有一步是启动通信模块服务,其实就是初始化 Netty 客户端。客户端也须要先实例化一个 NioEventLoopGroup,而后将自定义的 handler 增加到 Pipeline,还有很重要的一步是咱们须要 connect 连贯到 Netty 服务端。

    public class MyClient {public static void main(String[] args) throws Exception {NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
         try {
             // 创立 bootstrap 启动疏导对象,配置参数
             Bootstrap bootstrap = new Bootstrap();
             // 设置线程组
             bootstrap.group(eventExecutors)
                 // 设置客户端的 Channel 实现类型    
                 .channel(NioSocketChannel.class)
                 // 应用匿名外部类初始化 Pipeline
                 .handler(new ChannelInitializer<SocketChannel>() {
                         @Override
                         protected void initChannel(SocketChannel ch) throws Exception {
                             // 增加客户端 Channel 的处理器
                             ch.pipeline().addLast(new MyClientHandler());
                         }
                     })
             //connect 连贯服务端
             ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
             // 对 Channel 敞开进行监听
             channelFuture.channel().closeFuture().sync();} finally {
             // 敞开线程组
             eventExecutors.shutdownGracefully();}
     }
    }

    实现自定义的客户端处理器 Handler:客户端处理器也继承自 Netty 定义的 HandlerAdapter,当 Channel 变得可读的时候(服务端数据返回)会调用咱们本人实现的 channelRead()。

    public class MyClientHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         // 发送音讯到服务端
         ctx.writeAndFlush(Unpooled.copiedBuffer("三此君,我正在看 RocketMQ 生产者发送音讯~", CharsetUtil.UTF_8));
     }
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         // 接管服务端发送过去的音讯
         ByteBuf byteBuf = (ByteBuf) msg;
         System.out.println("收到三此君的音讯,我肯定会三连的" + ctx.channel().remoteAddress() + byteBuf.toString(CharsetUtil.UTF_8));
     }
    }

    RocketMQ 通信流程

    RocketMQ 通信模块基于 Netty 实现,总体代码量不多。次要是 NettyRemotingServer 和 NettyRemotingClient,别离对应通信的服务端和客户端。依据后面的 Netty 示例,咱们要了解 RocketMQ 如何基于 Netty 通信,只须要晓得 4 个中央:NettyRemotingServer 如何初始化,NettyRemotingClient 初始化,如何基于 NettyRemotingClient 发送音讯,无论是客户端还是服务端收到数据后都须要 Handler 来解决。

  • Broker/NameServer 须要启动 Netty 服务端。Broker 咱们前面会进一步剖析,只须要晓得 Broker 启动的时候会调用 NettyRemotingServer.start() 办法初始化 Netty 服务器。次要做了 4 件事:配置 BossGroup/WorkerGroup NioEventLoopGroup 线程组,配置 Channel,增加 NettyServerHandler,调用 serverBootstrap.bind() 监听端口期待客户端连贯。
  • Producer/Consumer 须要启动 Netty 客户端,在生产者启动流程中 MQClientInstantce 启动通信服务模块,其实就是调用 NettyRemotingClient.start() 初始化 Netty 客户端。次要做了 3 件事:配置客户端 NioEventLoopGroup 线程组,配置 Channel,增加 NettyClientHandler。
  • 客户端配置了 Channel,然而 Channel 还没有创立,因为 Channel 必定要和具体的 Server IP Addr 关联。在同步音讯发送流程中,调用 NettyRemoteClient.invokeSync(),从 channelTables 缓存中获取或者创立一个新的 Channel,其实就是调用 bootstrap.connect() 连贯到 NettyServer,创立用于通信的 Channel。
  • 有了 Channel 后,Producer 调用 Channel.writeAndFlush() 将数据发送给服务器。NettyRemotingServer WorkerGroup 解决可读事件,调用 NettyServerHandler 解决数据。
  • NettyServerHandler 调用 processMessageReceived 办法。processMessageReceived 办法做了什么呢?通过传入的申请码 RequestCode 区别不同的申请,不同的申请定义了不同的 Processor。例如,是生产者存入音讯应用 SendMessageProcessor,查问音讯应用 QueryMessageProcessor,拉取音讯应用 PullMessageProcessor。这些 Processor 在服务端初始化的时候,以 RequestCode 为 Key 增加到 Processor 缓存中。processMessageReceived 就是依据 RequeseCode 获取不同的 Processor,解决完后把后果返回给 NettyRemotingClient。
  • NettyRemotingClient 收到可读事件,调用 NettyClientHandler 解决返回后果。NettyClientHandler 也调用 processMessageReceived 解决返回后果。processMessageReceived 从以 opaque 为 key ResponseTables 缓存冲取出 ResponseFuture,将返回后果设置到 ResponseFuture。同步音讯则执行 responseFuture.putResponse(),异步调用执行回调。

    异步发送

    除了同步音讯发送,RocketMQ 还反对异步发送。咱们只须要在后面是示例中稍作批改就会失去一个异步发送示例,最大的不同在于发送的时候传入 SendCallback 接管异步返回后果回调。

    public class AsyncProducer {public static void main(String[] args) throws Exception {
          // 实例化音讯生产者 Producer
          DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
          // 设置 NameServer 的地址
          producer.setNamesrvAddr("localhost:9876");
          // 启动 Producer 实例
          producer.start();
          // 创立音讯,并指定 Topic,Tag 和音讯体
          Message msg = new Message("Topic1","Tag", "Key", "Hello world".getBytes("UTF-8")); 
          // SendCallback 接管异步返回后果的回调
          producer.send(msg, new SendCallback() {
              @Override
              public void onSuccess(SendResult sendResult) {System.out.printf("关注呀!!!%-10d OK %s %n", index,sendResult.getMsgId());
              }
              @Override
              public void onException(Throwable e) {System.out.printf("三连呀!!!%-10d Exception %s %n", index, e);
                  e.printStackTrace();}
          });
          // 如果不再发送音讯,敞开 Producer 实例。producer.shutdown();}
    }

    同步发送个异步发送次要的过程都是一样的,不同点在于同步音讯调用 Netty Channel.writeAndFlush 之后是 waitResponse 期待 Broker 返回,而异步音讯是调用事后定义好的回调函数。

    异步音讯和同步音讯整体差不多,能够说在基于 Netty 实现异步音讯比同步音讯还要简略一下,咱们这里次要来看一些不同点:

  • 调用 DefaultMQProducer 异步发送接口须要咱们定义 SendCallback 回调函数,在执行胜利或者执行失败后回调。
  • DefaultMQProducerImp 中的 send 办法会将异步发送申请封装成 Runable 提交到线程池,而后业务线程就间接返回了。
  • sendDefaultImpl 计算重试同步和异步音讯有区别,异步音讯在这里不会重试,而是在前面后果返回的时候通过递归重试。
  • 跟着调用链到 sendMessageAsync 办法,须要留神的是这里构建了 InvokeCallback 实例,ResponseFuture 会持有该实例,Netty 后果返回后调用该实例的办法。
  • 上面就是失常的 Netty 数据发送流程,直到 Broker 解决完申请,返回后果。NettyRemotingClient 解决可读事件,NettyClientHandler 解决返回后果,调用 ResponseFuture.executeInokeCallback,进而调用 InvokeCallback.operationComplete.
  • 如果 Broker 返回后果是胜利的,则封装返回后果 SendResult,并回调业务实现的 SendCallback.onSucess 办法,更新容错项。
  • 如果 Broker 返回失败,或呈现任何异样则执行重试,重试超过 retryTimesWhenSendFailed 次则回调业务定义的 SendCallback.onException 办法。

    总结

    以上就是 RocketMQ 音讯发送的次要内容,咱们简略的总结下:

  • Netty:BossGroup 解决客户端连贯申请,生成 ServerSocketChannel 注册到 WorkerGroup,WorkerGroup 解决网络读写申请,调用 Channel 对应的 Pipeline 解决申请,Pipeline 中有很多 ChannelHandler 对申请进行解决。
  • 通信机制:基于 Netty 实现,只须要注意 NettyRemotingServer/NettyRemotingClient 的初始化,并且在通道变得可读 / 可写时,会调用 NettyServerHandler/NettyClienthandler 进行解决。
  • 同步异步:同步和异步音讯大同小异,只是同步音讯通过 Netty 发送申请后会执行 ResponseFuture.waitResponse() 阻塞期待,而异步音讯发送申请后不会期待,申请返回回调用 SendCallback 相应的办法。

以上就是明天全副的内容,如果感觉本期的内容对你有用的话记得点赞、关注、转发珍藏,这将是对我最大的反对。如果你须要 RocketMQ 相干的所有材料,能够评论区留言,或者关注三此君的公众号,回复 mq 即可。
音讯曾经发送给了 Broker,下一期咱们未来看看 Broker 是如何存储音讯的,RocketMQ 如何反对百万级的吞吐量?感激观看,咱们下期再见

参考文献

  • RocketMQ 官网文档
  • RocketMQ 源码
  • 丁威, 周继锋. RocketMQ 技术底细:RocketMQ 架构设计与实现原理. 机械工业出版社, 2019-01.
  • 李伟. RocketMQ 分布式消息中间件:外围原理与最佳实际. 电子工业出版社, 2020-08.
  • 杨开元. RocketMQ 实战与原理解析. 机械工业出版社, 2018-06.
正文完
 0