关于rocketmq:RocketMQ使用指南

7次阅读

共计 15183 个字符,预计需要花费 38 分钟才能阅读完成。

RocketMQ 的介绍

RocketMQ 作为目前支流的消息中间件之一, 而音讯队列次要利用于以下场景:

  • 异步(不须要同步期待)
  • 解耦(利用之间不相互依赖)
  • 削峰(防止流量激增导致系统性能问题)

RocketMQ 具备以下个性:

  • 音讯的订阅与公布(音讯队列的基本功能)
  • 程序音讯(生产的程序与发送的程序统一,包含全局有序和分区有序,全局有序的 topic 只有一个音讯队列,实用场景:性能要求不高,所有音讯须要严格有序。分区有序的所有音讯依据 sharding key 进行分区。同一个分区内的音讯依照 FIFO 程序进行公布和生产。Sharding key 是音讯中用来辨别不同分区的关键字段,和一般音讯的 Key 是齐全不同的概念。实用场景:性能要求高,对于某一类音讯须要有序,同时有一个 sharding key 作为分区字段。)
  • 定时音讯(音讯发送到 broker 后,不会立刻被生产,期待特定工夫投递给真正的 topic。)
  • 事务音讯(利用本地事务和发送音讯操作能够被定义到全局事务中,要么同时胜利,要么同时失败。通过事务音讯能达到分布式事务的最终统一。)
  • 音讯重试(消费者生产失败时,RocketMQ 提供重试机制使得音讯能够被再次生产。)
  • 流量管制(当 Broker 解决音讯的能力达到瓶颈时,通过回绝生产者的发送申请进行流量管制,当消费者的音讯能力达到瓶颈时,通过升高音讯的拉取频率进行流量管制)
  • 死信队列(当一条音讯达到最大重试次数后,若生产仍然失败,则表明消费者在失常状况下无奈正确地生产该音讯,此时,音讯队列不会立即将音讯抛弃,而是将其发送到该消费者对应的非凡队列中。)

RocketMQ 的概念

  • 音讯(MESSAGE)
    零碎之间相互传递的音讯,每个音讯都属于某一个主题,每个音讯应用惟一的 Message ID 进行标识,同时音讯能够带有标签(TAG)和 键(KEY)。
  • 标签 (TAG)
    每条音讯能够携带标签,用于同一主题下辨别不同类型的音讯。
  • 键(KEY)
    除了标签,RocketMQ 的音讯还能够带上 KEY,能够有多个 KEY。
  • 主题 (TOPIC)
    音讯对应的主题,用于示意音讯的类别,一个主题能够对应多条音讯,生产者生产音讯时须要制订主题,消费者生产音讯时也须要制订主题
  • 生产者(PRODUCER)
    发送音讯的利用称为生产者。
  • 生产者组 (PRODUCER GROUP)
    同一类的生产者的汇合,每个生存者组蕴含多个生产者。
  • 消费者 (CONSUMER)
    生产音讯的利用称为消费者。
  • 消费者组 (CONSUMER GROUP)
    同一类的消费者的汇合,每个消费者组蕴含多个消费者。
  • 代理服务器 (BROKER)
    音讯直达角色,负责存储音讯、转发音讯。接管从生产者发送来的音讯并存储、同时为消费者的拉取申请作筹备。也存储音讯相干的元数据,包含消费者组、生产进度偏移和主题和队列音讯等。
  • 名字服务器 (NAME SERVER)
    相似于注册核心,消费者和生产者的注册和发现都依赖于名字服务器,同时会保留 broker 的信息,生产者或消费者可能通过名字服务查找各主题相应的 Broker IP 列表。多个 Namesrv 实例组成集群,但互相独立,没有信息替换。
  • 消费者获取音讯的形式:

    • 拉取式生产
      利用通常被动调用 Consumer 的拉音讯办法从 Broker 服务器拉音讯、主动权由利用管制。一旦获取了批量音讯,利用就会启动生产过程。
    • 推动式生产
      该模式下 Broker 收到数据后会被动推送给生产端,该生产模式个别实时性较高。
  • 生产模式:

    • 集群生产
      集群生产模式下, 雷同 Consumer Group 的每个 Consumer 实例均匀摊派音讯。
    • 播送生产
      播送生产模式下,雷同 Consumer Group 的每个 Consumer 实例都接管全量的音讯。

