MQ

Message Queue,音讯队列,FIFO 构造。

例如电商平台,在用户领取订单后执行对应的操作;

长处:

  • 异步
  • 削峰
  • 解耦

毛病

  • 减少零碎复杂性
  • 数据一致性
  • 可用性

JMS

Java Message Service,Java音讯服务,相似 JDBC 提供了拜访数据库的规范,JMS 也制订了一套零碎间音讯通信的标准;

区别于 JDBC,JDK 原生包中并未定义 JMS 相干接口。
  1. ConnectionFactory
  2. Connection
  3. Destination
  4. Session
  5. MessageConsumer
  6. MessageProducer
  7. Message

合作形式图示为;

业界产品

ActiveMQRabbitMQRocketMQkafka
单机吞吐量万级万级10 万级10 万级
可用性十分高十分高
可靠性较低概率失落音讯根本不丢能够做到 0 失落能够做到 0 失落
性能反对较为欠缺基于 erlang,并发强,性能好,延时低分布式,拓展性好,反对分布式事务较为简单,次要利用与大数据实时计算,日志采集等
社区活跃度

ActiveMQ

作为 Apache 下的开源我的项目,齐全反对 JMS 标准。并且 Spring Boot 内置了 ActiveMQ 的自动化配置,作为入门再适宜不过。

疾速开始

增加依赖;

<dependency>    <groupId>org.apache.activemq</groupId>    <artifactId>activemq-core</artifactId>    <version>5.7.0</version></dependency>

音讯发送;

// 1. 创立连贯工厂ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 2. 工厂创立连贯Connection connection = factory.createConnection();// 3. 启动连贯connection.start();// 4. 创立连贯会话session,第一个参数为是否在事务中解决,第二个参数为应答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 依据session创立音讯队列目的地Destination queue = session.createQueue("test-queue");// 6. 依据session和目的地queue创立生产者MessageProducer producer = session.createProducer(queue);// 7. 依据session创立音讯实体Message message = session.createTextMessage("hello world!");// 8. 通过生产者producer发送音讯实体producer.send(message);// 9. 敞开连贯connection.close();

Spring Boot 集成

主动注入参考:org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration

增加依赖;

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-activemq</artifactId></dependency>

增加 yaml 配置;

spring:  activemq:    broker-url: tcp://localhost:61616  jms:    #音讯模式 true:播送(Topic),false:队列(Queue),默认时false    pub-sub-domain: true

收发音讯;

@Autowiredprivate JmsTemplate jmsTemplate;// 接管音讯@JmsListener(destination = "test")public void receiveMsg(String msg) {    System.out.println(msg);}// 发送音讯public void sendMsg(String destination, String msg) {    jmsTemplate.convertAndSend(destination, msg);}

高可用

基于 zookeeper 实现主从架构,批改 activemq.xml 节点 persistenceAdapter 配置;

<persistenceAdapter>    <replicatedLevelDB        directory="${activemq.data}/levelDB"        replicas="3"        bind="tcp://0.0.0.0:0"        zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183"        zkPath="/activemq/leveldb-stores"        hostname="localhost"    /></persistenceAdapter>

broker 地址为:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false

负载平衡

在高可用集群节点 activemq.xml 增加节点 networkConnectors;

<networkConnectors>    <networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/></networkConnectors>
更多详细信息可参考:https://blog.csdn.net/haoyuya...

集群生产

因为公布订阅模式,所有订阅者都会接管到音讯,在生产环境,消费者集群会产生音讯反复生产问题。

ActiveMQ 提供 VirtualTopic 性能,解决多生产端接管同一条音讯的问题。于生产者而言,VirtualTopic 就是一个 topic,对生产而言则是 queue。

在 activemq.xml 增加节点 destinationInterceptors;

<destinationInterceptors>     <virtualDestinationInterceptor>         <virtualDestinations>             <virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/>            </virtualDestinations>    </virtualDestinationInterceptor> </destinationInterceptors>

生产者失常往 testTopic 中发送音讯,订阅者可批改订阅主题为相似 consumer.A.testTopic 这样来生产。

更多详细信息可参考:https://blog.csdn.net/java_co...

RocketMQ

是一个队列模型的消息中间件,具备高性能、高牢靠、高实时、分布式特点。

架构图示

  1. Name Server

    名称服务器,相似于 Zookeeper 注册核心,提供 Broker 发现;

  2. Broker

    RocketMQ 的外围组件,绝大部分工作都在 Broker 中实现,接管申请,解决生产,音讯长久化等;

  3. Producer

    音讯生产方;

  4. Consumer

    音讯生产方;

疾速开始

装置后,顺次启动 nameserver 和 broker,能够用 mqadmin 治理主题、集群和 broker 等信息;

https://segmentfault.com/a/11...

增加依赖;

<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-client</artifactId>    <version>4.5.2</version></dependency>

音讯发送;

DefaultMQProducer producer = new DefaultMQProducer("producer-group");producer.setNamesrvAddr("127.0.0.1:9876");producer.setInstanceName("producer");producer.start();Message msg = new Message(    "producer-topic",    "msg",    "hello world".getBytes());//msg.setDelayTimeLevel(1);SendResult sendResult = producer.send(msg);System.out.println(sendResult.toString());producer.shutdown();

delayLevel 从 1 开始默认顺次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

参考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。

音讯接管;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.setInstanceName("consumer");consumer.subscribe("producer-topic", "msg");consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {    for (MessageExt msg : list) {        System.out.println(new String(msg.getBody()));    }    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();
.\mqadmin.cmd sendMessage -t producer-topic -c msg -p "hello rocketmq" -n localhost:9876

Spring Boot 集成

增加依赖;

<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-spring-boot-starter</artifactId>    <version>2.0.4</version></dependency>

增加 yaml 配置;

rocketmq:  name-server: 127.0.0.1:9876  producer:    group: producer

发送音讯;

@Autowiredprivate RocketMQTemplate mqTemplate;public void sendMessage(String topic, String tag, String message) {    SendResult result = mqTemplate.syncSend(topic + ":" + tag, message);    System.out.println(JSON.toJSONString(result));}

接管音讯;

@Component@RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test")public class MsgListener implements RocketMQListener<String> {    @Override    public void onMessage(String message) {        System.out.println(message);    }}

Console 控制台

RocketMQ 拓展包提供了治理控制台;

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

反复生产

产生起因:

  1. 生产者反复投递;
  2. 音讯队列异样;
  3. 消费者异样生产;

怎么解决反复生产的问题,换句话怎么保障音讯生产的幂等性

通常基于本地音讯表的计划实现,音讯解决过便不再解决。

程序音讯

音讯错乱的起因:

  1. 一个音讯队列 queue,多个 consumer 生产;
  2. 一个 queue 对应一个 consumer,然而 consumer 多线程生产;

要保障音讯的程序生产,有三个关键点:

  1. 音讯程序发送
  2. 音讯顺序存储
  3. 音讯程序生产

参考 RocketMq 中的 MessageQueueSelector 和 MessageListenerOrderly。

分布式事务

在分布式系统中,一个事务由多个本地事务组成。这里介绍一个基于 MQ 的分布式事务解决方案。

通过 broker 的 HA 高可用,和定时回查 prepare 音讯的状态,来保障最终一致性。