乐趣区

关于mq:系统学习消息队列RabbitMQ的消息发布确认

1.MQ 发送信息的时候产生的问题

2.MQ 的公布确认原理

3.MQ 的公布确认策略

1.MQ 发送信息的时候产生的问题

咱们在前一篇博客零碎学习音讯队列——RabbitMQ 的音讯应答和长久化中学过,当 消费者挂掉 的时候,有音讯重发 ,当 队列挂掉 的时候,有音讯长久化 ,然而咱们却 无奈保障生产者发送到队列的音讯是否确定发送胜利 ,这个时候就有了 音讯的公布确认

2.MQ 的公布确认原理
当咱们的信道被设置成 公布确认(confirm)模式 ,那么所有在该信道下面公布的 音讯都会被指派一个惟一的 ID,一旦 音讯胜利投递 ,broker 就 会发送一个确认给生产者 ,生产者此时就晓得音讯曾经投递胜利, 生产者就会把这条音讯进行删除。

confirm 模式能够是 同步的 ,也能够是 异步的 同步 的状况下是 发送之后马上进行确认 异步 的话生产者能够 无需期待确认只管发送音讯 ,如果某些音讯失去确认,生产者将就能够 通过回调办法来确认音讯

3.MQ 的公布确认策略

3.1)开启确认公布
公布确认模式默认是没有开启的,咱们须要调用办法将它关上。

        Channel channel = connection.createChannel();
        // 开启公布确认
        channel.confirmSelect();

3.2)单个确认公布

这是一种简略的同步确认形式,发送一条音讯,确认一条音讯,后续的音讯能力持续发送。

长处:简略易懂。
毛病:公布速度过慢 ,如果后面的音讯没有失去确认,前面的音讯就不得发送, 容易阻塞

public class ProducerSingle {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {

        //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开
        try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个队列
             * 1. 队列名称
             * 2. 队列外面的音讯是否长久化 默认音讯存储在内存中
             * 3. 该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
             * 4. 是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
             * 5. 其余参数
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 开始工夫
            long begin = System.currentTimeMillis();
            for (int i = 0; i < 1000; i++) {
                String message = i + "";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                // 确认是否发送胜利,服务端返回 false 或超时工夫内未返回,生产者能够音讯重发
                boolean flag = channel.waitForConfirms();
                if (flag) {System.out.println("音讯发送胜利");
                }
            }
            // 发送完结工夫
            long end = System.currentTimeMillis();
            System.out.println("公布" + 1000 + "个独自确认音讯, 耗时" + (end - begin) + "ms");
        }
    }
}

3.3)批量确认公布
单个确认公布的速度十分慢,其实咱们能够先 发送一批,而后确认一批 ,再公布一批。
长处:比单个确认公布速度快,吞吐量大
毛病:当 其中一个音讯出问题 的时候,不晓得是哪个音讯呈现了问题 ,咱们必须将整个批处理音讯保留在内存里,以记录重要的音讯后从新公布音讯。这种办法也是 阻塞 的,一样阻塞音讯的公布。

public class ProducerMulti {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {

        //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开
        try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个队列
             * 1. 队列名称
             * 2. 队列外面的音讯是否长久化 默认音讯存储在内存中
             * 3. 该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
             * 4. 是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
             * 5. 其余参数
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // 批量确认音讯大小
            int batchSize = 100;
            // 未确认音讯个数
            int unConfirmMessageNum = 0;
            // 开始发送工夫
            long begin = System.currentTimeMillis();
            for (int i = 0; i < 1000; i++) {
                String message = i + "";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                // 发送一条音讯,未确认书 +1
                unConfirmMessageNum++;
                // 如果位确认数达到批量确认大小
                if (unConfirmMessageNum == batchSize) {
                    // 期待进行批量确认
                    channel.waitForConfirms();
                    unConfirmMessageNum = 0;
                }
            }
            // 为了确保还有残余没有确认音讯 再次确认
            if (unConfirmMessageNum > 0) {channel.waitForConfirms();
            }
            // 完结工夫
            long end = System.currentTimeMillis();
            System.out.println("公布" + 1000 + "个独自确认音讯, 耗时" + (end - begin) + "ms");
        }
    }
}

3.4)异步确认公布
异步确认 不须要阻塞,生产者只管发送信息就好 ,队列 通过回调函数告诉生产者发送胜利。

长处:保障了效率和可靠性

毛病:编程逻辑简单

public class ProducerAsyn {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {

        //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开
        try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个队列
             * 1. 队列名称
             * 2. 队列外面的音讯是否长久化 默认音讯存储在内存中
             * 3. 该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
             * 4. 是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
             * 5. 其余参数
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            /**
             * 用于回调函数确认公布的哈希表,线程平安,实用于并发状况
             * 1. 能够将序列号和音讯进行关联
             * 2. 能够批量删除曾经确认的音讯
             * 3. 反对并发拜访
             */
            ConcurrentSkipListMap<Long, String> confirmsMap = new ConcurrentSkipListMap<>();

            /**
             * 确认收到音讯的一个回调
             * 1. 音讯序列号
             * 2.true 能够确认小于等于以后序列号的音讯
             *   false 确认以后序列号音讯
             */
            ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {if (multiple) {
                    // 把小于以后序列号的全副音讯取出
                    // 返回的是小于等于以后序列号的未确认音讯 是一个 map
                    ConcurrentNavigableMap<Long, String> confirmed =
                            confirmsMap.headMap(sequenceNumber, true);
                    // 革除该局部未确认音讯
                    confirmed.clear();}else{
                    // 只革除以后序列号的音讯
                    confirmsMap.remove(sequenceNumber);
                }
            };
            // 未被确认的回调
            ConfirmCallback nackCallback = (sequenceNumber, multiple) ->
            {String message = confirmsMap.get(sequenceNumber);
                System.out.println("公布的音讯"+message+"未被确认,序列号"+sequenceNumber);
            };
            /**
             * 增加一个异步确认的监听器
             * 1. 确认收到音讯的回调
             * 2. 未收到音讯的回调
             */
            channel.addConfirmListener(ackCallback, nackCallback);

            // 发送开始工夫
            long begin = System.currentTimeMillis();
            for (int i = 0; i < 1000; i++) {
                String message = i + "";
                // 在 map 外面设置音讯 id 和内容
                confirmsMap.put(channel.getNextPublishSeqNo(), message);
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            }
            // 发送完结工夫
            long end = System.currentTimeMillis();
            System.out.println("公布" + 1000 + "个独自确认音讯, 耗时" + (end - begin) + "ms");
        }
    }
}

3.5)确认公布速度比照

退出移动版