1.MQ发送信息的时候产生的问题
2.MQ的公布确认原理
3.MQ的公布确认策略
1.MQ发送信息的时候产生的问题
咱们在前一篇博客零碎学习音讯队列——RabbitMQ的音讯应答和长久化中学过,当消费者挂掉的时候,有音讯重发,当队列挂掉的时候,有音讯长久化,然而咱们却无奈保障生产者发送到队列的音讯是否确定发送胜利,这个时候就有了音讯的公布确认。
2.MQ的公布确认原理
当咱们的信道被设置成公布确认(confirm)模式,那么所有在该信道下面公布的音讯都会被指派一个惟一的ID,一旦音讯胜利投递,broker就会发送一个确认给生产者,生产者此时就晓得音讯曾经投递胜利,生产者就会把这条音讯进行删除。
confirm模式能够是同步的,也能够是异步的,同步的状况下是发送之后马上进行确认,异步的话生产者能够无需期待确认只管发送音讯,如果某些音讯失去确认,生产者将就能够通过回调办法来确认音讯。
3.MQ的公布确认策略
3.1)开启确认公布
公布确认模式默认是没有开启的,咱们须要调用办法将它关上。
Channel channel = connection.createChannel(); //开启公布确认 channel.confirmSelect();
3.2)单个确认公布
这是一种简略的同步确认形式,发送一条音讯,确认一条音讯,后续的音讯能力持续发送。
长处:简略易懂。
毛病:公布速度过慢,如果后面的音讯没有失去确认,前面的音讯就不得发送,容易阻塞。
public class ProducerSingle { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开 try (Channel channel = RabbitMqUtils.getChannel()) { /** * 申明一个队列 * 1.队列名称 * 2.队列外面的音讯是否长久化 默认音讯存储在内存中 * 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产 * 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除 * 5.其余参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //开始工夫 long begin = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String message = i + ""; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //确认是否发送胜利,服务端返回 false 或超时工夫内未返回,生产者能够音讯重发 boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("音讯发送胜利"); } } //发送完结工夫 long end = System.currentTimeMillis(); System.out.println("公布" + 1000 + "个独自确认音讯,耗时" + (end - begin) + "ms"); } }}
3.3)批量确认公布
单个确认公布的速度十分慢,其实咱们能够先发送一批,而后确认一批,再公布一批。
长处:比单个确认公布速度快,吞吐量大。
毛病:当其中一个音讯出问题的时候,不晓得是哪个音讯呈现了问题,咱们必须将整个批处理音讯保留在内存里,以记录重要的音讯后从新公布音讯。这种办法也是阻塞的,一样阻塞音讯的公布。
public class ProducerMulti { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开 try (Channel channel = RabbitMqUtils.getChannel()) { /** * 申明一个队列 * 1.队列名称 * 2.队列外面的音讯是否长久化 默认音讯存储在内存中 * 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产 * 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除 * 5.其余参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //批量确认音讯大小 int batchSize = 100; //未确认音讯个数 int unConfirmMessageNum = 0; //开始发送工夫 long begin = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String message = i + ""; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //发送一条音讯,未确认书+1 unConfirmMessageNum++; //如果位确认数达到批量确认大小 if (unConfirmMessageNum == batchSize) { //期待进行批量确认 channel.waitForConfirms(); unConfirmMessageNum = 0; } } //为了确保还有残余没有确认音讯 再次确认 if (unConfirmMessageNum > 0) { channel.waitForConfirms(); } //完结工夫 long end = System.currentTimeMillis(); System.out.println("公布" + 1000 + "个独自确认音讯,耗时" + (end - begin) + "ms"); } }}
3.4)异步确认公布
异步确认不须要阻塞,生产者只管发送信息就好,队列通过回调函数告诉生产者发送胜利。
长处:保障了效率和可靠性
毛病:编程逻辑简单
public class ProducerAsyn { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开 try (Channel channel = RabbitMqUtils.getChannel()) { /** * 申明一个队列 * 1.队列名称 * 2.队列外面的音讯是否长久化 默认音讯存储在内存中 * 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产 * 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除 * 5.其余参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); /** * 用于回调函数确认公布的哈希表,线程平安,实用于并发状况 * 1.能够将序列号和音讯进行关联 * 2.能够批量删除曾经确认的音讯 * 3.反对并发拜访 */ ConcurrentSkipListMap<Long, String> confirmsMap = new ConcurrentSkipListMap<>(); /** * 确认收到音讯的一个回调 * 1.音讯序列号 * 2.true 能够确认小于等于以后序列号的音讯 * false 确认以后序列号音讯 */ ConfirmCallback ackCallback = (sequenceNumber, multiple) -> { if (multiple) { //把小于以后序列号的全副音讯取出 //返回的是小于等于以后序列号的未确认音讯 是一个 map ConcurrentNavigableMap<Long, String> confirmed = confirmsMap.headMap(sequenceNumber, true); //革除该局部未确认音讯 confirmed.clear(); }else{ //只革除以后序列号的音讯 confirmsMap.remove(sequenceNumber); } }; //未被确认的回调 ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {String message = confirmsMap.get(sequenceNumber); System.out.println("公布的音讯"+message+"未被确认,序列号"+sequenceNumber); }; /** * 增加一个异步确认的监听器 * 1.确认收到音讯的回调 * 2.未收到音讯的回调 */ channel.addConfirmListener(ackCallback, nackCallback); //发送开始工夫 long begin = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String message = i + ""; //在map外面设置音讯id和内容 confirmsMap.put(channel.getNextPublishSeqNo(), message); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } //发送完结工夫 long end = System.currentTimeMillis(); System.out.println("公布" + 1000 + "个独自确认音讯,耗时" + (end - begin) + "ms"); } }}
3.5)确认公布速度比照