MQ
Message Queue,音讯队列,FIFO 构造。
例如电商平台,在用户领取订单后执行对应的操作;
长处:
- 异步
- 削峰
- 解耦
毛病
- 减少零碎复杂性
- 数据一致性
- 可用性
JMS
Java Message Service,Java 音讯服务,相似 JDBC 提供了拜访数据库的规范,JMS 也制订了一套零碎间音讯通信的标准;
区别于 JDBC,JDK 原生包中并未定义 JMS 相干接口。
- ConnectionFactory
- Connection
- Destination
- Session
- MessageConsumer
- MessageProducer
- Message
合作形式图示为;
业界产品
ActiveMQ | RabbitMQ | RocketMQ | kafka | |
---|---|---|---|---|
单机吞吐量 | 万级 | 万级 | 10 万级 | 10 万级 |
可用性 | 高 | 高 | 十分高 | 十分高 |
可靠性 | 较低概率失落音讯 | 根本不丢 | 能够做到 0 失落 | 能够做到 0 失落 |
性能反对 | 较为欠缺 | 基于 erlang,并发强,性能好,延时低 | 分布式,拓展性好,反对分布式事务 | 较为简单,次要利用与大数据实时计算,日志采集等 |
社区活跃度 | 低 | 中 | 高 | 高 |
ActiveMQ
作为 Apache 下的开源我的项目,齐全反对 JMS 标准。并且 Spring Boot 内置了 ActiveMQ 的自动化配置,作为入门再适宜不过。
疾速开始
增加依赖;
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
音讯发送;
// 1. 创立连贯工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2. 工厂创立连贯
Connection connection = factory.createConnection();
// 3. 启动连贯
connection.start();
// 4. 创立连贯会话 session,第一个参数为是否在事务中解决,第二个参数为应答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 依据 session 创立音讯队列目的地
Destination queue = session.createQueue("test-queue");
// 6. 依据 session 和目的地 queue 创立生产者
MessageProducer producer = session.createProducer(queue);
// 7. 依据 session 创立音讯实体
Message message = session.createTextMessage("hello world!");
// 8. 通过生产者 producer 发送音讯实体
producer.send(message);
// 9. 敞开连贯
connection.close();
Spring Boot 集成
主动注入参考:org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration
增加依赖;
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
增加 yaml 配置;
spring:
activemq:
broker-url: tcp://localhost:61616
jms:
#音讯模式 true: 播送(Topic),false: 队列(Queue), 默认时 false
pub-sub-domain: true
收发音讯;
@Autowired
private JmsTemplate jmsTemplate;
// 接管音讯
@JmsListener(destination = "test")
public void receiveMsg(String msg) {System.out.println(msg);
}
// 发送音讯
public void sendMsg(String destination, String msg) {jmsTemplate.convertAndSend(destination, msg);
}
高可用
基于 zookeeper 实现主从架构,批改 activemq.xml 节点 persistenceAdapter 配置;
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/levelDB"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183"
zkPath="/activemq/leveldb-stores"
hostname="localhost"
/>
</persistenceAdapter>
broker 地址为:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false
负载平衡
在高可用集群节点 activemq.xml 增加节点 networkConnectors;
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/>
</networkConnectors>
更多详细信息可参考:https://blog.csdn.net/haoyuya…
集群生产
因为公布订阅模式,所有订阅者都会接管到音讯,在生产环境,消费者集群会产生音讯反复生产问题。
ActiveMQ 提供 VirtualTopic 性能,解决多生产端接管同一条音讯的问题。于生产者而言,VirtualTopic 就是一个 topic,对生产而言则是 queue。
在 activemq.xml 增加节点 destinationInterceptors;
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
生产者失常往 testTopic 中发送音讯,订阅者可批改订阅主题为相似 consumer.A.testTopic 这样来生产。
更多详细信息可参考:https://blog.csdn.net/java_co…
RocketMQ
是一个队列模型的消息中间件,具备高性能、高牢靠、高实时、分布式特点。
架构图示
-
Name Server
名称服务器,相似于 Zookeeper 注册核心,提供 Broker 发现;
-
Broker
RocketMQ 的外围组件,绝大部分工作都在 Broker 中实现,接管申请,解决生产,音讯长久化等;
-
Producer
音讯生产方;
-
Consumer
音讯生产方;
疾速开始
装置后,顺次启动 nameserver 和 broker,能够用 mqadmin 治理主题、集群和 broker 等信息;
https://segmentfault.com/a/11…
增加依赖;
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
音讯发送;
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("producer");
producer.start();
Message msg = new Message(
"producer-topic",
"msg",
"hello world".getBytes());
//msg.setDelayTimeLevel(1);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
producer.shutdown();
delayLevel 从 1 开始默认顺次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
参考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。
音讯接管;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("producer-topic", "msg");
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {for (MessageExt msg : list) {System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
.\mqadmin.cmd sendMessage -t producer-topic -c msg -p “hello rocketmq” -n localhost:9876
Spring Boot 集成
增加依赖;
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
增加 yaml 配置;
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: producer
发送音讯;
@Autowired
private RocketMQTemplate mqTemplate;
public void sendMessage(String topic, String tag, String message) {SendResult result = mqTemplate.syncSend(topic + ":" + tag, message);
System.out.println(JSON.toJSONString(result));
}
接管音讯;
@Component
@RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test")
public class MsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {System.out.println(message);
}
}
Console 控制台
RocketMQ 拓展包提供了治理控制台;
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
反复生产
产生起因:
- 生产者反复投递;
- 音讯队列异样;
- 消费者异样生产;
怎么解决反复生产的问题,换句话怎么保障音讯生产的 幂等性。
通常基于本地音讯表的计划实现,音讯解决过便不再解决。
程序音讯
音讯错乱的起因:
- 一个音讯队列 queue,多个 consumer 生产;
- 一个 queue 对应一个 consumer,然而 consumer 多线程生产;
要保障音讯的程序生产,有三个关键点:
- 音讯程序发送
- 音讯顺序存储
- 音讯程序生产
参考 RocketMq 中的 MessageQueueSelector 和 MessageListenerOrderly。
分布式事务
在分布式系统中,一个事务由多个本地事务组成。这里介绍一个基于 MQ 的分布式事务解决方案。
通过 broker 的 HA 高可用,和定时回查 prepare 音讯的状态,来保障最终一致性。