1.MQ发送信息的时候产生的问题
2.MQ的公布确认原理
3.MQ的公布确认策略
1.MQ发送信息的时候产生的问题
咱们在前一篇博客零碎学习音讯队列——RabbitMQ的音讯应答和长久化中学过,当消费者挂掉的时候,有音讯重发,当队列挂掉的时候,有音讯长久化,然而咱们却无奈保障生产者发送到队列的音讯是否确定发送胜利,这个时候就有了音讯的公布确认。
2.MQ的公布确认原理
当咱们的信道被设置成公布确认(confirm)模式,那么所有在该信道下面公布的音讯都会被指派一个惟一的ID,一旦音讯胜利投递,broker就会发送一个确认给生产者,生产者此时就晓得音讯曾经投递胜利,生产者就会把这条音讯进行删除。
confirm模式能够是同步的,也能够是异步的,同步的状况下是发送之后马上进行确认,异步的话生产者能够无需期待确认只管发送音讯,如果某些音讯失去确认,生产者将就能够通过回调办法来确认音讯。
3.MQ的公布确认策略
3.1)开启确认公布
公布确认模式默认是没有开启的,咱们须要调用办法将它关上。
Channel channel = connection.createChannel();
//开启公布确认
channel.confirmSelect();
3.2)单个确认公布
这是一种简略的同步确认形式,发送一条音讯,确认一条音讯,后续的音讯能力持续发送。
长处:简略易懂。
毛病:公布速度过慢,如果后面的音讯没有失去确认,前面的音讯就不得发送,容易阻塞。
public class ProducerSingle {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开
try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个队列
* 1.队列名称
* 2.队列外面的音讯是否长久化 默认音讯存储在内存中
* 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
* 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
* 5.其余参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//开始工夫
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = i + "";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//确认是否发送胜利,服务端返回 false 或超时工夫内未返回,生产者能够音讯重发
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("音讯发送胜利");
}
}
//发送完结工夫
long end = System.currentTimeMillis();
System.out.println("公布" + 1000 + "个独自确认音讯,耗时" + (end - begin) + "ms");
}
}
}
3.3)批量确认公布
单个确认公布的速度十分慢,其实咱们能够先发送一批,而后确认一批,再公布一批。
长处:比单个确认公布速度快,吞吐量大。
毛病:当其中一个音讯出问题的时候,不晓得是哪个音讯呈现了问题,咱们必须将整个批处理音讯保留在内存里,以记录重要的音讯后从新公布音讯。这种办法也是阻塞的,一样阻塞音讯的公布。
public class ProducerMulti {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开
try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个队列
* 1.队列名称
* 2.队列外面的音讯是否长久化 默认音讯存储在内存中
* 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
* 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
* 5.其余参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//批量确认音讯大小
int batchSize = 100;
//未确认音讯个数
int unConfirmMessageNum = 0;
//开始发送工夫
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = i + "";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//发送一条音讯,未确认书+1
unConfirmMessageNum++;
//如果位确认数达到批量确认大小
if (unConfirmMessageNum == batchSize) {
//期待进行批量确认
channel.waitForConfirms();
unConfirmMessageNum = 0;
}
}
//为了确保还有残余没有确认音讯 再次确认
if (unConfirmMessageNum > 0) {
channel.waitForConfirms();
}
//完结工夫
long end = System.currentTimeMillis();
System.out.println("公布" + 1000 + "个独自确认音讯,耗时" + (end - begin) + "ms");
}
}
}
3.4)异步确认公布
异步确认不须要阻塞,生产者只管发送信息就好,队列通过回调函数告诉生产者发送胜利。
长处:保障了效率和可靠性
毛病:编程逻辑简单
public class ProducerAsyn {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开
try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个队列
* 1.队列名称
* 2.队列外面的音讯是否长久化 默认音讯存储在内存中
* 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
* 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
* 5.其余参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/**
* 用于回调函数确认公布的哈希表,线程平安,实用于并发状况
* 1.能够将序列号和音讯进行关联
* 2.能够批量删除曾经确认的音讯
* 3.反对并发拜访
*/
ConcurrentSkipListMap<Long, String> confirmsMap = new ConcurrentSkipListMap<>();
/**
* 确认收到音讯的一个回调
* 1.音讯序列号
* 2.true 能够确认小于等于以后序列号的音讯
* false 确认以后序列号音讯
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//把小于以后序列号的全副音讯取出
//返回的是小于等于以后序列号的未确认音讯 是一个 map
ConcurrentNavigableMap<Long, String> confirmed =
confirmsMap.headMap(sequenceNumber, true);
//革除该局部未确认音讯
confirmed.clear();
}else{
//只革除以后序列号的音讯
confirmsMap.remove(sequenceNumber);
}
};
//未被确认的回调
ConfirmCallback nackCallback = (sequenceNumber, multiple) ->
{String message = confirmsMap.get(sequenceNumber);
System.out.println("公布的音讯"+message+"未被确认,序列号"+sequenceNumber);
};
/**
* 增加一个异步确认的监听器
* 1.确认收到音讯的回调
* 2.未收到音讯的回调
*/
channel.addConfirmListener(ackCallback, nackCallback);
//发送开始工夫
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = i + "";
//在map外面设置音讯id和内容
confirmsMap.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
//发送完结工夫
long end = System.currentTimeMillis();
System.out.println("公布" + 1000 + "个独自确认音讯,耗时" + (end - begin) + "ms");
}
}
}
3.5)确认公布速度比照
发表回复