关于rocketmq:RocketMQ二RocketMQ整合Springboot

52次阅读

共计 22884 个字符,预计需要花费 58 分钟才能阅读完成。

RocketMQ 原生 API 收发音讯代码样例

pom 文件

新建 maven 我的项目或 module,增加 rocketmq-client 依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.tedu</groupId>
    <artifactId>demo1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-store</artifactId>
            <version>4.7.1</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

同步音讯

同步音讯发送要保障强一致性,发到 master 的音讯向 slave 复制后,才会向生产者发送反馈信息。

这种可靠性同步地发送形式应用的比拟宽泛,比方:重要的音讯告诉,短信告诉。

生产者

package demo1;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.Scanner;
/*
发送同步音讯
 */
public class Producer {public static void main(String[] args) throws Exception {
        /*
        group 雷同的生产者成为一个生产者组

        标识发送同一类音讯的 Producer,通常发送逻辑统一。发送一般音讯的时候,仅标识应用,并无特地用途。若发送事务音讯,发送某条音讯的 producer- A 宕机,使得事务音讯始终处于 PREPARED 状态并超时,则 broker 会回查同一个 group 的其余 producer,确认这条音讯应该 commit 还是 rollback。但开源版本并不齐全反对事务音讯(阉割了事务回查的代码)。?????
         */
        DefaultMQProducer p = new DefaultMQProducer("producer-demo1");

        /*
        连贯 nameserver 集群, 取得注册的 broker 信息
         */
        p.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876");
        p.start();

        /*
        主题相当于是音讯的分类, 一类音讯应用一个主题
         */
        String topic = "Topic1";

        /*
        tag 相当于是音讯的二级分类, 在一个主题下, 能够通过 tag 再对音讯进行分类
         */
        String tag = "TagA";

        while (true) {System.out.print("输出音讯, 用逗号分隔多条音讯:");
            String[] a = new Scanner(System.in).nextLine().split(",");

            for (String s : a) {Message msg = new Message(topic, tag, s.getBytes()); // 一级分类, 二级分类, 音讯内容
                SendResult r = p.send(msg);// 发送音讯后会失去服务器反馈, 蕴含: smsgId, sendStatus, queue, queueOffset, offsetMsgId
                System.out.println(r);
            }
        }
    }
}

消费者

消费者的要点:

1. push 和 pull

消费者有两种模式:push 和 pull。

push 模式由服务器被动向消费者发送音讯;pull 模式由消费者被动向服务器申请音讯。

在消费者解决能力无限时,为了加重消费者的压力,能够采纳 pull 模式。少数状况下都采纳 pull 模式。

2. NameServer

消费者须要向 NameServer 询问 Topic 的路由信息。

3. Topic

从指定的 Topic 接管音讯。Topic 相当于是一级分类。

4. Tag

Topic 相当于是一级分类,Tag 相当于是 2 级分类。

  • 多个 Tag 能够这样写:TagA || TagB || TagC
  • 不指定 Tag,或者说接管所有的 Tag,能够写星号:*
package demo1;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {public static void main(String[] args) throws Exception {
        /*
        标识一类 Consumer 的汇合名称,这类 Consumer 通常生产一类音讯,且生产逻辑统一。同一个 Consumer Group 下的各个实例将独特生产
        topic 的音讯,起到负载平衡的作用。生产进度以 Consumer Group 为粒度治理,不同
        Consumer Group 之间生产进度彼此不受影响,即音讯 A 被 Consumer Group1 生产过,也会再
        给 Consumer Group2 生产。注:RocketMQ 要求同一个 Consumer Group 的
        消费者必须要领有雷同的注册信息,即必须要听一样
        的 topic(并且 tag 也一样)。*/
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo1");
        c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876");

        c.subscribe("Topic1", "TagA");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) {for (MessageExt msg : list) {System.out.println(new String(msg.getBody()) + "-" + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        c.start();
        System.out.println("开始生产数据");
    }
}

异步音讯


master 收到音讯后立刻向生产者进行反馈。之后再以异步形式向 slave 复制音讯。

异步音讯通常用在对响应工夫敏感的业务场景,即发送端不能容忍长时间地期待 Broker 的响应。

生产者

package demo2;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.Scanner;

/*
异步发送音讯

一条音讯送出后, 不用暂停期待服务器针对这条音讯的反馈, 而是能够立刻发送后续音讯.
应用监听器, 以异步的形式接管服务器的反馈
 */
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {DefaultMQProducer p = new DefaultMQProducer("producer-demo2");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        p.setRetryTimesWhenSendAsyncFailed(0);

        String topic = "Topic2";
        String tag = "TagA";
        String key = "Key-demo2";

        while (true) {System.out.print("输出音讯, 用逗号分隔多条音讯:");
            String[] a = new Scanner(System.in).nextLine().split(",");

            for (String s : a) {Message msg = new Message(topic, tag, key, s.getBytes());

                p.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {System.out.println("nn 音讯发送胜利 :"+sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {System.out.println("nn 音讯发送失败");
                    }
                });

                System.out.println("-------------------- 音讯已送出 -----------------------");
            }

        }
    }
} 

消费者

package demo2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
/*
与 demo1.Consumer 完全相同
 */
public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo2");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");

        c.subscribe("Topic2", "TagA");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.println(new String(msg.getBody()) + "-" + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        c.start();
        System.out.println("开始生产数据");
    }
} 