RocketMQ 的架构设计

整体架构

整个 RocketMQ 的架构如下:

能够整个架构由四局部组成,别离是 Producer, Conusmer, Name Server, Broker。
整个 RocketMQ 的工作流程如下:

  1. 启动 NameServer,NameServer 起来后监听端口,期待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。
  2. Broker 启动,跟所有的 NameServer 放弃长连贯,定时发送心跳包。心跳包中蕴含以后 Broker 信息 (IP+ 端口等) 以及存储所有 Topic 信息。注册胜利后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
  3. 收发音讯前,先创立 Topic,创立 Topic 时须要指定该 Topic 要存储在哪些 Broker 上,也能够在发送音讯时主动创立 Topic。
  4. Producer 发送音讯,启动时先跟 NameServer 集群中的其中一台建设长连贯,并从 NameServer 中获取以后发送的 Topic 存在哪些 Broker 上,轮询从队列列表中抉择一个队列,而后与队列所在的 Broker 建设长连贯从而向 Broker 发消息。
  5. Consumer 跟 Producer 相似,跟其中一台 NameServer 建设长连贯,获取以后订阅 Topic 存在哪些 Broker 上,而后间接跟 Broker 建设连贯通道,开始生产音讯。

音讯存储

  • CommitLog:音讯主体以及元数据的存储主体,存储 Producer 端写入的音讯主体内容, 音讯内容不是定长的。单个文件大小默认 1G, 文件名长度为 20 位,右边补零,残余为起始偏移量,比方 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。音讯次要是程序写入日志文件,当文件满了,写入下一个文件;
  • ConsumeQueue:音讯生产队列,引入的目标次要是进步音讯生产的性能,因为 RocketMQ 是基于主题 topic 的订阅模式,音讯生产是针对主题进行的,如果要遍历 commitlog 文件中依据 topic 检索音讯是十分低效的。Consumer 即可依据 ConsumeQueue 来查找待生产的音讯。其中,ConsumeQueue(逻辑生产队列)作为生产音讯的索引,保留了指定 Topic 下的队列音讯在 CommitLog 中的起始物理偏移量 offset,音讯大小 size 和音讯 Tag 的 HashCode 值。consumequeue 文件能够看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织形式如下:topic/queue/file 三层组织构造,具体存储门路为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共 20 个字节,别离为 8 字节的 commitlog 物理偏移量、4 字节的音讯长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,能够像数组一样随机拜访每一个条目,每个 ConsumeQueue 文件大小约 5.72M;
  • IndexFile:IndexFile(索引文件)提供了一种能够通过 key 或工夫区间来查问音讯的办法。Index 文件的存储地位是:$HOME \store\index${fileName},文件名 fileName 是以创立时的工夫戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 能够保留 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 构造,故 rocketmq 的索引文件其底层实现为 hash 索引。

RocketMQ 采纳的是混合型的存储构造,Broker 单个实例下所有的队列共用一个日志数据文件(即为 CommitLog)来存储。RocketMQ 的混合型存储构造 (多个 Topic 的音讯实体内容都存储于一个 CommitLog 中) 针对 Producer 和 Consumer 别离采纳了数据和索引局部相拆散的存储构造,Producer 发送音讯至 Broker 端,而后 Broker 端应用同步或者异步的形式对音讯刷盘长久化,保留至 CommitLog 中。RocketMQ 应用 Broker 端的后盾服务线程—ReputMessageService 不停地散发申请并异步构建 ConsumeQueue(逻辑生产队列)和 IndexFile(索引文件)数据。

音讯过滤

后面有提到 Consumer 端订阅音讯是通过 ConsumeQueue 拿到音讯的索引,而后再从 CommitLog 外面读取真正的音讯实体内容,ConsumeQueue 的存储构造如下,能够看到其中有 8 个字节存储的 Message Tag 的哈希值,基于 Tag 的音讯过滤正是基于这个字段值的。

