RocketMQ原生API收发音讯代码样例
pom文件
新建 maven 我的项目或 module,增加 rocketmq-client
依赖。
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.tedu</groupId> <artifactId>demo1</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-store</artifactId> <version>4.7.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build></project>
同步音讯
同步音讯发送要保障强一致性,发到master的音讯向slave复制后,才会向生产者发送反馈信息。
这种可靠性同步地发送形式应用的比拟宽泛,比方:重要的音讯告诉,短信告诉。
生产者
package demo1;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import java.util.Scanner;/*发送同步音讯 */public class Producer { public static void main(String[] args) throws Exception { /* group 雷同的生产者成为一个生产者组 标识发送同一类音讯的Producer,通常发送逻辑统一。 发送一般音讯的时候,仅标识应用,并无特地用途。 若发送事务音讯,发送某条音讯的producer-A宕机, 使得事务音讯始终处于PREPARED状态并超时, 则broker会回查同一个group的其余producer, 确认这条音讯应该commit还是rollback。 但开源版本并不齐全反对事务音讯(阉割了事务回查的代码)。????? */ DefaultMQProducer p = new DefaultMQProducer("producer-demo1"); /* 连贯nameserver集群, 取得注册的broker信息 */ p.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876"); p.start(); /* 主题相当于是音讯的分类, 一类音讯应用一个主题 */ String topic = "Topic1"; /* tag 相当于是音讯的二级分类, 在一个主题下, 能够通过 tag 再对音讯进行分类 */ String tag = "TagA"; while (true) { System.out.print("输出音讯,用逗号分隔多条音讯: "); String[] a = new Scanner(System.in).nextLine().split(","); for (String s : a) { Message msg = new Message(topic, tag, s.getBytes()); //一级分类, 二级分类, 音讯内容 SendResult r = p.send(msg);// 发送音讯后会失去服务器反馈, 蕴含: smsgId, sendStatus, queue, queueOffset, offsetMsgId System.out.println(r); } } }}
消费者
消费者的要点:
1. push 和 pull
消费者有两种模式:push 和 pull。
push 模式由服务器被动向消费者发送音讯;pull 模式由消费者被动向服务器申请音讯。
在消费者解决能力无限时,为了加重消费者的压力,能够采纳pull模式。少数状况下都采纳 pull 模式。
2. NameServer
消费者须要向 NameServer 询问 Topic 的路由信息。
3. Topic
从指定的Topic接管音讯。Topic相当于是一级分类。
4. Tag
Topic 相当于是一级分类,Tag 相当于是2级分类。
- 多个 Tag 能够这样写:
TagA || TagB || TagC
- 不指定 Tag,或者说接管所有的 Tag,能够写星号:
*
package demo1;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer { public static void main(String[] args) throws Exception { /* 标识一类Consumer的汇合名称, 这类Consumer通常生产一类音讯,且生产逻辑统一。 同一个Consumer Group下的各个实例将独特生产 topic的音讯,起到负载平衡的作用。 生产进度以Consumer Group为粒度治理,不同 Consumer Group之间生产进度彼此不受影响, 即音讯A被Consumer Group1生产过,也会再 给Consumer Group2生产。 注: RocketMQ要求同一个Consumer Group的 消费者必须要领有雷同的注册信息,即必须要听一样 的topic(并且tag也一样)。 */ DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo1"); c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876"); c.subscribe("Topic1", "TagA"); c.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) { for (MessageExt msg : list) { System.out.println(new String(msg.getBody()) + " - " + msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); c.start(); System.out.println("开始生产数据"); }}
异步音讯
master 收到音讯后立刻向生产者进行反馈。之后再以异步形式向 slave 复制音讯。
异步音讯通常用在对响应工夫敏感的业务场景,即发送端不能容忍长时间地期待Broker的响应。
生产者
package demo2;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.Scanner;/*异步发送音讯一条音讯送出后, 不用暂停期待服务器针对这条音讯的反馈, 而是能够立刻发送后续音讯.应用监听器, 以异步的形式接管服务器的反馈 */public class Producer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException { DefaultMQProducer p = new DefaultMQProducer("producer-demo2"); p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876"); p.start(); p.setRetryTimesWhenSendAsyncFailed(0); String topic = "Topic2"; String tag = "TagA"; String key = "Key-demo2"; while (true) { System.out.print("输出音讯,用逗号分隔多条音讯: "); String[] a = new Scanner(System.in).nextLine().split(","); for (String s : a) { Message msg = new Message(topic, tag, key, s.getBytes()); p.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("nn音讯发送胜利 : "+sendResult); } @Override public void onException(Throwable throwable) { System.out.println("nn音讯发送失败"); } }); System.out.println("--------------------音讯已送出-----------------------"); } } }}
消费者
package demo2;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*与 demo1.Consumer 完全相同 */public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo2"); c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876"); c.subscribe("Topic2", "TagA"); c.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { System.out.println(new String(msg.getBody()) + " - " + msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); c.start(); System.out.println("开始生产数据"); }}
单向音讯
这种形式次要用在不特地关怀发送后果的场景,例如日志发送。
生产者
package demo3;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.Scanner;/*单向音讯音讯收回后, 服务器不会返回后果 */public class Producer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException { DefaultMQProducer p = new DefaultMQProducer("producer-demo3"); p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876"); p.start(); String topic = "Topic3"; String tag = "TagA"; while (true) { System.out.print("输出音讯,用逗号分隔多条音讯: "); String[] a = new Scanner(System.in).nextLine().split(","); for (String s : a) { Message msg = new Message(topic, tag, s.getBytes()); p.sendOneway(msg); } System.out.println("--------------------音讯已送出-----------------------"); } }}
消费者
package demo3;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*与 demo1.Consumer 完全相同 */public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo2"); c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876"); c.subscribe("Topic3", "TagA"); c.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { System.out.println(new String(msg.getBody()) + " - " + msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); c.start(); System.out.println("开始生产数据"); }}
程序音讯
上图演示了 Rocketmq 程序音讯的基本原理:
- 同一组有序的音讯序列,会被发送到同一个队列,依照 FIFO 的形式进行解决
- 一个队列只容许一个消费者线程接管音讯,这样就保障音讯按程序被接管
上面以订单为例:
一个订单的程序流程是:创立、付款、推送、实现。订单号雷同的音讯会被先后发送到同一个队列中。生产时,从同一个队列接管同一个订单的音讯。
生产者
package demo4;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.MessageQueueSelector;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageQueue;import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.List;import java.util.Scanner;/*以下音讯, 雷同id的音讯按程序发送到同一个队列,生产时也从同一个队列按程序生产 topic ======================= queue1 ======================= queue2111,音讯1 111,音讯2 111,音讯3 ------->======================= queue3 ======================= queue4222,音讯1 222,音讯2 222,音讯3 ------->======================= queue5 ======================= queue6333,音讯1 333,音讯2 333,音讯3 ------->======================= queue7 ======================= queue8 ...... */public class Producer { static String[] msgs = { "15103111039,创立", "15103111065,创立", "15103111039,付款", "15103117235,创立", "15103111065,付款", "15103117235,付款", "15103111065,实现", "15103111039,推送", "15103117235,实现", "15103111039,实现" }; public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer p = new DefaultMQProducer("producer-demo4"); p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876"); p.start(); String topic = "Topic4"; String tag = "TagA"; for (String s : msgs) { System.out.println("按回车发送此音讯: "+s); new Scanner(System.in).nextLine(); Message msg = new Message(topic, tag, s.getBytes()); String[] a = s.split(","); long orderId = Long.parseLong(a[0]); /* MessageQueueSelector用来抉择发送的队列, 这里用订单的id对队列数量取余来计算队列索引 send(msg, queueSelector, obj) 第三个参数会传递到queueSelector, 作为它的第三个参数 */ SendResult r = p.send(msg, new MessageQueueSelector() { /* 三个参数的含意: queueList: 以后Topic中所有队列的列表 message: 音讯 o: send()办法传入的orderId */ @Override public MessageQueue select(List<MessageQueue> queueList, Message message, Object o) { Long orderId = (Long) o; //订单id对队列数量取余, 雷同订单id失去雷同的队列索引 long index = orderId % queueList.size(); System.out.println("音讯已发送到: "+queueList.get((int) index)); return queueList.get((int) index); } }, orderId); System.out.println(r+"nn"); } }}
消费者
package demo4;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo4"); c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876"); c.subscribe("Topic4", "*"); c.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { String t = Thread.currentThread().getName(); for (MessageExt msg : list) { System.out.println(t+" - "+ msg.getQueueId() + " - " +new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); c.start(); System.out.println("开始生产数据"); }}
延时音讯
音讯发送到 Rocketmq 服务器后, 提早肯定工夫再向消费者进行投递。
延时音讯的应用场景:
比方电商里,提交了一个订单就能够发送一个延时音讯,1h后去查看这个订单的状态,如果还是未付款就勾销订单开释库存。
生产者发送音讯时,对音讯进行延时设置:
msg.setDelayTimeLevel(3);
其中 3
代表级别而不是一个具体的工夫值,级别和延时时长对应关系是在 MessageStoreConfig
类种进行定义的:
this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
对应关系表:
级别 | 延时时长 |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
生产者
package demo5;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.Scanner;/*延时音讯延时音讯的应用场景比方电商里,提交了一个订单就能够发送一个延时音讯,1h后去查看这个订单的状态,如果还是未付款就勾销订单开释库存。 */public class Producer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer p = new DefaultMQProducer("producer-demo5"); p.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876"); p.start(); while (true) { System.out.print("输出音讯,用逗号分隔多条音讯: "); String[] a = new Scanner(System.in).nextLine().split(","); for (String s : a) { Message msg = new Message("Topic5", s.getBytes()); /* 设置音讯的延迟时间,这里不反对任意的工夫,只反对18个固定的提早时长, 别离用Leven 1到18 来示意: org/apache/rocketmq/store/config/MessageStoreConfig.java this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; */ msg.setDelayTimeLevel(3); p.send(msg); } } }}
消费者
package demo5;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo5"); c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876"); c.subscribe("Topic5", "*"); c.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) { System.out.println("------------------------------"); for (MessageExt msg : list) { long t = System.currentTimeMillis() - msg.getBornTimestamp(); System.out.println(new String(msg.getBody()) + " - 提早: "+t); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); c.start(); System.out.println("开始生产数据"); }}
批量音讯
批量发送音讯能显著进步传递小音讯的性能。限度是这些批量音讯应该有雷同的topic,雷同的waitStoreMsgOK,而且不能是延时音讯。此外,这一批音讯的总大小不应超过4MB。
生产者
package demo6;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.ArrayList;import java.util.Scanner;/*批量发送音讯能显著进步传递小音讯的性能。限度是:- 这些批量音讯应该有雷同的topic,- 雷同的waitStoreMsgOK,- 而且不能是延时音讯。- 这一批音讯的总大小不应超过4MB。如果超出4M须要进行数据宰割, 请参考官网代码样例https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md */public class Producer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer p = new DefaultMQProducer("producer-demo6"); p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876"); p.start(); String topic = "Topic6"; while (true) { System.out.print("输出音讯,用逗号分隔多条音讯: "); String[] a = new Scanner(System.in).nextLine().split(","); ArrayList<Message> messages = new ArrayList<>(); for (String s : a) { messages.add(new Message(topic, s.getBytes())); } p.send(messages); System.out.println("批量音讯已发送"); } }}
消费者
package demo6;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo6"); c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876"); c.subscribe("Topic6", "*"); c.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { System.out.println("收到: "+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); c.start(); System.out.println("开始生产数据"); }}
音讯过滤
Tag 过滤
Tag 能够满足大多数音讯过滤的需要。应用 Tag 过滤非常简单,例如:
consumer.subscribe("Topic1", "TagA || TagB || TagC");
对自定义属性过滤
生产者能够在音讯中增加自定义的属性:
msg.putUserProperty("prop1", "1");msg.putUserProperty("prop2", "2");
消费者接收数据时,能够依据属性来过滤音讯:
consumer.subscribe("Topic7", MessageSelector.bySql("prop1=1 or prop2=2"));
能够看到,自定义属性的过滤语法是 Sql 语法,RocketMQ只定义了一些根本语法来反对这个个性,反对的 Sql 过滤语法如下:
- 数值比拟,比方:>,>=,<,<=,BETWEEN,=;
- 字符比拟,比方:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
生产者
package demo7;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.Random;import java.util.Scanner;/*发送的音讯中蕴含 tag 和 userProperty消费者接管时,能够抉择用 tag 或 userProperty 进行过滤 */public class Producer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer p = new DefaultMQProducer("producer-demo7"); p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876"); p.start(); String topic = "Topic7"; while (true) { System.out.print("输出音讯,用逗号分隔多条音讯: "); String[] a = new Scanner(System.in).nextLine().split(","); System.out.print("输出Tag: "); String tag = new Scanner(System.in).nextLine(); for (String s : a) { Message msg = new Message(topic, tag, s.getBytes()); msg.putUserProperty("rnd", ""+new Random().nextInt(4)); p.send(msg); } } }}
消费者
package demo7;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.MessageSelector;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;import java.util.Scanner;/*如果应用sql过滤,须要在 broker.properties 中增加配置来启用 sql 过滤: enablePropertyFilter=true */public class Consumer { public static void main(String[] args) throws MQClientException { System.out.print("应用Tag过滤还是应用Sql过滤(tag/sql): "); String ts = new Scanner(System.in).nextLine(); DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo7"); c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876"); if (ts.equalsIgnoreCase("tag")) { System.out.println("应用Tag过滤: TagA || TagB || TagC"); c.subscribe("Topic7", "TagA || TagB || TagC"); } else { System.out.println("应用Sql过滤: rnd=1 or rnd > 2"); c.subscribe("Topic7", MessageSelector.bySql("rnd=1 or rnd > 2")); } c.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { System.out.println(new String(msg.getBody()) + " - " + msg.getUserProperty("rnd")); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); c.start(); System.out.println("开始生产数据"); }}
事务音讯
RocketMQ 提供了可靠性音讯,也叫事务音讯。上面剖析一下其原理。
事务音讯的原理
上面来看 RocketMQ 的事务音讯是如何来发送“可靠消息”的,只须要以下三步:
- 发送半音讯(半音讯不会发送给消费者)
- 执行本地事务
- 提交音讯
实现事务音讯发送后,消费者就能够以失常的形式来生产数据。
RocketMQ 的主动重发机制在绝大多数状况下,都能够保障音讯被正确生产。
如果音讯最终生产失败了,还能够由人工解决进行托底。
下面剖析的是失常状况下的执行流程。上面再来看两种谬误状况:
- 事务执行失败时回滚音讯
- 服务器无奈得悉音讯状态时,须要被动回查音讯状态
回滚:
音讯回查:
生产者
package demo8;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.LocalTransactionState;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.client.producer.TransactionMQProducer;import org.apache.rocketmq.client.producer.TransactionSendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageExt;import java.util.Scanner;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.Executors;public class Producer { public static void main(String[] args) throws MQClientException { TransactionMQProducer p = new TransactionMQProducer("producer-demo8"); p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876"); p.setExecutorService(Executors.newFixedThreadPool(5)); p.setTransactionListener(new TransactionListener() { ConcurrentHashMap<String, LocalTransactionState> localTx = new ConcurrentHashMap<>(); /* 在这里执行本地事务 */ @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("执行本地事务"); if (Math.random()<0.333) { System.out.println("本地事务执行胜利, 按回车提交事务音讯"); new Scanner(System.in).nextLine(); localTx.put(message.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE); return LocalTransactionState.COMMIT_MESSAGE; } else if (Math.random()<0.666) { System.out.println("本地事务执行失败, 按回车回滚事务音讯"); new Scanner(System.in).nextLine(); localTx.put(message.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE); return LocalTransactionState.ROLLBACK_MESSAGE; } else { System.out.println("本地事务执行状况未知, 按回车持续"); new Scanner(System.in).nextLine(); localTx.put(message.getTransactionId(), LocalTransactionState.UNKNOW); return LocalTransactionState.UNKNOW; } } /* 回查办法 检测频率默认1分钟,可通过在broker.conf文件中设置transactionCheckInterval的值来扭转默认值,单位为毫秒。 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("服务器正在回查音讯状态"); LocalTransactionState s = localTx.get(messageExt.getTransactionId()); if (s == null || s == LocalTransactionState.UNKNOW) { s = LocalTransactionState.ROLLBACK_MESSAGE; } return s; } }); p.start(); String topic = "Topic8"; while (true) { System.out.print("输出音讯,用逗号分隔多条音讯: "); String[] a = new Scanner(System.in).nextLine().split(","); for (String s : a) { Message msg = new Message(topic, s.getBytes()); System.out.println("---------发送半音讯-----------"); TransactionSendResult r = p.sendMessageInTransaction(msg, null); System.out.println("事务音讯发送后果: "+ r.getLocalTransactionState().name()); } } }}
消费者
package demo8;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*如果返回 RECONSUME_LATER, 服务器会期待一会再重试发送音讯音讯属性默认设置 DELAY=6, 等待时间为 2 分钟, org/apache/rocketmq/store/config/MessageStoreConfig.java this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; */public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo8"); c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876"); c.subscribe("Topic8", "*"); c.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) { for (MessageExt msg : list) { System.out.println(new String(msg.getBody()) + " - " + msg); } if (Math.random()<0.5) { System.out.println("音讯解决实现"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } else { System.out.println("音讯解决失败, 要求服务器稍后重试发送音讯"); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); c.start(); System.out.println("开始生产数据"); }}