单向音讯

这种形式次要用在不特地关怀发送后果的场景,例如日志发送。

生产者

package demo3;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.Scanner;

/*
单向音讯

音讯收回后, 服务器不会返回后果
 */
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {DefaultMQProducer p = new DefaultMQProducer("producer-demo3");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        String topic = "Topic3";
        String tag = "TagA";

        while (true) {System.out.print("输出音讯, 用逗号分隔多条音讯:");
            String[] a = new Scanner(System.in).nextLine().split(",");
            for (String s : a) {Message msg = new Message(topic, tag, s.getBytes());
                p.sendOneway(msg);
            }
            System.out.println("-------------------- 音讯已送出 -----------------------");
        }

    }
}

消费者

package demo3;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/*
与 demo1.Consumer 完全相同
 */
public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo2");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");

        c.subscribe("Topic3", "TagA");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.println(new String(msg.getBody()) + "-" + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        c.start();
        System.out.println("开始生产数据");
    }
} 

程序音讯

上图演示了 Rocketmq 程序音讯的基本原理:

  • 同一组有序的音讯序列,会被发送到同一个队列,依照 FIFO 的形式进行解决
  • 一个队列只容许一个消费者线程接管音讯,这样就保障音讯按程序被接管

上面以订单为例:

一个订单的程序流程是:创立、付款、推送、实现。订单号雷同的音讯会被先后发送到同一个队列中。生产时,从同一个队列接管同一个订单的音讯。

生产者

package demo4;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.List;
import java.util.Scanner;
/*
以下音讯, 雷同 id 的音讯按程序发送到同一个队列,
生产时也从同一个队列按程序生产
                                              topic

                                        =======================  queue1
                                        =======================  queue2
111, 音讯 1  111, 音讯 2  111, 音讯 3   ------->=======================  queue3
                                        =======================  queue4
222, 音讯 1  222, 音讯 2  222, 音讯 3   ------->=======================  queue5
                                        =======================  queue6
333, 音讯 1  333, 音讯 2  333, 音讯 3   ------->=======================  queue7
                                        =======================  queue8
                                                    ......
 */
public class Producer {static String[] msgs = {
            "15103111039, 创立",
                                "15103111065, 创立",
            "15103111039, 付款",
                                                    "15103117235, 创立",
                                "15103111065, 付款",
                                                    "15103117235, 付款",
                                "15103111065, 实现",
            "15103111039, 推送",
                                                    "15103117235, 实现",
            "15103111039, 实现"
    };

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer p = new DefaultMQProducer("producer-demo4");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        String topic = "Topic4";
        String tag = "TagA";

        for (String s : msgs) {System.out.println("按回车发送此音讯:"+s);
            new Scanner(System.in).nextLine();

            Message msg = new Message(topic, tag, s.getBytes());

            String[] a = s.split(",");
            long orderId = Long.parseLong(a[0]);

            /*
            MessageQueueSelector 用来抉择发送的队列,
            这里用订单的 id 对队列数量取余来计算队列索引

            send(msg, queueSelector, obj)
            第三个参数会传递到 queueSelector, 作为它的第三个参数
             */
            SendResult r = p.send(msg, new MessageQueueSelector() {
                /*
                三个参数的含意:
                queueList: 以后 Topic 中所有队列的列表
                message: 音讯
                o: send()办法传入的 orderId
                 */
                @Override
                public MessageQueue select(List<MessageQueue> queueList, Message message, Object o) {Long orderId = (Long) o;
                    // 订单 id 对队列数量取余, 雷同订单 id 失去雷同的队列索引
                    long index = orderId % queueList.size();
                    System.out.println("音讯已发送到:"+queueList.get((int) index));
                    return queueList.get((int) index);
                }
            }, orderId);

            System.out.println(r+"nn");
        }
    }
} 

消费者

package demo4;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo4");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        
        c.subscribe("Topic4", "*");

        c.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {String t = Thread.currentThread().getName();

                for (MessageExt msg : list) {System.out.println(t+"-"+ msg.getQueueId() + "-" +new String(msg.getBody()));
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        c.start();
        System.out.println("开始生产数据");
    }
}

延时音讯