RocketMQ 反对两种音讯过滤形式:

  • Tag 过滤形式:Consumer 端在订阅音讯时除了指定 Topic 还能够指定 TAG,如果一个音讯有多个 TAG,能够用 || 分隔。其中,Consumer 端会将这个订阅申请构建成一个 SubscriptionData,发送一个 Pull 音讯的申请给 Broker 端。Broker 端从 RocketMQ 的文件存储层—Store 读取数据之前,会用这些数据先构建一个 MessageFilter,而后传给 Store。Store 从 ConsumeQueue 读取到一条记录后,会用它记录的音讯 tag hash 值去做过滤,因为在服务端只是依据 hashcode 进行判断,无奈准确对 tag 原始字符串进行过滤,故在音讯生产端拉取到音讯后,还须要对音讯的原始 tag 字符串进行比对,如果不同,则抛弃该音讯,不进行音讯生产(如果存在 tag 的 hashcode 统一,则可能导致失落音讯)。
  • SQL92 的过滤形式:这种形式的大抵做法和下面的 Tag 过滤形式一样,只是在 Store 层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由 rocketmq-filter 模块负责的。每次过滤都去执行 SQL 表达式会影响效率,所以 RocketMQ 应用了 BloomFilter 防止了每次都去执行。SQL92 的表达式上下文为音讯的属性。

音讯查问

RocketMQ 反对两种形式查问:

  • 依照 Message Id 查问音讯。
    RocketMQ 中的 MessageId 的长度总共有 16 字节,其中蕴含了音讯存储主机地址(IP 地址和端口),音讯 Commit Log offset。“依照 MessageId 查问音讯”在 RocketMQ 中具体做法是:Client 端从 MessageId 中解析出 Broker 的地址(IP 地址和端口)和 Commit Log 的偏移地址后封装成一个 RPC 申请后通过 Remoting 通信层发送(业务申请码:VIEW_MESSAGE_BY_ID)。Broker 端走的是 QueryMessageProcessor,读取音讯的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个残缺的音讯返回。
  • 依照 Message Key 查问音讯。
    “依照 Message Key 查问音讯”,次要是基于 RocketMQ 的 IndexFile 索引文件来实现的。RocketMQ 的索引文件逻辑构造,相似 JDK 中 HashMap 的实现。索引文件的具体构造如下:

    IndexFile 索引文件为用户提供通过“依照 Message Key 查问音讯”的音讯索引查问服务。如果音讯的 properties 中设置了 UNIQ_KEY 这个属性,就用 topic +“#”+ UNIQ_KEY 的 value 作为 key 来做写入操作。如果音讯设置了 KEYS 属性(多个 KEY 以空格分隔),也会用 topic +“#”+ KEY 来做索引。其中的索引数据蕴含了 Key Hash/CommitLog Offset/Timestamp/Next Index offset 这四个字段,一共 20 Byte。SlotTable 中保留每个 Slot 对应的链表的头指针,NextIndexOffset 保留的是下一个节点的地址。
    依照 Message Key 查问音讯”的形式,RocketMQ 的具体做法是,次要通过 Broker 端的 QueryMessageProcessor 业务处理器来查问,读取音讯的过程就是用 topic 和 key 找到 IndexFile 索引文件中的一条记录,依据其中的 commitLog offset 从 CommitLog 文件中读取音讯的实体内容。

事务音讯

RocketMQ 采纳了 2PC 的思维来实现了提交事务音讯,同时减少一个弥补逻辑来解决二阶段超时或者失败的音讯,如下图所示。

上图阐明了事务音讯的大抵计划,其中分为两个流程:失常事务音讯的发送及提交、事务音讯的弥补流程。

  1. 事务音讯发送及提交:
    (1) 发送音讯(half 音讯)。
    (2) 服务端响应音讯写入后果。
    (3) 依据发送后果执行本地事务(如果写入失败,此时 half 音讯对业务不可见,本地逻辑不执行)。
    (4) 依据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成音讯索引,音讯对消费者可见)
  2. 弥补流程:
    (1) 对没有 Commit/Rollback 的事务音讯(pending 状态的音讯),从服务端发动一次“回查”
    (2) Producer 收到回查音讯,查看回查音讯对应的本地事务的状态
    (3) 依据本地事务状态,从新 Commit 或者 Rollback
    其中,弥补阶段用于解决音讯 Commit 或者 Rollback 产生超时或者失败的状况。

