1.MQ发送信息的时候产生的问题

2.MQ的公布确认原理

3.MQ的公布确认策略

1.MQ发送信息的时候产生的问题

咱们在前一篇博客零碎学习音讯队列——RabbitMQ的音讯应答和长久化中学过,当消费者挂掉的时候,有音讯重发,当队列挂掉的时候,有音讯长久化,然而咱们却无奈保障生产者发送到队列的音讯是否确定发送胜利,这个时候就有了音讯的公布确认

2.MQ的公布确认原理
当咱们的信道被设置成公布确认(confirm)模式,那么所有在该信道下面公布的音讯都会被指派一个惟一的ID,一旦音讯胜利投递,broker就会发送一个确认给生产者,生产者此时就晓得音讯曾经投递胜利,生产者就会把这条音讯进行删除。

confirm模式能够是同步的,也能够是异步的同步的状况下是发送之后马上进行确认异步的话生产者能够无需期待确认只管发送音讯,如果某些音讯失去确认,生产者将就能够通过回调办法来确认音讯

3.MQ的公布确认策略

3.1)开启确认公布
公布确认模式默认是没有开启的,咱们须要调用办法将它关上。

        Channel channel = connection.createChannel();        //开启公布确认        channel.confirmSelect();

3.2)单个确认公布

这是一种简略的同步确认形式,发送一条音讯,确认一条音讯,后续的音讯能力持续发送。

长处:简略易懂。
毛病:公布速度过慢,如果后面的音讯没有失去确认,前面的音讯就不得发送,容易阻塞

public class ProducerSingle {    private final static String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception {        //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开        try (Channel channel = RabbitMqUtils.getChannel()) {            /**             * 申明一个队列             * 1.队列名称             * 2.队列外面的音讯是否长久化 默认音讯存储在内存中             * 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产             * 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除             * 5.其余参数             */            channel.queueDeclare(QUEUE_NAME, true, false, false, null);            //开始工夫            long begin = System.currentTimeMillis();            for (int i = 0; i < 1000; i++) {                String message = i + "";                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());                //确认是否发送胜利,服务端返回 false 或超时工夫内未返回,生产者能够音讯重发                boolean flag = channel.waitForConfirms();                if (flag) {                    System.out.println("音讯发送胜利");                }            }            //发送完结工夫            long end = System.currentTimeMillis();            System.out.println("公布" + 1000 + "个独自确认音讯,耗时" + (end - begin) + "ms");        }    }}

3.3)批量确认公布
单个确认公布的速度十分慢,其实咱们能够先发送一批,而后确认一批,再公布一批。
长处:比单个确认公布速度快,吞吐量大
毛病:当其中一个音讯出问题的时候,不晓得是哪个音讯呈现了问题,咱们必须将整个批处理音讯保留在内存里,以记录重要的音讯后从新公布音讯。这种办法也是阻塞的,一样阻塞音讯的公布。

public class ProducerMulti {    private final static String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception {        //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开        try (Channel channel = RabbitMqUtils.getChannel()) {            /**             * 申明一个队列             * 1.队列名称             * 2.队列外面的音讯是否长久化 默认音讯存储在内存中             * 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产             * 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除             * 5.其余参数             */            channel.queueDeclare(QUEUE_NAME, true, false, false, null);            //批量确认音讯大小            int batchSize = 100;            //未确认音讯个数            int unConfirmMessageNum = 0;            //开始发送工夫            long begin = System.currentTimeMillis();            for (int i = 0; i < 1000; i++) {                String message = i + "";                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());                //发送一条音讯,未确认书+1                unConfirmMessageNum++;                //如果位确认数达到批量确认大小                if (unConfirmMessageNum == batchSize) {                    //期待进行批量确认                    channel.waitForConfirms();                    unConfirmMessageNum = 0;                }            }            //为了确保还有残余没有确认音讯 再次确认            if (unConfirmMessageNum > 0) {                channel.waitForConfirms();            }            //完结工夫            long end = System.currentTimeMillis();            System.out.println("公布" + 1000 + "个独自确认音讯,耗时" + (end - begin) + "ms");        }    }}

3.4)异步确认公布
异步确认不须要阻塞,生产者只管发送信息就好,队列通过回调函数告诉生产者发送胜利。

长处:保障了效率和可靠性

毛病:编程逻辑简单

public class ProducerAsyn {    private final static String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception {        //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开        try (Channel channel = RabbitMqUtils.getChannel()) {            /**             * 申明一个队列             * 1.队列名称             * 2.队列外面的音讯是否长久化 默认音讯存储在内存中             * 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产             * 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除             * 5.其余参数             */            channel.queueDeclare(QUEUE_NAME, true, false, false, null);            /**             * 用于回调函数确认公布的哈希表,线程平安,实用于并发状况             * 1.能够将序列号和音讯进行关联             * 2.能够批量删除曾经确认的音讯             * 3.反对并发拜访             */            ConcurrentSkipListMap<Long, String> confirmsMap = new ConcurrentSkipListMap<>();            /**             * 确认收到音讯的一个回调             * 1.音讯序列号             * 2.true 能够确认小于等于以后序列号的音讯             *   false 确认以后序列号音讯             */            ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {                if (multiple) {                    //把小于以后序列号的全副音讯取出                    //返回的是小于等于以后序列号的未确认音讯 是一个 map                    ConcurrentNavigableMap<Long, String> confirmed =                            confirmsMap.headMap(sequenceNumber, true);                    //革除该局部未确认音讯                    confirmed.clear();                }else{                    //只革除以后序列号的音讯                    confirmsMap.remove(sequenceNumber);                }            };            //未被确认的回调            ConfirmCallback nackCallback = (sequenceNumber, multiple) ->            {String message = confirmsMap.get(sequenceNumber);                System.out.println("公布的音讯"+message+"未被确认,序列号"+sequenceNumber);            };            /**             * 增加一个异步确认的监听器             * 1.确认收到音讯的回调             * 2.未收到音讯的回调             */            channel.addConfirmListener(ackCallback, nackCallback);            //发送开始工夫            long begin = System.currentTimeMillis();            for (int i = 0; i < 1000; i++) {                String message = i + "";                //在map外面设置音讯id和内容                confirmsMap.put(channel.getNextPublishSeqNo(), message);                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());            }            //发送完结工夫            long end = System.currentTimeMillis();            System.out.println("公布" + 1000 + "个独自确认音讯,耗时" + (end - begin) + "ms");        }    }}

3.5)确认公布速度比照