音讯发送到 Rocketmq 服务器后,提早肯定工夫再向消费者进行投递。

延时音讯的应用场景:

比方电商里,提交了一个订单就能够发送一个延时音讯,1h 后去查看这个订单的状态,如果还是未付款就勾销订单开释库存。

生产者发送音讯时,对音讯进行延时设置:

msg.setDelayTimeLevel(3);

其中 3 代表级别而不是一个具体的工夫值,级别和延时时长对应关系是在 MessageStoreConfig 类种进行定义的:

this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

对应关系表:

级别 延时时长
1 1s
2 5s
3 10s
4 30s
5 1m
6 2m
7 3m
8 4m
9 5m
10 6m
11 7m
12 8m
13 9m
14 10m
15 20m
16 30m
17 1h
18 2h

生产者

package demo5;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.Scanner;

/*
延时音讯

延时音讯的应用场景
比方电商里,提交了一个订单就能够发送一个延时音讯,1h 后去查看这个订单的状态,如果还是未付款就勾销订单开释库存。*/
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer p = new DefaultMQProducer("producer-demo5");
        p.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876");
        p.start();

        while (true) {System.out.print("输出音讯, 用逗号分隔多条音讯:");
            String[] a = new Scanner(System.in).nextLine().split(",");

            for (String s : a) {Message msg = new Message("Topic5", s.getBytes());

                /*
                设置音讯的延迟时间, 这里不反对任意的工夫, 只反对 18 个固定的提早时长,
                别离用 Leven 1 到 18 来示意:

                org/apache/rocketmq/store/config/MessageStoreConfig.java
                this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
                 */
                msg.setDelayTimeLevel(3);

                p.send(msg);
            }
        }
    }
}

消费者

package demo5;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo5");
        c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876");
        c.subscribe("Topic5", "*");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) {System.out.println("------------------------------");
                for (MessageExt msg : list) {long t = System.currentTimeMillis() - msg.getBornTimestamp();
                    System.out.println(new String(msg.getBody()) + "- 提早:"+t);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        c.start();
        System.out.println("开始生产数据");
    }
} 

批量音讯

批量发送音讯能显著进步传递小音讯的性能。限度是这些批量音讯应该有雷同的 topic,雷同的 waitStoreMsgOK,而且不能是延时音讯。此外,这一批音讯的总大小不应超过 4MB。

生产者

package demo6;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.ArrayList;
import java.util.Scanner;
/*
批量发送音讯能显著进步传递小音讯的性能。限度是:- 这些批量音讯应该有雷同的 topic,- 雷同的 waitStoreMsgOK,- 而且不能是延时音讯。- 这一批音讯的总大小不应超过 4MB。如果超出 4M 须要进行数据宰割, 请参考官网代码样例 https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
 */
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer p = new DefaultMQProducer("producer-demo6");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        String topic = "Topic6";

        while (true) {System.out.print("输出音讯, 用逗号分隔多条音讯:");
            String[] a = new Scanner(System.in).nextLine().split(",");

            ArrayList<Message> messages = new ArrayList<>();
            for (String s : a) {messages.add(new Message(topic, s.getBytes()));
            }

            p.send(messages);
            System.out.println("批量音讯已发送");
        }
    }
}

消费者

package demo6;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo6");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        c.subscribe("Topic6", "*");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg :
                        list) {System.out.println("收到:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        c.start();
        System.out.println("开始生产数据");
    }
}

音讯过滤

Tag 过滤

Tag 能够满足大多数音讯过滤的需要。应用 Tag 过滤非常简单,例如:

consumer.subscribe("Topic1", "TagA || TagB || TagC");

对自定义属性过滤

生产者能够在音讯中增加自定义的属性:

msg.putUserProperty("prop1", "1");
msg.putUserProperty("prop2", "2");

消费者接收数据时,能够依据属性来过滤音讯:

consumer.subscribe("Topic7", MessageSelector.bySql("prop1=1 or prop2=2"));

能够看到,自定义属性的过滤语法是 Sql 语法,RocketMQ 只定义了一些根本语法来反对这个个性,反对的 Sql 过滤语法如下:

  • 数值比拟,比方:>,>=,<,<=,BETWEEN,=;
  • 字符比拟,比方:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

生产者

package demo7;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.Random;
import java.util.Scanner;
/*
发送的音讯中蕴含 tag 和 userProperty

消费者接管时,能够抉择用 tag 或 userProperty 进行过滤
 */
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer p = new DefaultMQProducer("producer-demo7");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        String topic = "Topic7";

        while (true) {System.out.print("输出音讯, 用逗号分隔多条音讯:");
            String[] a = new Scanner(System.in).nextLine().split(",");
            System.out.print("输出 Tag:");
            String tag = new Scanner(System.in).nextLine();

            for (String s : a) {Message msg = new Message(topic, tag, s.getBytes());
                msg.putUserProperty("rnd", ""+new Random().nextInt(4));
                p.send(msg);
            }

        }
    }
}

