关于java:看看吧月薪20K以上的程序员才能全部掌握RabbitMq知识你掌握了多少

5次阅读

共计 14892 个字符,预计需要花费 38 分钟才能阅读完成。

一、RabbitMq 基础知识

0、概述

音讯队列的作用就是接管音讯生产者的音讯,而后将音讯发送到消费者

1、信道channel

我的了解是生产者 / 消费者和 rabbitmq 交互的一个通道,负责交换机、队列治理;音讯公布和生产治理;事务管理等

2、交换机

四种交换机:

direct:能够用一个或者多个 key 绑定到一个或者多个队列上

topic:反对路由的适配符 # *

Fanout 播送:将音讯发送给所有的队列

Header 头交换机:自定义通过头音讯属性来定义路由的匹配

3、队列:保留音讯的队列

4、消费者:音讯的接收者

5、生产者:音讯的发送者

二、应用 com.rabbitmq.client.* 操作 mq

2.1、基本操作

0、环境和依赖

<!-- 环境 
* jdk 1.8 
* idea
* springboot 2.2.6
-->
<!-- 依赖 这里只导入这个包,其中蕴含了 Rabbit client 的包 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1、创立连贯和信道

// 获取连贯
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");//mq 主机地址
factory.setPort(5672);// 端口,默认时 5672
factory.setUsername("leyou");
factory.setPassword("leyou");
factory.setVirtualHost("/leyou");
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection..createChannel();

2、申明交换机 / 队列 / 绑定交换机和队列

// 交换机名,交换机类型
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
/**
* 第一个参数是 queue:要创立的队列名
* 第二个参数是 durable:是否长久化。如果为 true,能够在 RabbitMQ 解体后复原音讯
* 第三个参数是 exclusive:true 示意一个队列只能被一个消费者占有并生产
* 第四个参数是 autoDelete:true 示意服务器不在应用这个队列是会主动删除它
* 第五个参数是 arguments:包含死信队列,队列的 ttl
*/
channel.queueDeclare(QUEUE_ONE,true,false,false,null);
// 绑定交换机和队列  队列名,交换机名,routekey
channel.queueBind(QUEUE_ONE,EXCHANGE,GIRL);

3、公布音讯

//1、交换机名 2、routekey 3、mandatory 强制(须要 return 回调时必须设置为 true) 4、公布音讯参数 5、音讯
channel.basicPublish(EXCHANGE,GIRL,true,null,"xxx 提价了".getBytes());

4、接管音讯

// 接管音讯前也须要获取连贯和 channel,申明队列
// 接管音讯
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 拿到音讯
        System.out.println(new String(body,"utf-8"));
    }
};
/**
* 参数阐明
* 1:队列名字
* 2:是否自动应答 autoACk,为 false 时须要手动 ack
* 3:消费者,当接管到消费者时会调用给对象中的 handleDelivery 办法
*/
channel.basicConsume(QUEUE_ONE,true,consumer);

2.2、根本利用

1、性能:

有两个人小明和小华,小明对美女感兴趣,小华对股票和没事感兴趣,应用音讯队列将他们感兴趣的音讯发送给他们两个

2、实现:

(1)写一个类来提供创立连贯和信道;
(2)生产者(发送音讯方)类发送音讯
(3)消费者(接管音讯)类接管音讯

  • 连贯类
public class ConnectionUtil {
    /**
     * 应用原始的 rabbitmq client api 操作 mq
     */
    private static ConnectionFactory factory = new ConnectionFactory();
    private static Connection connection;
    /*
    获取连贯
    留神导包:须要导 client 上面的包
     */
    public static Connection getConnection() throws IOException, TimeoutException {//        factory.setHost("localhost");
//        factory.setPort(5672);
        factory.setUsername("leyou");
        factory.setPassword("leyou");
        factory.setVirtualHost("/leyou");
        connection = factory.newConnection();
        return connection;
    }

    public static void close() throws IOException {connection.close();
    }
    /*
    创立信道
     */
    public static Channel getChannel() throws IOException, TimeoutException {return getConnection().createChannel();}
}
  • 生产者
