共计 15183 个字符,预计需要花费 38 分钟才能阅读完成。
RocketMQ 的介绍
RocketMQ 作为目前支流的消息中间件之一, 而音讯队列次要利用于以下场景:
- 异步(不须要同步期待)
- 解耦(利用之间不相互依赖)
- 削峰(防止流量激增导致系统性能问题)
RocketMQ 具备以下个性:
- 音讯的订阅与公布(音讯队列的基本功能)
- 程序音讯(生产的程序与发送的程序统一,包含全局有序和分区有序,全局有序的 topic 只有一个音讯队列,实用场景:性能要求不高,所有音讯须要严格有序。分区有序的所有音讯依据 sharding key 进行分区。同一个分区内的音讯依照 FIFO 程序进行公布和生产。Sharding key 是音讯中用来辨别不同分区的关键字段,和一般音讯的 Key 是齐全不同的概念。实用场景:性能要求高,对于某一类音讯须要有序,同时有一个 sharding key 作为分区字段。)
- 定时音讯(音讯发送到 broker 后,不会立刻被生产,期待特定工夫投递给真正的 topic。)
- 事务音讯(利用本地事务和发送音讯操作能够被定义到全局事务中,要么同时胜利,要么同时失败。通过事务音讯能达到分布式事务的最终统一。)
- 音讯重试(消费者生产失败时,RocketMQ 提供重试机制使得音讯能够被再次生产。)
- 流量管制(当 Broker 解决音讯的能力达到瓶颈时,通过回绝生产者的发送申请进行流量管制,当消费者的音讯能力达到瓶颈时,通过升高音讯的拉取频率进行流量管制)
- 死信队列(当一条音讯达到最大重试次数后,若生产仍然失败,则表明消费者在失常状况下无奈正确地生产该音讯,此时,音讯队列不会立即将音讯抛弃,而是将其发送到该消费者对应的非凡队列中。)
RocketMQ 的概念
- 音讯(MESSAGE)
零碎之间相互传递的音讯,每个音讯都属于某一个主题,每个音讯应用惟一的 Message ID 进行标识,同时音讯能够带有标签(TAG)和 键(KEY)。 - 标签 (TAG)
每条音讯能够携带标签,用于同一主题下辨别不同类型的音讯。 - 键(KEY)
除了标签,RocketMQ 的音讯还能够带上 KEY,能够有多个 KEY。 - 主题 (TOPIC)
音讯对应的主题,用于示意音讯的类别,一个主题能够对应多条音讯,生产者生产音讯时须要制订主题,消费者生产音讯时也须要制订主题 - 生产者(PRODUCER)
发送音讯的利用称为生产者。 - 生产者组 (PRODUCER GROUP)
同一类的生产者的汇合,每个生存者组蕴含多个生产者。 - 消费者 (CONSUMER)
生产音讯的利用称为消费者。 - 消费者组 (CONSUMER GROUP)
同一类的消费者的汇合,每个消费者组蕴含多个消费者。 - 代理服务器 (BROKER)
音讯直达角色,负责存储音讯、转发音讯。接管从生产者发送来的音讯并存储、同时为消费者的拉取申请作筹备。也存储音讯相干的元数据,包含消费者组、生产进度偏移和主题和队列音讯等。 - 名字服务器 (NAME SERVER)
相似于注册核心,消费者和生产者的注册和发现都依赖于名字服务器,同时会保留 broker 的信息,生产者或消费者可能通过名字服务查找各主题相应的 Broker IP 列表。多个 Namesrv 实例组成集群,但互相独立,没有信息替换。 -
消费者获取音讯的形式:
- 拉取式生产
利用通常被动调用 Consumer 的拉音讯办法从 Broker 服务器拉音讯、主动权由利用管制。一旦获取了批量音讯,利用就会启动生产过程。 - 推动式生产
该模式下 Broker 收到数据后会被动推送给生产端,该生产模式个别实时性较高。
- 拉取式生产
-
生产模式:
- 集群生产
集群生产模式下, 雷同 Consumer Group 的每个 Consumer 实例均匀摊派音讯。 - 播送生产
播送生产模式下,雷同 Consumer Group 的每个 Consumer 实例都接管全量的音讯。
- 集群生产
RocketMQ 的架构设计
整体架构
整个 RocketMQ 的架构如下:
能够整个架构由四局部组成,别离是 Producer, Conusmer, Name Server, Broker。
整个 RocketMQ 的工作流程如下:
- 启动 NameServer,NameServer 起来后监听端口,期待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。
- Broker 启动,跟所有的 NameServer 放弃长连贯,定时发送心跳包。心跳包中蕴含以后 Broker 信息 (IP+ 端口等) 以及存储所有 Topic 信息。注册胜利后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
- 收发音讯前,先创立 Topic,创立 Topic 时须要指定该 Topic 要存储在哪些 Broker 上,也能够在发送音讯时主动创立 Topic。
- Producer 发送音讯,启动时先跟 NameServer 集群中的其中一台建设长连贯,并从 NameServer 中获取以后发送的 Topic 存在哪些 Broker 上,轮询从队列列表中抉择一个队列,而后与队列所在的 Broker 建设长连贯从而向 Broker 发消息。
- Consumer 跟 Producer 相似,跟其中一台 NameServer 建设长连贯,获取以后订阅 Topic 存在哪些 Broker 上,而后间接跟 Broker 建设连贯通道,开始生产音讯。
音讯存储
- CommitLog:音讯主体以及元数据的存储主体,存储 Producer 端写入的音讯主体内容, 音讯内容不是定长的。单个文件大小默认 1G, 文件名长度为 20 位,右边补零,残余为起始偏移量,比方 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。音讯次要是程序写入日志文件,当文件满了,写入下一个文件;
- ConsumeQueue:音讯生产队列,引入的目标次要是进步音讯生产的性能,因为 RocketMQ 是基于主题 topic 的订阅模式,音讯生产是针对主题进行的,如果要遍历 commitlog 文件中依据 topic 检索音讯是十分低效的。Consumer 即可依据 ConsumeQueue 来查找待生产的音讯。其中,ConsumeQueue(逻辑生产队列)作为生产音讯的索引,保留了指定 Topic 下的队列音讯在 CommitLog 中的起始物理偏移量 offset,音讯大小 size 和音讯 Tag 的 HashCode 值。consumequeue 文件能够看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织形式如下:topic/queue/file 三层组织构造,具体存储门路为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共 20 个字节,别离为 8 字节的 commitlog 物理偏移量、4 字节的音讯长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,能够像数组一样随机拜访每一个条目,每个 ConsumeQueue 文件大小约 5.72M;
- IndexFile:IndexFile(索引文件)提供了一种能够通过 key 或工夫区间来查问音讯的办法。Index 文件的存储地位是:$HOME \store\index${fileName},文件名 fileName 是以创立时的工夫戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 能够保留 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 构造,故 rocketmq 的索引文件其底层实现为 hash 索引。
RocketMQ 采纳的是混合型的存储构造,Broker 单个实例下所有的队列共用一个日志数据文件(即为 CommitLog)来存储。RocketMQ 的混合型存储构造 (多个 Topic 的音讯实体内容都存储于一个 CommitLog 中) 针对 Producer 和 Consumer 别离采纳了数据和索引局部相拆散的存储构造,Producer 发送音讯至 Broker 端,而后 Broker 端应用同步或者异步的形式对音讯刷盘长久化,保留至 CommitLog 中。RocketMQ 应用 Broker 端的后盾服务线程—ReputMessageService 不停地散发申请并异步构建 ConsumeQueue(逻辑生产队列)和 IndexFile(索引文件)数据。
音讯过滤
后面有提到 Consumer 端订阅音讯是通过 ConsumeQueue 拿到音讯的索引,而后再从 CommitLog 外面读取真正的音讯实体内容,ConsumeQueue 的存储构造如下,能够看到其中有 8 个字节存储的 Message Tag 的哈希值,基于 Tag 的音讯过滤正是基于这个字段值的。
RocketMQ 反对两种音讯过滤形式:
- Tag 过滤形式:Consumer 端在订阅音讯时除了指定 Topic 还能够指定 TAG,如果一个音讯有多个 TAG,能够用 || 分隔。其中,Consumer 端会将这个订阅申请构建成一个 SubscriptionData,发送一个 Pull 音讯的申请给 Broker 端。Broker 端从 RocketMQ 的文件存储层—Store 读取数据之前,会用这些数据先构建一个 MessageFilter,而后传给 Store。Store 从 ConsumeQueue 读取到一条记录后,会用它记录的音讯 tag hash 值去做过滤,因为在服务端只是依据 hashcode 进行判断,无奈准确对 tag 原始字符串进行过滤,故在音讯生产端拉取到音讯后,还须要对音讯的原始 tag 字符串进行比对,如果不同,则抛弃该音讯,不进行音讯生产(如果存在 tag 的 hashcode 统一,则可能导致失落音讯)。
- SQL92 的过滤形式:这种形式的大抵做法和下面的 Tag 过滤形式一样,只是在 Store 层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由 rocketmq-filter 模块负责的。每次过滤都去执行 SQL 表达式会影响效率,所以 RocketMQ 应用了 BloomFilter 防止了每次都去执行。SQL92 的表达式上下文为音讯的属性。
音讯查问
RocketMQ 反对两种形式查问:
- 依照 Message Id 查问音讯。
RocketMQ 中的 MessageId 的长度总共有 16 字节,其中蕴含了音讯存储主机地址(IP 地址和端口),音讯 Commit Log offset。“依照 MessageId 查问音讯”在 RocketMQ 中具体做法是:Client 端从 MessageId 中解析出 Broker 的地址(IP 地址和端口)和 Commit Log 的偏移地址后封装成一个 RPC 申请后通过 Remoting 通信层发送(业务申请码:VIEW_MESSAGE_BY_ID)。Broker 端走的是 QueryMessageProcessor,读取音讯的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个残缺的音讯返回。 - 依照 Message Key 查问音讯。
“依照 Message Key 查问音讯”,次要是基于 RocketMQ 的 IndexFile 索引文件来实现的。RocketMQ 的索引文件逻辑构造,相似 JDK 中 HashMap 的实现。索引文件的具体构造如下:
IndexFile 索引文件为用户提供通过“依照 Message Key 查问音讯”的音讯索引查问服务。如果音讯的 properties 中设置了 UNIQ_KEY 这个属性,就用 topic +“#”+ UNIQ_KEY 的 value 作为 key 来做写入操作。如果音讯设置了 KEYS 属性(多个 KEY 以空格分隔),也会用 topic +“#”+ KEY 来做索引。其中的索引数据蕴含了 Key Hash/CommitLog Offset/Timestamp/Next Index offset 这四个字段,一共 20 Byte。SlotTable 中保留每个 Slot 对应的链表的头指针,NextIndexOffset 保留的是下一个节点的地址。
依照 Message Key 查问音讯”的形式,RocketMQ 的具体做法是,次要通过 Broker 端的 QueryMessageProcessor 业务处理器来查问,读取音讯的过程就是用 topic 和 key 找到 IndexFile 索引文件中的一条记录,依据其中的 commitLog offset 从 CommitLog 文件中读取音讯的实体内容。
事务音讯
RocketMQ 采纳了 2PC 的思维来实现了提交事务音讯,同时减少一个弥补逻辑来解决二阶段超时或者失败的音讯,如下图所示。
上图阐明了事务音讯的大抵计划,其中分为两个流程:失常事务音讯的发送及提交、事务音讯的弥补流程。
- 事务音讯发送及提交:
(1) 发送音讯(half 音讯)。
(2) 服务端响应音讯写入后果。
(3) 依据发送后果执行本地事务(如果写入失败,此时 half 音讯对业务不可见,本地逻辑不执行)。
(4) 依据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成音讯索引,音讯对消费者可见) - 弥补流程:
(1) 对没有 Commit/Rollback 的事务音讯(pending 状态的音讯),从服务端发动一次“回查”
(2) Producer 收到回查音讯,查看回查音讯对应的本地事务的状态
(3) 依据本地事务状态,从新 Commit 或者 Rollback
其中,弥补阶段用于解决音讯 Commit 或者 Rollback 产生超时或者失败的状况。
事务音讯的实现
- 第一阶段的音讯写入
写入的如果事务音讯,将音讯的 Topic 替换为 RMQ_SYS_TRANS_HALF_TOPIC,同时将原来的 Topic 和 Queue 信息存储到音讯的属性中,正因为音讯主题被替换,故音讯并不会转发到该原主题的音讯生产队列,消费者无奈感知音讯的存在,不会生产。同时 RocketMQ 会开启一个定时工作,从 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC 中拉取音讯进行生产,依据生产者组获取一个服务提供者发送回查事务状态申请,依据事务状态来决定是提交或回滚音讯。须要留神的是此时半音讯是 Pending 状态的。 - 第二阶段的音讯写入
因为音讯是程序写入 ConmmitLog 的,无奈做到删除一个音讯,为了标记半音讯的状态,引入了 Op 音讯来标记是 Commit 状态还是 Rollback 状态。Op 音讯外面保留了音讯的一些根本信息,根本信息里最次要的就是音讯的 Offset。通过 Offset 能够索引到具体的音讯。
RocketMQ 将 Op 音讯写入到全局一个特定的 Topic 中通过源码中的办法—TransactionalMessageUtil.buildOpTopic();这个 Topic 是一个外部的 Topic(像 Half 音讯的 Topic 一样),不会被用户生产。Op 音讯的内容为对应的 Half 音讯的存储的 Offset,这样通过 Op 音讯能索引到 Half 音讯进行后续的回查操作。
第二阶段如果是 Rollback 操作,则通过发送一条 Op 音讯标记该音讯的状态为 Rollback,同时表明该音讯曾经解决过了。如果第二阶段为 Commit 的操作,则须要发送一条 Op 音讯标记该音讯的状态为 Commit, 同时须要将半音讯读出,并复原出一条残缺的一般音讯进行发送。 - 回查音讯
如果在 RocketMQ 事务音讯的二阶段过程中失败了,例如在做 Commit 操作时,呈现网络问题导致 Commit 失败,那么须要通过肯定的策略使这条音讯最终被 Commit。RocketMQ 采纳了一种弥补机制,称为“回查”。Broker 端对未确定状态的音讯发动回查,将音讯发送到对应的 Producer 端(同一个 Group 的 Producer),由 Producer 依据音讯来查看本地事务的状态,进而执行 Commit 或者 Rollback。Broker 端通过比照 Half 音讯和 Op 音讯进行事务音讯的回查并且推动 CheckPoint(记录那些事务音讯的状态是确定的)。
值得注意的是,rocketmq 并不会无休止的的信息事务状态回查,默认回查 15 次,如果 15 次回查还是无奈得悉事务状态,rocketmq 默认回滚该音讯。
负载平衡
RocketMQ 的负载平衡都是在 Client 端实现的,分为生产者的负载平衡和消费者的负载平衡。
- 生产者的负载平衡
Producer 端在发送音讯的时候,会先依据 Topic 找到指定的 TopicPublishInfo,在获取了 TopicPublishInfo 路由信息后,RocketMQ 的客户端在默认形式下 selectOneMessageQueue() 办法会从 TopicPublishInfo 中的 messageQueueList 中抉择一个队列(MessageQueue)进行发送音讯。具体的容错策略均在 MQFaultStrategy 这个类中定义。 -
消费者的负载平衡
在 RocketMQ 中,Consumer 端的两种生产模式(Push/Pull)都是基于拉模式来获取音讯的,而在 Push 模式只是对 pull 模式的一种封装,其本质实现为音讯拉取线程在从服务器拉取到一批音讯后,而后提交到音讯生产线程池后,又“快马加鞭”的持续向服务器再次尝试拉取音讯。如果未拉取到音讯,则提早一下又持续拉取。在两种基于拉模式的生产形式(Push/Pull)中,均须要 Consumer 端在晓得从 Broker 端的哪一个音讯队列—队列中去获取音讯。
集群模式下的具体的步骤如下:- Conumser 向 broker 发送心跳包(蕴含了音讯生产分组名称、订阅关系汇合、音讯通信模式和客户端 id 的值等信息),Broker 端在收到 Consumer 的心跳音讯后,会将它保护在 ConsumerManager 的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保留在本地缓存变量—channelInfoTable 中。
- 启动负载平衡服务线程 RebalanceService,首先从 rebalanceImpl 实例的本地缓存变量—topicSubscribeInfoTable 中,获取该 Topic 主题下的音讯生产队列汇合(mqSet),而后依据 topic 和 consumerGroup 为参数调用 mQClientFactory.findConsumerIdList()办法向 Broker 端发送获取该生产组下消费者 Id 列表的 RPC 通信申请。对 Topic 下的音讯生产队列、消费者 Id 排序,而后用音讯队列调配策略算法(默认为:音讯队列的平均分配算法),计算出待拉取的音讯队列,最初依据调配队列的后果更新 ProccessQueueTable,尝试删除不在待拉去的音讯队列中的队列,或者在调配中的队列中然而曾经过期的队列。为过滤后的音讯队列汇合(mqSet)中的每个 MessageQueue 创立一个 ProcessQueue 对象并存入 RebalanceImpl 的 processQueueTable 队列中,依据 processQueueTable 构建长轮询对象 PullRequest 对象,会从 broker 获取生产的进度。
RocketMQ 的最佳实际
程序音讯
FIFOProducer.java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class FIFOProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("FIFOProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表
List<OrderStep> orderList = new FIFOProducer().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个工夫前缀
String body = dateStr + "Hello RocketMQ" + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg; // 依据订单 id 抉择发送 queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());// 订单 id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();}
/**
* 订单的步骤
*/
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {return orderId;}
public void setOrderId(long orderId) {this.orderId = orderId;}
public String getDesc() {return desc;}
public void setDesc(String desc) {this.desc = desc;}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
/**
* 生成模仿订单数据
*/
private List<OrderStep> buildOrders() {List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创立");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创立");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创立");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("实现");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("实现");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("实现");
orderList.add(orderDemo);
return orderList;
}
}
FIFOConsumer.java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class FIFOConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FIFOConsumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置 Consumer 第一次启动是从队列头部开始生产还是队列尾部开始生产 <br>
* 如果非第一次启动,那么依照上次生产的地位持续生产
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);
for (MessageExt msg : msgs) {// 能够看到每个 queue 有惟一的 consume 线程来生产, 订单对每个 queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
// 模仿业务逻辑解决中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
事务音讯
TransactionProducer.java
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;
public class TransactionProducer {public static void main(String[] args) throws MQClientException, InterruptedException {TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello Luo" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {Thread.sleep(1000);
}
producer.shutdown();}
}
TransactionListenerImpl.java
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
RocketMQ 的应用倡议
-
producer 应用倡议
- 请管制 producer 实例数,过多的连贯会造成 server 端的资源节约;
- 请给 producer 设置优雅敞开(producer.shutdown()),服务敞开能够立刻开释资源
- 接入多个 rocketmq 集群,为避免 nameserver 笼罩问题,须要设置 InstanceName
-
consumer 应用倡议
- 尽量应用 push consumer
- 接入多个 rocketmq 集群,为避免 nameserver 笼罩问题,须要设置 InstanceName
- 当应用 TAG 的时候,防止同一个 consumerGroup 的 Consumer 应用的 TAG 的类型不同。
参考
RocketMQ 官网文档