消费者

package demo7;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Scanner;

/*
如果应用 sql 过滤, 须要在 broker.properties 中增加配置来启用 sql 过滤:enablePropertyFilter=true
 */
public class Consumer {public static void main(String[] args) throws MQClientException {System.out.print("应用 Tag 过滤还是应用 Sql 过滤(tag/sql):");
        String ts = new Scanner(System.in).nextLine();

        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo7");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        if (ts.equalsIgnoreCase("tag")) {System.out.println("应用 Tag 过滤: TagA || TagB || TagC");
            c.subscribe("Topic7", "TagA || TagB || TagC");
        } else {System.out.println("应用 Sql 过滤: rnd=1 or rnd > 2");
            c.subscribe("Topic7", MessageSelector.bySql("rnd=1 or rnd > 2"));
        }

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.println(new String(msg.getBody()) + "-" + msg.getUserProperty("rnd"));

                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        c.start();
        System.out.println("开始生产数据");
    }
}

事务音讯

RocketMQ 提供了可靠性音讯,也叫事务音讯。上面剖析一下其原理。

事务音讯的原理


上面来看 RocketMQ 的 事务音讯 是如何来发送“可靠消息”的,只须要以下三步:

  1. 发送半音讯(半音讯不会发送给消费者)
  2. 执行本地事务
  3. 提交音讯


实现 事务音讯 发送后,消费者就能够以失常的形式来生产数据。

RocketMQ 的主动重发机制在绝大多数状况下,都能够保障音讯被正确生产。

如果音讯最终生产失败了,还能够由人工解决进行托底。


下面剖析的是失常状况下的执行流程。上面再来看两种谬误状况:

  1. 事务执行失败时回滚音讯
  2. 服务器无奈得悉音讯状态时,须要被动回查音讯状态

回滚:


音讯回查:

生产者

package demo8;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

public class Producer {public static void main(String[] args) throws MQClientException {TransactionMQProducer p = new TransactionMQProducer("producer-demo8");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");

        p.setExecutorService(Executors.newFixedThreadPool(5));

        p.setTransactionListener(new TransactionListener() {ConcurrentHashMap<String, LocalTransactionState> localTx = new ConcurrentHashMap<>();

            /*
            在这里执行本地事务
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {System.out.println("执行本地事务");
                if (Math.random()<0.333) {System.out.println("本地事务执行胜利, 按回车提交事务音讯");
                    new Scanner(System.in).nextLine();

                    localTx.put(message.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE);
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (Math.random()<0.666) {System.out.println("本地事务执行失败, 按回车回滚事务音讯");
                    new Scanner(System.in).nextLine();

                    localTx.put(message.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {System.out.println("本地事务执行状况未知, 按回车持续");
                    new Scanner(System.in).nextLine();

                    localTx.put(message.getTransactionId(), LocalTransactionState.UNKNOW);
                    return LocalTransactionState.UNKNOW;
                }
            }

            /*
            回查办法
            检测频率默认 1 分钟,可通过在 broker.conf 文件中设置 transactionCheckInterval 的值来扭转默认值,单位为毫秒。*/
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("服务器正在回查音讯状态");

                LocalTransactionState s = localTx.get(messageExt.getTransactionId());
                if (s == null || s == LocalTransactionState.UNKNOW) {s = LocalTransactionState.ROLLBACK_MESSAGE;}
                return s;
            }
        });

        p.start();

        String topic = "Topic8";

        while (true) {System.out.print("输出音讯, 用逗号分隔多条音讯:");
            String[] a = new Scanner(System.in).nextLine().split(",");

            for (String s : a) {Message msg = new Message(topic, s.getBytes());
                System.out.println("--------- 发送半音讯 -----------");
                TransactionSendResult r = p.sendMessageInTransaction(msg, null);
                System.out.println("事务音讯发送后果:"+ r.getLocalTransactionState().name());
            }
        }
    }
} 

消费者

package demo8;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/*

如果返回 RECONSUME_LATER, 服务器会期待一会再重试发送音讯

音讯属性默认设置 DELAY=6, 等待时间为 2 分钟,

                org/apache/rocketmq/store/config/MessageStoreConfig.java
                this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";


 */
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo8");
        c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876");

        c.subscribe("Topic8", "*");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) {for (MessageExt msg : list) {System.out.println(new String(msg.getBody()) + "-" + msg);
                }
                if (Math.random()<0.5) {System.out.println("音讯解决实现");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } else {System.out.println("音讯解决失败, 要求服务器稍后重试发送音讯");
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });

        c.start();
        System.out.println("开始生产数据");
    }
}

正文完
 0