事务音讯的实现

  1. 第一阶段的音讯写入
    写入的如果事务音讯,将音讯的 Topic 替换为 RMQ_SYS_TRANS_HALF_TOPIC,同时将原来的 Topic 和 Queue 信息存储到音讯的属性中,正因为音讯主题被替换,故音讯并不会转发到该原主题的音讯生产队列,消费者无奈感知音讯的存在,不会生产。同时 RocketMQ 会开启一个定时工作,从 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC 中拉取音讯进行生产,依据生产者组获取一个服务提供者发送回查事务状态申请,依据事务状态来决定是提交或回滚音讯。须要留神的是此时半音讯是 Pending 状态的。
  2. 第二阶段的音讯写入
    因为音讯是程序写入 ConmmitLog 的,无奈做到删除一个音讯,为了标记半音讯的状态,引入了 Op 音讯来标记是 Commit 状态还是 Rollback 状态。Op 音讯外面保留了音讯的一些根本信息,根本信息里最次要的就是音讯的 Offset。通过 Offset 能够索引到具体的音讯。
    RocketMQ 将 Op 音讯写入到全局一个特定的 Topic 中通过源码中的办法—TransactionalMessageUtil.buildOpTopic();这个 Topic 是一个外部的 Topic(像 Half 音讯的 Topic 一样),不会被用户生产。Op 音讯的内容为对应的 Half 音讯的存储的 Offset,这样通过 Op 音讯能索引到 Half 音讯进行后续的回查操作。

    第二阶段如果是 Rollback 操作,则通过发送一条 Op 音讯标记该音讯的状态为 Rollback,同时表明该音讯曾经解决过了。如果第二阶段为 Commit 的操作,则须要发送一条 Op 音讯标记该音讯的状态为 Commit, 同时须要将半音讯读出,并复原出一条残缺的一般音讯进行发送。
  3. 回查音讯
    如果在 RocketMQ 事务音讯的二阶段过程中失败了,例如在做 Commit 操作时,呈现网络问题导致 Commit 失败,那么须要通过肯定的策略使这条音讯最终被 Commit。RocketMQ 采纳了一种弥补机制,称为“回查”。Broker 端对未确定状态的音讯发动回查,将音讯发送到对应的 Producer 端(同一个 Group 的 Producer),由 Producer 依据音讯来查看本地事务的状态,进而执行 Commit 或者 Rollback。Broker 端通过比照 Half 音讯和 Op 音讯进行事务音讯的回查并且推动 CheckPoint(记录那些事务音讯的状态是确定的)。
    值得注意的是,rocketmq 并不会无休止的的信息事务状态回查,默认回查 15 次,如果 15 次回查还是无奈得悉事务状态,rocketmq 默认回滚该音讯。

负载平衡

