关于java:MQ-入门实践

9次阅读

共计 5576 个字符,预计需要花费 14 分钟才能阅读完成。

MQ

Message Queue,音讯队列,FIFO 构造。

例如电商平台,在用户领取订单后执行对应的操作;

长处:

  • 异步
  • 削峰
  • 解耦

毛病

  • 减少零碎复杂性
  • 数据一致性
  • 可用性

JMS

Java Message Service,Java 音讯服务,相似 JDBC 提供了拜访数据库的规范,JMS 也制订了一套零碎间音讯通信的标准;

区别于 JDBC,JDK 原生包中并未定义 JMS 相干接口。

  1. ConnectionFactory
  2. Connection
  3. Destination
  4. Session
  5. MessageConsumer
  6. MessageProducer
  7. Message

合作形式图示为;

业界产品

ActiveMQ RabbitMQ RocketMQ kafka
单机吞吐量 万级 万级 10 万级 10 万级
可用性 十分高 十分高
可靠性 较低概率失落音讯 根本不丢 能够做到 0 失落 能够做到 0 失落
性能反对 较为欠缺 基于 erlang,并发强,性能好,延时低 分布式,拓展性好,反对分布式事务 较为简单,次要利用与大数据实时计算,日志采集等
社区活跃度

ActiveMQ

作为 Apache 下的开源我的项目,齐全反对 JMS 标准。并且 Spring Boot 内置了 ActiveMQ 的自动化配置,作为入门再适宜不过。

疾速开始

增加依赖;

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

音讯发送;

// 1. 创立连贯工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2. 工厂创立连贯
Connection connection = factory.createConnection();
// 3. 启动连贯
connection.start();
// 4. 创立连贯会话 session,第一个参数为是否在事务中解决,第二个参数为应答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 依据 session 创立音讯队列目的地
Destination queue = session.createQueue("test-queue");
// 6. 依据 session 和目的地 queue 创立生产者
MessageProducer producer = session.createProducer(queue);
// 7. 依据 session 创立音讯实体
Message message = session.createTextMessage("hello world!");
// 8. 通过生产者 producer 发送音讯实体
producer.send(message);
// 9. 敞开连贯
connection.close();

Spring Boot 集成

主动注入参考:org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration

增加依赖;

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

增加 yaml 配置;

spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    #音讯模式 true: 播送(Topic),false: 队列(Queue), 默认时 false
    pub-sub-domain: true

收发音讯;

@Autowired
private JmsTemplate jmsTemplate;

// 接管音讯
@JmsListener(destination = "test")
public void receiveMsg(String msg) {System.out.println(msg);
}

// 发送音讯
public void sendMsg(String destination, String msg) {jmsTemplate.convertAndSend(destination, msg);
}

高可用

基于 zookeeper 实现主从架构,批改 activemq.xml 节点 persistenceAdapter 配置;

<persistenceAdapter>
    <replicatedLevelDB
        directory="${activemq.data}/levelDB"
        replicas="3"
        bind="tcp://0.0.0.0:0"
        zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183"
        zkPath="/activemq/leveldb-stores"
        hostname="localhost"
    />
</persistenceAdapter>

broker 地址为:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false

负载平衡

在高可用集群节点 activemq.xml 增加节点 networkConnectors;

<networkConnectors>
    <networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/>
</networkConnectors>

更多详细信息可参考:https://blog.csdn.net/haoyuya…

集群生产

因为公布订阅模式,所有订阅者都会接管到音讯,在生产环境,消费者集群会产生音讯反复生产问题。

ActiveMQ 提供 VirtualTopic 性能,解决多生产端接管同一条音讯的问题。于生产者而言,VirtualTopic 就是一个 topic,对生产而言则是 queue。

在 activemq.xml 增加节点 destinationInterceptors;

<destinationInterceptors> 
    <virtualDestinationInterceptor> 
        <virtualDestinations> 
            <virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/>    
        </virtualDestinations>
    </virtualDestinationInterceptor> 
</destinationInterceptors>

生产者失常往 testTopic 中发送音讯,订阅者可批改订阅主题为相似 consumer.A.testTopic 这样来生产。

更多详细信息可参考:https://blog.csdn.net/java_co…

RocketMQ

是一个队列模型的消息中间件,具备高性能、高牢靠、高实时、分布式特点。

架构图示

  1. Name Server

    名称服务器,相似于 Zookeeper 注册核心,提供 Broker 发现;

  2. Broker

    RocketMQ 的外围组件,绝大部分工作都在 Broker 中实现,接管申请,解决生产,音讯长久化等;

  3. Producer

    音讯生产方;

  4. Consumer

    音讯生产方;

疾速开始

装置后,顺次启动 nameserver 和 broker,能够用 mqadmin 治理主题、集群和 broker 等信息;

https://segmentfault.com/a/11…

增加依赖;

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

音讯发送;

DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("producer");
producer.start();
Message msg = new Message(
    "producer-topic",
    "msg",
    "hello world".getBytes());
//msg.setDelayTimeLevel(1);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
producer.shutdown();

delayLevel 从 1 开始默认顺次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

参考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。

音讯接管;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("producer-topic", "msg");
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {for (MessageExt msg : list) {System.out.println(new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

.\mqadmin.cmd sendMessage -t producer-topic -c msg -p “hello rocketmq” -n localhost:9876

Spring Boot 集成

增加依赖;

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

增加 yaml 配置;

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: producer

发送音讯;

@Autowired
private RocketMQTemplate mqTemplate;

public void sendMessage(String topic, String tag, String message) {SendResult result = mqTemplate.syncSend(topic + ":" + tag, message);
    System.out.println(JSON.toJSONString(result));
}

接管音讯;

@Component
@RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test")
public class MsgListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {System.out.println(message);
    }
}

Console 控制台

RocketMQ 拓展包提供了治理控制台;

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

反复生产

产生起因:

  1. 生产者反复投递;
  2. 音讯队列异样;
  3. 消费者异样生产;

怎么解决反复生产的问题,换句话怎么保障音讯生产的 幂等性

通常基于本地音讯表的计划实现,音讯解决过便不再解决。

程序音讯

音讯错乱的起因:

  1. 一个音讯队列 queue,多个 consumer 生产;
  2. 一个 queue 对应一个 consumer,然而 consumer 多线程生产;

要保障音讯的程序生产,有三个关键点:

  1. 音讯程序发送
  2. 音讯顺序存储
  3. 音讯程序生产

参考 RocketMq 中的 MessageQueueSelector 和 MessageListenerOrderly。

分布式事务

在分布式系统中,一个事务由多个本地事务组成。这里介绍一个基于 MQ 的分布式事务解决方案。

通过 broker 的 HA 高可用,和定时回查 prepare 音讯的状态,来保障最终一致性。

正文完
 0