// 生产者
public class provice{public void producerMsg() throws IOException, TimeoutException, InterruptedException {Channel channel = ConnectionUtil.getChannel();
        String EXCHANGE = "direct_exchange";
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
        // 定义两个队列名
        String QUEUE_ONE = "beauty_queue";
        String QUEUE_TWO = "food_queue";
        channel.queueDeclare(QUEUE_ONE,true,false,false,null);
        channel.queueDeclare(QUEUE_TWO,true,false,false,null);
        // 定义三个 key
        String GIRL = "girl";
        String SHARE = "share";
        String FOOD = "food";
        // 绑定
        channel.queueBind(QUEUE_ONE,EXCHANGE,GIRL);
        channel.queueBind(QUEUE_TWO,EXCHANGE,SHARE);
        channel.queueBind(QUEUE_TWO,EXCHANGE,FOOD);
        // 发送音讯
        /**
         * 参数:1 交换机,2routekey 3 mandatory:强制;(须要 return 回调时必须设置为 true)
         * 3 参数,4 音讯字节数据
         */
        channel.basicPublish(EXCHANGE,GIRL,true,null,"快看,是她".getBytes());
        channel.basicPublish(EXCHANGE,SHARE,true,null,"股票涨了".getBytes());
        channel.basicPublish(EXCHANGE,FOOD,true,null,"肯德基提价了".getBytes());
        // 敞开连贯
        channel.close();
        ConnectionUtil.close();}
}
  • 消费者
public class ConsumerMq { // 生产音讯

    /**
     * 应用原始的 rabbitmq client api 操作 mq
     */
    String EXCHANGE = "direct_exchange";
    String QUEUE_ONE = "beauty_queue";
    String QUEUE_TWO = "food_queue";
    //key
    String GIRL = "girl";
    String SHARE = "share";
    String FOOD = "food";

    public void consumer() throws IOException, TimeoutException {Channel channel = ConnectionUtil.getChannel();
        /**
         * 第一个参数是 queue:要创立的队列名
         * 第二个参数是 durable:是否长久化。如果为 true,能够在 RabbitMQ 解体后复原音讯
         * 第三个参数是 exclusive:true 示意一个队列只能被一个消费者占有并生产
         * 第四个参数是 autoDelete:true 示意服务器不在应用这个队列是会主动删除它
         * 第五个参数是 arguments:包含死信队列,队列的 ttl,*/
        channel.queueDeclare(QUEUE_ONE,true,false,false,null);
        channel.queueDeclare(QUEUE_TWO,true,false,false,null);
        // 在生产者绑定了交换机和队列,在这里就不须要绑定
        //channel.queueBind(QUEUE_ONE,EXCHANGE,GIRL);
        //channel.queueBind(QUEUE_TWO,EXCHANGE,SHARE);
        //channel.queueBind(QUEUE_TWO,EXCHANGE,FOOD);

        // 接管音讯
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body,"utf-8"));
                // 手动应答 ack 能够在该办法中进行;参数:1. 音讯 tag,2. 是否批量 ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        /**
         * 参数阐明
         * 1:队列名字
         * 2:是否自动应答 autoACk 为 false 时须要手动 ack
         * 3:消费者,当接管到消费者时会调用给对象中的 handleDelivery 办法
         */
        channel.basicConsume(QUEUE_ONE,false,consumer);
        channel.basicConsume(QUEUE_TWO,false,consumer);
    }
}

2.3、mq 事务,发送方确认,和音讯回调

概述

音讯的发送链路 生产者 -> exchange –> queue –> 消费者;为确保音讯发送到 rabbitmq,amqp 协定提供了三个机制来保障:事务,发送方确认(ack),音讯回调(returncallback);事务的形式和数据库的事务相似,这里不做具体介绍;发送方确认是当音讯发送到交换机时,broker(实现 amqp 协定的服务端,这里指 rabbitmq)会回调发送者的一个固定办法来确认音讯胜利发送;音讯回调是产生在交换机通过路由 key 转发到队列的过程中,如果音讯不能通过 key 找到对应的 queue 则回调一个固定办法将音讯返回给生产者,确保音讯不失落

1、mq 事务

  • rabbitMq 是反对事务的,然而应用事务的效率很低,在音讯数量很大的状况下影响性能

2、发送方确认

对于固定音讯体大小和线程数,如果音讯长久化,生产者 confirm(或者采纳事务机制),消费者 ack 那么对性能有很大的影响.

