乐趣区

关于java:一万字带你吃透RocketMQ

前言

工作中很多种场景下会用到音讯队列,音讯队列简略来说就是 音讯的传输过程中保留音讯的容器。音讯队列次要解决了利用耦合、异步解决、流量削峰等问题。明天咱们来理解一下阿里开源的一款产品 RocketMQ

RocketMQ 简介

RocketMQ 是一款低提早、高并发、高可用、高牢靠的分布式消息中间件。具备异步通信的劣势,零碎拓扑简略、上下游耦合较弱,次要利用于异步解耦,流量削峰填谷等场景。

NameServer

NameServer 是整个 RocketMQ 的“大脑”,是 RocketMQ 的 路由核心 。NameServer 的次要作用是 为音讯生产者和音讯消费者提供无关 Topic 的路由信息,所以 NameServer 就须要存储路由信息,并且可能治理 Broker 节点,包含路由注册、路由删除等性能。

路由核心高可用

“大脑”一旦故障,那可不是闹着玩的,那么必然要有对策来解决。NameServer 的 高可用能够通过部署多台 NameServer 服务器来实现,但彼此之间互不通信。尽管 NameServer 服务器之间在某一时刻的数据并不会完全相同,但对音讯发送不会在成重大影响,无非就是短暂造成音讯发送不平衡(是不是有很相熟的滋味呢?没错 CAP 实践,这不就是 AP 嘛)。RocketMQ 在 NameServer 这个模块的设计上抉择了 AP。

元数据存储

既然是路由核心,那么路由信息是如何存储的呢?咱们来看一下 RouteInfoManager 这个类。

public class RouteInfoManager {
    // Topic 音讯队列的路由信息
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // Broker 的根底信息
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // Broker 的集群信息
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    // Broker 的状态信息,NameServer 每次收到心跳包时会替换该信息
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // Broker 对应的 FilterServer 列表,用于类模式音讯过滤。类模式过滤机制在 4.4 及当前版本被废除
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

NameServer 存储的信息,就在 RouteInfoManager 这个类里。

路由注册

RockerMQ 路由注册是通过 Broker 与 NameServer 的心跳性能实现的。Broker 启动时向集群中所有的 NameServer 发送心跳语句,每隔 30s 向集群中所有的 NameServer 发送心跳包,NameServer 收到心跳包会先更新 RouteInfoManager 类中 brokerLiveTable 中 BrokerLiveInfo 的 lastUpdateTimestamp,而后每隔 10s 扫描一次 brokerLiveTable,如果间断 120s 没有收到心跳包,NameServer 将移除该 Broker 的路由信息,同时敞开 Socket 连贯。

路由删除

上边提到了 NameServer 如果间断 120s 没有收到 Broker 的心跳包,将移除该 Broker 的路由信息。还有一点就是 Broker 在 失常敞开的状况下,会执行 unregisterBroker 命令

路由发现

RockerMQ 路由发现是非实时的,当 Topic 路由呈现变动后,NameServer 不会被动推送给客户端,而是由客户端定时拉取主题最新的路由

NameServer 架构设计

Broker

上文屡次提到了 Broker,Broker 是 RocketMQ 的一个外围组件,大部分重量级工作都是通过 Broker 来实现的。Borker 解决各种申请和存储音讯,决定整个 RocketMQ 体系的吞吐性能、可靠性和可用性

CommitLog 文件

RocketMQ 在音讯写入的过程中谋求极致的磁盘程序写,所有主题的音讯全副写入一个文件,这个文件就是 CommitLog 文件。所有音讯依照到达程序顺次写入 CommitLog 文件,音讯一旦写入不反对批改。

写入的每条都会引入一个身份标记,就是 音讯物理偏移量(音讯存储在文件的起始地位)。CommitLog 文件的命名形式极具技巧性,应用存储在该文件的第一条音讯在整个 CommitLog 文件组中的偏移量来命名。这样做的益处是给出任意一个音讯的物理偏移量,能够通过二分法进行查找,疾速定位到这个文件的地位,而后利用音讯物理偏移量减去所在文件的名称,失去的差值就是在该文件中的相对地址。

ConsumeQueue 文件

所有主题的音讯都写入了 CommitLog 文件,依据主题从 CommitLog 文件中检索音讯这并不是一个好主见,为了解决基于 Topic 的音讯检索问题,RocketMQ 引入了 ConsumeQueue 文件。简略地说,ConsumeQueue 文件就是 CommitLog 文件基于 Topic 的索引文件。

ConsumeQueue 每个条目长度固定(8 字节 CommitLog 物理偏移量、4 字节音讯长度、8 字节 Tag 哈希码),固定长度的益处是能够应用拜访相似数组下标的形式疾速定位条目,极大的进步了 ConsumeQueue 文件的读取性能。

Index 文件

ConsumeQueue 文件解决了基于 Topic 查找音讯的问题,如果想基于音讯的某一个属性进行查找,那就须要 Index 文件 退场了。

Index 文件基于物理磁盘文件实现哈希索引。Index 文件由 40 字节的文件头、500 万个哈希槽、2000 万个 Index 条目组成,每隔哈希槽 4 个字节,每个 Index 条目含有 20 个字节(4 字节索引 Key 的哈希码、8 字节物理偏移量、4 字节工夫戳、4 字节的前一个 Index 条目)。

内存映射

尽管程序写大大提高了 I/O 效率,然而 基于文件的存储采纳惯例的 Java 文件操作 API,性能晋升将会很无限,所以 RockerMQ 引入了 内存映射 。将磁盘文件映射到内存中,以操作内存的形式操作磁盘(在 Linux 服务器中应用的就是操作系统的 页缓存),性能又失去了晋升。

刷盘策略

引入了内存映射和页缓存机制,使 RocketMQ 的写入性能失去了极大的保障,然而又引出了一个问题,Broker 收到客户端发送的音讯后,是存储到页缓存中就返回胜利,还是要长久化到磁盘才算胜利呢?RocketMQ 提供了 同步刷盘 异步刷盘