RocketMQ 的负载平衡都是在 Client 端实现的,分为生产者的负载平衡和消费者的负载平衡。

  • 生产者的负载平衡
    Producer 端在发送音讯的时候,会先依据 Topic 找到指定的 TopicPublishInfo,在获取了 TopicPublishInfo 路由信息后,RocketMQ 的客户端在默认形式下 selectOneMessageQueue() 办法会从 TopicPublishInfo 中的 messageQueueList 中抉择一个队列(MessageQueue)进行发送音讯。具体的容错策略均在 MQFaultStrategy 这个类中定义。
  • 消费者的负载平衡
    在 RocketMQ 中,Consumer 端的两种生产模式(Push/Pull)都是基于拉模式来获取音讯的,而在 Push 模式只是对 pull 模式的一种封装,其本质实现为音讯拉取线程在从服务器拉取到一批音讯后,而后提交到音讯生产线程池后,又“快马加鞭”的持续向服务器再次尝试拉取音讯。如果未拉取到音讯,则提早一下又持续拉取。在两种基于拉模式的生产形式(Push/Pull)中,均须要 Consumer 端在晓得从 Broker 端的哪一个音讯队列—队列中去获取音讯。
    集群模式下的具体的步骤如下:

    1. Conumser 向 broker 发送心跳包(蕴含了音讯生产分组名称、订阅关系汇合、音讯通信模式和客户端 id 的值等信息),Broker 端在收到 Consumer 的心跳音讯后,会将它保护在 ConsumerManager 的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保留在本地缓存变量—channelInfoTable 中。
    2. 启动负载平衡服务线程 RebalanceService,首先从 rebalanceImpl 实例的本地缓存变量—topicSubscribeInfoTable 中,获取该 Topic 主题下的音讯生产队列汇合(mqSet),而后依据 topic 和 consumerGroup 为参数调用 mQClientFactory.findConsumerIdList()办法向 Broker 端发送获取该生产组下消费者 Id 列表的 RPC 通信申请。对 Topic 下的音讯生产队列、消费者 Id 排序,而后用音讯队列调配策略算法(默认为:音讯队列的平均分配算法),计算出待拉取的音讯队列,最初依据调配队列的后果更新 ProccessQueueTable,尝试删除不在待拉去的音讯队列中的队列,或者在调配中的队列中然而曾经过期的队列。为过滤后的音讯队列汇合(mqSet)中的每个 MessageQueue 创立一个 ProcessQueue 对象并存入 RebalanceImpl 的 processQueueTable 队列中,依据 processQueueTable 构建长轮询对象 PullRequest 对象,会从 broker 获取生产的进度。

RocketMQ 的最佳实际

程序音讯

FIFOProducer.java

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class FIFOProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("FIFOProducerGroup");

        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.start();

        String[] tags = new String[]{"TagA", "TagC", "TagD"};

        // 订单列表
        List<OrderStep> orderList = new FIFOProducer().buildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        for (int i = 0; i < 10; i++) {
            // 加个工夫前缀
            String body = dateStr + "Hello RocketMQ" + orderList.get(i);
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg;  // 依据订单 id 抉择发送 queue
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId());// 订单 id

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }

        producer.shutdown();}

    /**
     * 订单的步骤
     */
    private static class OrderStep {
        private long orderId;
        private String desc;

        public long getOrderId() {return orderId;}

        public void setOrderId(long orderId) {this.orderId = orderId;}

        public String getDesc() {return desc;}

        public void setDesc(String desc) {this.desc = desc;}

        @Override
        public String toString() {
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    /**
     * 生成模仿订单数据
     */
    private List<OrderStep> buildOrders() {List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创立");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创立");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创立");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("实现");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("实现");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("实现");
        orderList.add(orderDemo);

        return orderList;
    }
}

FIFOConsumer.java

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class FIFOConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FIFOConsumerGroup");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /**
         * 设置 Consumer 第一次启动是从队列头部开始生产还是队列尾部开始生产 <br>
         * 如果非第一次启动,那么依照上次生产的地位持续生产
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);
                for (MessageExt msg : msgs) {// 能够看到每个 queue 有惟一的 consume 线程来生产, 订单对每个 queue(分区)有序
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }

                try {
                    // 模仿业务逻辑解决中...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

事务音讯

TransactionProducer.java

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;


public class TransactionProducer {public static void main(String[] args) throws MQClientException, InterruptedException {TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");
        producer.setNamesrvAddr("127.0.0.1:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                                ("Hello Luo" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();
            }
        }
        for (int i = 0; i < 100000; i++) {Thread.sleep(1000);
        }
        producer.shutdown();}
}

TransactionListenerImpl.java

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

RocketMQ 的应用倡议

  • producer 应用倡议

    1. 请管制 producer 实例数,过多的连贯会造成 server 端的资源节约;
    2. 请给 producer 设置优雅敞开(producer.shutdown()),服务敞开能够立刻开释资源
    3. 接入多个 rocketmq 集群,为避免 nameserver 笼罩问题,须要设置 InstanceName
  • consumer 应用倡议

    1. 尽量应用 push consumer
    2. 接入多个 rocketmq 集群,为避免 nameserver 笼罩问题,须要设置 InstanceName
    3. 当应用 TAG 的时候,防止同一个 consumerGroup 的 Consumer 应用的 TAG 的类型不同。

参考

RocketMQ 官网文档

正文完
 0