音讯长久化的优化没有太好办法,用更好的物理存储(SAS, SSD, RAID 卡)总会带来改善。生产者 confirm 这一环节的优化则次要在于客户端程序的优化之上。归纳起来,客户端实现生产者 confirm 有三种编程形式:

  1. 一般 confirm 模式:每发送一条音讯后,调用 waitForConfirms()办法,期待服务器端 confirm。实际上是一种串行 confirm 了。
  2. 批量 confirm 模式:每发送一批音讯后,调用 waitForConfirms()办法,期待服务器端 confirm。
  3. 异步 confirm 模式:提供一个回调办法,服务端 confirm 了一条或者多条音讯后 Client 端会回调这个办法。
  • []  一般 confirm 模式
// 要点
// 第 1 种
// 一般 confirm 模式最简略,publish 一条音讯后,期待服务器端 confirm, 如果服务端返回 false 或者超时工夫内未返回,客户端进行音讯重传。//1. 发消息前
channel.confirmSelect();
//2. 发消息后
// 判断音讯发送是否胜利
if(channel.waitForConfirms()){System.out.println("音讯发送胜利");
}
  • []  批量 confirm 模式

批量 confirm 模式略微简单一点,客户端程序须要定期(每隔多少秒)或者定量(达到多少条)或者两则联合起来 publish 音讯,而后期待服务器端 confirm, 相比一般 confirm 模式,批量极大晋升 confirm 效率,然而问题在于一旦呈现 confirm 返回 false 或者超时的状况时,客户端须要将这一批次的音讯全部重发,这会带来显著的反复音讯数量,并且,当音讯常常失落时,批量 confirm 性能应该是不升反降的。

channel.confirmSelect();
for(int i=0;i<batchCount;i++){channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if(!channel.waitForConfirms()){System.out.println("send message failed.");
}

异步 confirm 模式

异步 confirm 模式的编程实现最简单,Channel 对象提供的 ConfirmListener()回调办法只蕴含 deliveryTag(以后 Chanel 收回的音讯序号),咱们须要本人为每一个 Channel 保护一个 unconfirm 的音讯序号汇合,每 publish 一条数据,汇合中元素加 1,每回调一次 handleAck 办法,unconfirm 汇合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个 unconfirm 汇合最好采纳有序汇合 SortedSet 存储构造。实际上,SDK 中的 waitForConfirms()办法也是通过 SortedSet 保护音讯序号的。

要害代码:
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
// 别忘这行代码
 channel.confirmSelect();
// 增加监听器
channel.addConfirmListener(new ConfirmListener() {public void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);
        }
    }
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("Nack, SeqNo:" + deliveryTag + ", multiple:" + multiple);
        if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);
        }
    }
});
while (true) {long nextSeqNo = channel.getNextPublishSeqNo();
    channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
    confirmSet.add(nextSeqNo);
}

3、音讯回调

// 要点
//1. 发送音讯是将第三个参数 mandatory 设置为 true
channel.basicPublish(EXCHANGE,FOOD,true,null,"肯德基提价了".getBytes());
//2. 增加音讯回调监听器
channel.addReturnListener(new ReturnListener() {
     @Override
     public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {System.out.println("音讯不可路由"+new String(bytes,"utf-8"));
     }
 });
// 留神:开启回调不能敞开连贯和信道,

2.4、接管方确认

1、概述

接管方 ack 分为手动和主动,在接管音讯时设置

// 第二个参数就是指定是否手动 ack false 时为手动
channel.basicConsume(QUEUE_ONE,false,consumer);

手动 ack 有三种

  • 单个确认
  • 单个回绝
  • 批量回绝

2、代码实现

单个确认 ack

// 接管音讯
Consumer consumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body,"utf-8"));
        // 手动应答 ack 能够在该办法中进行;参数:1. 音讯 tag,2. 是否批量 ack
        channel.basicAck(envelope.getDeliveryTag(),false);
        // 回绝音讯; 参数:1. 音讯 tag;2. 音讯是否从新入队,当只有一个消费者时,会引起反复生产
        channel.basicReject(envelope.getDeliveryTag(),false);
        // 批量 ack 音讯; 参数:1. 音讯 tag;2. 是否批量 ack 音讯,3. 是否重回队列
        channel.basicNack(envelope.getDeliveryTag(),true,false);
    }
};
// 这里只须要条应答的语句,我这里常识都列出来
channel.basicConsume(QUEUE_ONE,false,consumer);
// 留神下面第二个参数要为 false 能力手动 ack