  • 同步刷盘:同步刷盘即长久化胜利后才向客户端返回胜利。以就义写入性能为代价。
  • 异步刷盘:异步刷盘是指 Broker 将音讯存储到页缓存后就立刻返回胜利,而后开启一个异步线程定时将内存中的数据写入磁盘,默认间隔时间 500ms。

音讯写入页缓存,音讯生产时从页缓存中读取,高并发时压力还是比拟大,为了升高页缓存的应用压力,RocketMQ 引入了 transientStorePoolEnable 机制,即内存级别的读写拆散机制

内存级别的读写拆散机制 :RocketMQ 通过 transientStorePoolEnable 机制,将音讯先写入堆外内存并立刻返回,而后异步将堆外内存的数据提交到页缓存,再异步刷盘长久化。音讯生产时还是从页缓存中读取,就造成了内存级别的读写拆散。该机制的 毛病是如果 Broker 异样退出堆外内存的数据会失落

Broker 高可用

为了进步音讯生产的高可用,防止 Broker 产生单点故障,使得存储在 Broker 上的音讯无奈及时生产,RocketMQ 引入了 Broker 的主从同步机制。即音讯达到主服务器后,须要将音讯同步到音讯从服务器,如果主服务器 Broker 宕机,音讯消费者能够从从服务器拉取音讯。

主题(Topic)

RocketMQ 中音讯传输和存储的顶层容器,用于标识同一类业务逻辑的音讯。主题通过 TopicName 来做惟一标识和辨别。

标签(Tag)

标签是 RocketMQ 提供的细粒度音讯分类属性,能够在主题层级之下做音讯类型的细分。消费者能够通过订阅特定的标签来实现细粒度过滤。

生产者(Producer)

生产者是 RocketMQ 零碎中用来构建并传输音讯到服务端的运行实体。生产者通常被集成在业务零碎中,将业务音讯依照要求封装成音讯并发送至服务端。

音讯发送

RocketMQ 反对同步、异步和单向三种音讯发送形式。

  • 同步(sync):发送者向 RocketMQ 执行发消息 API 时,同步期待,直到音讯服务器返回发送后果。
  • 异步(async):发送者向 RocketMQ 执行发消息 API 时,指定音讯发送胜利后的回调函数,调用音讯发送 API 后立刻返回,音讯发送者线程不阻塞,直到运行完结,音讯发送胜利或失败的回调工作在一个新的线程中执行。
  • 单向(one way):发送者向 RocketMQ 执行发消息 API 时,间接返回,不期待音讯服务器的后果也不注册回调函数。

音讯发送高可用

为了实现音讯发送的高可用,RocketMQ 有两个十分重要的个性。

