1.MQ 发送信息的时候产生的问题
2.MQ 的公布确认原理
3.MQ 的公布确认策略
1.MQ 发送信息的时候产生的问题
咱们在前一篇博客零碎学习音讯队列——RabbitMQ 的音讯应答和长久化中学过,当 消费者挂掉 的时候,有音讯重发 ,当 队列挂掉 的时候,有音讯长久化 ,然而咱们却 无奈保障生产者发送到队列的音讯是否确定发送胜利 ,这个时候就有了 音讯的公布确认。
2.MQ 的公布确认原理
当咱们的信道被设置成 公布确认(confirm)模式 ,那么所有在该信道下面公布的 音讯都会被指派一个惟一的 ID,一旦 音讯胜利投递 ,broker 就 会发送一个确认给生产者 ,生产者此时就晓得音讯曾经投递胜利, 生产者就会把这条音讯进行删除。
confirm 模式能够是 同步的 ,也能够是 异步的 , 同步 的状况下是 发送之后马上进行确认 , 异步 的话生产者能够 无需期待确认只管发送音讯 ,如果某些音讯失去确认,生产者将就能够 通过回调办法来确认音讯。
3.MQ 的公布确认策略
3.1)开启确认公布
公布确认模式默认是没有开启的,咱们须要调用办法将它关上。
Channel channel = connection.createChannel();
// 开启公布确认
channel.confirmSelect();
3.2)单个确认公布
这是一种简略的同步确认形式,发送一条音讯,确认一条音讯,后续的音讯能力持续发送。
长处:简略易懂。
毛病:公布速度过慢 ,如果后面的音讯没有失去确认,前面的音讯就不得发送, 容易阻塞。
public class ProducerSingle {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开
try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个队列
* 1. 队列名称
* 2. 队列外面的音讯是否长久化 默认音讯存储在内存中
* 3. 该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
* 4. 是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
* 5. 其余参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 开始工夫
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = i + "";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 确认是否发送胜利,服务端返回 false 或超时工夫内未返回,生产者能够音讯重发
boolean flag = channel.waitForConfirms();
if (flag) {System.out.println("音讯发送胜利");
}
}
// 发送完结工夫
long end = System.currentTimeMillis();
System.out.println("公布" + 1000 + "个独自确认音讯, 耗时" + (end - begin) + "ms");
}
}
}
3.3)批量确认公布
单个确认公布的速度十分慢,其实咱们能够先 发送一批,而后确认一批 ,再公布一批。
长处:比单个确认公布速度快,吞吐量大 。
毛病:当 其中一个音讯出问题 的时候,不晓得是哪个音讯呈现了问题 ,咱们必须将整个批处理音讯保留在内存里,以记录重要的音讯后从新公布音讯。这种办法也是 阻塞 的,一样阻塞音讯的公布。
public class ProducerMulti {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开
try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个队列
* 1. 队列名称
* 2. 队列外面的音讯是否长久化 默认音讯存储在内存中
* 3. 该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
* 4. 是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
* 5. 其余参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 批量确认音讯大小
int batchSize = 100;
// 未确认音讯个数
int unConfirmMessageNum = 0;
// 开始发送工夫
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = i + "";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 发送一条音讯,未确认书 +1
unConfirmMessageNum++;
// 如果位确认数达到批量确认大小
if (unConfirmMessageNum == batchSize) {
// 期待进行批量确认
channel.waitForConfirms();
unConfirmMessageNum = 0;
}
}
// 为了确保还有残余没有确认音讯 再次确认
if (unConfirmMessageNum > 0) {channel.waitForConfirms();
}
// 完结工夫
long end = System.currentTimeMillis();
System.out.println("公布" + 1000 + "个独自确认音讯, 耗时" + (end - begin) + "ms");
}
}
}
3.4)异步确认公布
异步确认 不须要阻塞,生产者只管发送信息就好 ,队列 通过回调函数告诉生产者发送胜利。
长处:保障了效率和可靠性
毛病:编程逻辑简单
public class ProducerAsyn {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开
try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个队列
* 1. 队列名称
* 2. 队列外面的音讯是否长久化 默认音讯存储在内存中
* 3. 该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
* 4. 是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
* 5. 其余参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/**
* 用于回调函数确认公布的哈希表,线程平安,实用于并发状况
* 1. 能够将序列号和音讯进行关联
* 2. 能够批量删除曾经确认的音讯
* 3. 反对并发拜访
*/
ConcurrentSkipListMap<Long, String> confirmsMap = new ConcurrentSkipListMap<>();
/**
* 确认收到音讯的一个回调
* 1. 音讯序列号
* 2.true 能够确认小于等于以后序列号的音讯
* false 确认以后序列号音讯
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {if (multiple) {
// 把小于以后序列号的全副音讯取出
// 返回的是小于等于以后序列号的未确认音讯 是一个 map
ConcurrentNavigableMap<Long, String> confirmed =
confirmsMap.headMap(sequenceNumber, true);
// 革除该局部未确认音讯
confirmed.clear();}else{
// 只革除以后序列号的音讯
confirmsMap.remove(sequenceNumber);
}
};
// 未被确认的回调
ConfirmCallback nackCallback = (sequenceNumber, multiple) ->
{String message = confirmsMap.get(sequenceNumber);
System.out.println("公布的音讯"+message+"未被确认,序列号"+sequenceNumber);
};
/**
* 增加一个异步确认的监听器
* 1. 确认收到音讯的回调
* 2. 未收到音讯的回调
*/
channel.addConfirmListener(ackCallback, nackCallback);
// 发送开始工夫
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = i + "";
// 在 map 外面设置音讯 id 和内容
confirmsMap.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
// 发送完结工夫
long end = System.currentTimeMillis();
System.out.println("公布" + 1000 + "个独自确认音讯, 耗时" + (end - begin) + "ms");
}
}
}
3.5)确认公布速度比照