2.5、音讯 TTL 和队列 TTL、死信队列、提早队列

这一块临时不应用原始 RabbitMq Client API 实现,前面再钻研,然而会应用上面的 org.springframework.amqp 来实现

三、应用 org.springframework.amqp 操作 mq

3.1、前言:

SpringRabbitMp 进行了形象,将交换机,队列,音讯,绑定,连贯等形象出实体类,不便操作,还提供了RabbitAdmit 和RabbitTemplate 来不便交换机队列的治理以及音讯的发送接管等

3.2、根本实例

0、环境和依赖

<!-- 环境 
* jdk 1.8 
* idea
* springboot 2.2.6
  -->
<!-- 依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1、实例

发送音讯通知消费者超时打折了快来购物

  • 配置类
@Configuration
public class RabbitConfig {private final static Logger log = LoggerFactory.getLogger(RabbitConfig.class);

    private final static String EXCHANGE_NAME = "verification_code_exchange";
    private final static String VERIFICATION_CODE_QUEUE = "verification_code_queue";
    private final static String VERIFICATION_CODE_ROUTE_KEY = "verification_code_key";

    // 死信交换机和队列和 key
    private final static String DLX_EXCHANGE_NAME = "dlx-exchange";
    private final static String DLX_KEY = "verification_code_key";

    @Bean
    public CachingConnectionFactory connectionFactory(){CachingConnectionFactory conn = new CachingConnectionFactory();
        conn.setUsername("leyou");
        conn.setPassword("leyou");
        conn.setVirtualHost("/leyou");
        // 音讯发送到 mq 发送确认音讯给生产者
        conn.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // 音讯发送到 mq,通过绑定的 key 找不到 queue,则发送音讯给生产者
        conn.setPublisherReturns(true);
        return conn;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置音讯序列化
        rabbitTemplate.setMessageConverter(converter());
        // 音讯的确认回调
//        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//            @Override
//            public void confirm(CorrelationData correlationData, boolean b, String s) {
//
//            }
//        });
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            //ack 为确认音讯是否胜利发送到 mq
            if(ack){
                // 胜利发送
                log.info("音讯发送胜利");
            }
        });
        // 改标记位设置位 true 时,当交换机依据本身类型和 routeKey 无奈找到对应的队列时,// 则 mq 会将音讯返还给生产者
        // 当为 false 时则 mq 会将音讯间接删除
        rabbitTemplate.setMandatory(true);
        // 音讯,返回码,返回内容,交换机,路由 key
        rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)->{
            // 音讯
            log.info("message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",message,replyCode,replyText,exchange,routingKey);
        });
        return rabbitTemplate;
    }

    /**
     * 注入 rabbitadmin 用来申明交换机和队列,次要作用是代替原始的应用 channl 申明的做法,全副交给这个对象来实现
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmit(CachingConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    /**
     * 音讯序化对象
     * 默认应用的是 JDK 的序列化,这里配置了后就能够将音讯序列化为 json 格局
     */
    @Bean
    public MessageConverter converter() {return new Jackson2JsonMessageConverter();
    }

    /**
     * 申明一个交换机
     */
    @Bean
    public DirectExchange verificationCodeExchange(RabbitAdmin rabbitAdmin){DirectExchange exchange = new DirectExchange(EXCHANGE_NAME);
        rabbitAdmin.declareExchange(exchange);
        return exchange;
    }

    /**
     * 申明一个队列
     * @param rabbitAdmin
     * @return
     */
    @Bean
    public Queue getQueue(RabbitAdmin rabbitAdmin){Queue queue = new Queue(VERIFICATION_CODE_QUEUE, true,false,false,null);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }

    /**
     * 申明一个绑定
     * @param rabbitAdmin
     * @param verificationCodeExchange
     * @return
     */
    @Bean
    public Binding bindingQueue(RabbitAdmin rabbitAdmin,DirectExchange verificationCodeExchange){Binding with = BindingBuilder.bind(getQueue(rabbitAdmin)).to(verificationCodeExchange).with(VERIFICATION_CODE_ROUTE_KEY);
        rabbitAdmin.declareBinding(with);
        return with;
    }
}

阐明:下面用到了生产者 confirm 和音讯回调机制
1、生产者 confirm 要害代码:

//1、创立连贯时
conn.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
//2、创立 rabbitTemplate 时
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    //ack 为确认音讯是否胜利发送到 mq
    if(ack){
        // 胜利发送
        log.info("音讯发送胜利");
    }
});

2、音讯回调机制要害代码:

//1、创立连贯时
conn.setPublisherReturns(true);
//2、创立 rabbitTemplate 时
// 改标记位设置位 true 时,当交换机依据本身类型和 routeKey 无奈找到对应的队列时,// 则 mq 会将音讯返还给生产者
// 当为 false 时则 mq 会将音讯间接删除
rabbitTemplate.setMandatory(true);     
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)->{
            // 音讯
log.info("message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",message,replyCode,replyText,exchange,routingKey);
});

生产者:

@Component
public class RabbitSender {
    // 注入 rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(){
        // 构建音讯
        Message message = MessageBuilder.withBody(JSONObject.toJSONString(MessageModel.builder().id(msgId).context("超市打折,快来抢购!").build()).getBytes()).build();
        // 音讯长久化
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        // 音讯的媒体类型
        message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
        // 音讯的自定义关联 id
        CorrelationData correlationData = new CorrelationData(String.valueOf(msgId));
        rabbitTemplate.convertAndSend(exchange,routingKey,message,new MessagePostProcessor(){
            // 音讯后置处理器,能够在上面这个办法中对音讯进行相干属性的设置
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 比方能够设置下面 这些属性等
                //message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 音讯长久化问题                                                              //message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);// 音讯的媒体类型
                return message;
            }
        },correlationData);
    }

}

消费者

@Component
public class RabbitReceive {@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE_NAME, type = ExchangeTypes.DIRECT),
            key = VERIFICATION_CODE_ROUTE_KEY,
            value = @Queue(value = VERIFICATION_CODE_QUEUE, autoDelete = "false"),
            ignoreDeclarationExceptions = "true"),
            concurrency = "1", // 指定监听该队列的消费者个数
            ackMode = "MANUAL"// 手动 ack
    )
    public void receiveCode(Channel channel, Message msg, @Headers Map<String, Object> headers) throws IOException, InterruptedException {String msgId = (String) headers.get("spring_listener_return_correlation");
        long tag = msg.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false);
    }
}

其中:发送方确认(生产者 confirm)、音讯回调下面代码都蕴含了;消费者 ack 则和原始办法是一样的

上面介绍音讯 TTL,队列 TTL,死信队列,提早队列

  • 音讯和队列的 TTL
// 音讯 ttl
// 在构建音讯时设置音讯的过期工夫
Message message = MessageBuilder.withBody(JSONObject.toJSONString(MessageModel.builder().id(msgId).context("超市打折,快来抢购!").build()).getBytes()).build();
// 音讯的过期工夫
message.getMessageProperties().setExpiration("5000");
// 队列的 ttl
// 在创立队列时通过参数设置
Map<String, Object> args = new HashMap<>();
// 指定死信交换机
args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
// 指定死信队列的 key
args.put("x-dead-letter-routing-key", DLX_KEY);
// 设置队列中音讯的过期工夫 ms
args.put("x-message-ttl",10000);
// 整个队列的过期工夫,过期后整个队列会被删除
//args.put("x-expires",10000);
Queue queue = new Queue(VERIFICATION_CODE_QUEUE, true,false,false,args);

下面还包含死信队列的属性设置,和死信队列 key,对于死信队列的配置,还须要配置一个死信交换机和一个死信队列;当有音讯或队列的 ttl 过期,音讯超过队列最大长度,音讯被回绝且设置不从新回队列,则音讯会被转发到死信交换机,再转发到死信队列。

  • 对于提早队列的实现办法有两种
  1. 应用死信队列,用一个设置了 ttl 的队列来寄存音讯,该队列不须要消费者监听,而后给该队列配置死信交换机和队列,消费者监听死信队列,这样就能达到工夫达到提早收到音讯的目标
  2. 应用 rabbitmq 插件的形式实现,这里先不写,放到下一篇笔记中

最初

感激你看到这里,看完有什么的不懂的能够在评论区问我,感觉文章对你有帮忙的话记得给我点个赞,每天都会分享 java 相干技术文章或行业资讯,欢送大家关注和转发文章!

正文完
 0