  • 音讯发送重试机制:在音讯发送时如果呈现失败,默认会重试两次。
  • 故障躲避机制:当音讯第一次发送失败时,如果下一次音讯还是发送到刚刚失败的 Broker 上,大概率还是会失败,为了保障重试的可靠性,在重试时会尽量避开刚刚接管失败的 Broker,而是抉择其余 Broker 上的队列进行发送。

RocketMQ 默认应用轮询算法进行路由的负载平衡 。在音讯发送时反对自定义的负载平衡算法,须要特地留神的是, 应用自定义的路由负载算法后 RocketMQ 的重试机制将生效

SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

这是 MQProducer 中一个自定义队列抉择办法,参数 MessageQueueSelector 音讯队列选择器可抉择自定义的音讯发送到的队列。

音讯发送流程

消费者(Consumer)

消费者是 RocketMQ 中用来接管并解决音讯的运行实体。消费者通常被集成在业务零碎中,从服务端获取音讯,并将音讯转化成业务可了解的信息,供业务逻辑解决。

消费者分组(ConsumerGroup)

音讯生产以组的模式开展,消费者分组是 RocketMQ 零碎中承载多个消费行为统一的消费者的负载平衡分组。在 RocketMQ 中,通过 消费者分组内初始化多个消费者实现生产性能的程度扩大以及高可用容灾

一个消费者能够蕴含多个消费者,每个消费者组能够订阅多个主题

生产模式

RocketMQ 消费者组之间有集群模式和播送模式两种生产模式。

  • 集群模式:以后主题下的同一条音讯只容许被其中一个消费者生产。
  • 播送模式:以后主题下的同一条音讯将被集群内的所有消费者生产一次。

集群模式下,多个消费者的话则须要对音讯队列进行负载,负载机制遵循一个通用的思维:一个音讯队列同一时间只容许被一个消费者生产,一个消费者能够生产多个音讯队列

音讯传送

RocketMQ 音讯服务器和消费者之间的消息传递有两种形式:推模式和拉模式。

  • 推模式:推模式是音讯达到音讯服务器后,再推送给音讯消费者。
  • 拉模式:是生产端被动发动拉取音讯的申请。

RocketMQ 音讯推模式基于拉模式,在拉模式上包装一层,一个拉取工作实现后开始下一个拉取工作。就是说 RocketMQ 并没有真正实现推模式,而是消费者被动向音讯服务器拉取音讯。

如果音讯消费者向音讯服务器发送拉取申请,音讯并未达到音讯队列,且未启用长轮询机制的话,则会在服务端期待一段时间后(挂起),再去判断音讯是否已达到音讯队列。如果音讯未达到,则提醒音讯拉取客户端 PULL_NOT_FOUND(音讯不存在),如果开启长轮询模式,RocketMQ 一方面会每隔 5s 轮询查看一次音讯是否可达,同时一有新音讯达到后,立刻告诉挂起线程再次验证新音讯是否是本人感兴趣的,如果是则从 CommitLog 文件提取音讯返回给音讯拉取客户端,否则挂起超时,超时工夫由音讯拉取方在音讯拉取时封装在申请与参数中,推模式默认 15s。

生产形式

RocketMQ 反对并发生产与程序生产两种生产形式。

  • 并发生产:多个消费者同时生产同一批音讯以进步处理速度,不思考音讯的先后顺序。
  • 程序生产:指同一时刻,一个队列只有一个消费者线程在生产。会在 Broker 端锁队列。

RocketMQ 反对 部分程序生产 ,队列存在人造的程序,也就是 保障同一个音讯队列上的音讯按程序生产 。上边讲生产者发送音讯的时候,能够自定义队列抉择,消费者生产时,就能够通过队列的个性来程序生产了。如果要实现某一个主题的 全局程序生产,能够将该主题的队列数设置为 1,留神这样将就义高可用性。

并发生产和程序生产的实现逻辑在源码 接口 ConsumeMessageService 的两个实现类 ConsumeMessageConcurrentlyService 和 ConsumeMessageOrderlyService 里,感兴趣的敌人能够看看。

音讯重试

RocketMQ 音讯重试是以消费者组为单位 的,音讯重试主题名为 %Retry% + 消费者组名。消费者在启动时会主动订阅该主题,参加该主题的音讯队列负载。要留神的是 播送模式没有内置的音讯重试机制

RocketMQ 的默认重试次数为 16 次,且默认重试间隔时间为(10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h),超过 16 次都是 2h。

定时音讯

定时音讯 是 RocketMQ 提供的一种高级音讯类型,音讯被发送至服务端后,在指定工夫后能力被消费者生产。通过设置肯定的定时工夫能够实现分布式场景的延时调度触发成果。RocketMQ 并不反对任意工夫精度的定时调度(反对的话将不可避免地带来微小的性能耗费)。音讯提早级别在 Broker 端通过 messageDelayLevel 来管制,默认为(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)。上文音讯重试也提到过这组数字,音讯重试也正是借助定时工作实现的。RocketMQ 高版本也开始了反对自定义延迟时间。

public class Message implements Serializable {。。。public void setDelayTimeLevel(int level) {this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
  })

发送提早音讯时,只须要调用 Message 类中的 setDelayTimeLevel 办法来设置延时级别。level 值是 1 到 18 的 int 数值,对应 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

事务音讯

事务音讯是 RocketMQ 提供的一种高级音讯类型,反对在分布式场景下保障音讯生产和本地事务的最终一致性

事务音讯产生在 Producer 和 Broker 之间。事务音讯通过 TransactionMQProducer 实现。通过 TransactionMQProducer 类中的 sendMessageInTransaction(final Message msg,final Object arg)办法发送半音讯后,监听类实现如下。

public class MyLocalTransactionListener implements TransactionListener {

  @Override
  public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
      // 执行本地事务
      //。。。// 返回事务状态
      return LocalTransactionState.UNKNOW;
  }
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt msg) {
      // 查看事务状态
      //。。。// 返回事务状态
      return LocalTransactionState.COMMIT_MESSAGE;
  }
}

executeLocalTransaction 办法来执行本地事务,返回事务状态。checkLocalTransaction 用来查看本地事务状态,并回应音讯队列的查看申请。

音讯过滤

RocketMQ 反对两种音讯过滤模式:表达式(TAG、SQL92)与类过滤模式。

  • TAG 模式 是最罕用的音讯过滤模式,它基于音讯的标签(Tag)进行过滤。
  • SQL92 模式  是一种更灵便的音讯过滤模式,是通过 音讯的属性 运行相似 SQL 过滤表达式进行条件匹配,音讯发送时须要设置用户的属性 putUserProperty 办法设置属性。
  • 类过滤模式 是一种基于消费者类名的过滤形式,在生产端实现过滤逻辑。

音讯类型(MessageType)

音讯类型是 RocketMQ 中依照音讯传输个性的不同而定义的分类,用于类型治理和平安校验。RocketMQ 反对的音讯类型有一般音讯、程序音讯、事务音讯和定时音讯。

Apache RocketMQ 从 5.0 版本开始,反对强制校验音讯类型,即每个主题 Topic 只容许发送一种音讯类型的音讯,这样能够更好的运维和治理生产零碎,防止凌乱。但同时保障向下兼容 4.x 版本行为,强制校验性能默认敞开,举荐通过服务端参数 enableTopicMessageTypeCheck 手动开启校验。

音讯位点(MessageQueueOffset)

音讯是按达到 RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条音讯在队列中都有一个惟一的 Long 类型坐标,这个坐标被定义为音讯位点。每个音讯消费者能够通过音讯位点来确定本人生产的起始地位。也就是音讯在音讯队列中的偏移量。

生产位点(ConsumerOffset)

一条音讯被某个消费者生产实现后不会立刻从队列中删除,RocketMQ 会基于每个消费者分组记录生产过的最新一条音讯的位点,即生产位点

private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

消费者默认是从最初一个位点开始生产的,留神这个配置只在消费者第一次启动时和重均衡时失效。

  • 对于集群生产。生产位点是由 RocketMQ 服务器保护和治理的。在集群生产模式下,同一个生产组内的消费者独特生产同一个音讯队列,每个消费者会独立地保护本人的生产位点。RocketMQ 服务器会将生产位点信息保留在 Broker 端或者 NameServer 端中(不同版本或不同配置)。这样,在消费者重启、重均衡等状况下,服务器可能正确地复原消费者的生产进度。
  • 对于播送生产。生产位点是由各个消费者自行治理并保留在本地存储中。在播送生产模式下,每个消费者都独立地生产所有音讯,因而每个消费者都须要保护本人的生产位点。消费者能够抉择将生产位点保留在文件系统、数据库或其余内部存储中,并在消费者启动时从本地存储中加载上次生产的位点,并在生产过程中定期将生产位点刷新到本地存储。

注意事项

  • 配置管理:RocketMQ 各个组件都有很多配置,应用前务必认真理解并正确配置。
  • 队列数量:队列数量多少关系到生产的能力下限。一个队列只能被一个消费者生产(如果一个 Topic 有 8 个队列,启动了 10 个消费者,那么是有两个消费者闲暇的)。
  • 音讯牢靠:RocketMQ 默认状况下是异步刷盘的,依据业务场景能够强制立刻刷盘。
  • 消费者组:确保不同的消费者组应用不同的消费者名称。同一个消费者组只配置一种生产形式(碰到过一个消费者组有几个集群消费者有几个播送消费者的事件,居然也启动起来了。。。)。
  • 生产程序:要留神是否须要音讯的程序生产,RocketMQ 能够保障单个 Queue 中音讯的程序生产,如果须要保障音讯的程序,能够把相干音讯发送到同一个 Queue 中。
退出移动版