共计 5242 个字符,预计需要花费 14 分钟才能阅读完成。
@[toc]
后面一篇文章松哥和大家聊了 MQ 高可用之如何确保音讯胜利发送,各种配置齐上阵,最终确保了音讯的胜利发送,甚至在一些极其状况下还可能产生同一条音讯反复发送的状况,不论怎么样,音讯总算发送进来了,如果小伙伴们还没看过上篇文章,倡议先看看,再来学习本文:
- 四种策略确保 RabbitMQ 音讯发送可靠性!你用哪种?
明天咱们就来聊一聊音讯生产的问题,看看如何确保音讯生产胜利,并且确保幂等性。
1. 两种生产思路
RabbitMQ 的音讯生产,整体上来说有两种不同的思路:
- 推(push):MQ 被动将音讯 推送 给消费者,这种形式须要消费者设置一个缓冲区去缓存音讯,对于消费者而言,内存中总是有一堆须要解决的音讯,所以这种形式的效率比拟高,这也是目前大多数利用采纳的生产形式。
- 拉(pull):消费者被动从 MQ 拉取 音讯,这种形式效率并不是很高,不过有的时候如果服务端须要批量拉取音讯,倒是能够采纳这种形式。
两种形式我都举个例子看下。
先来看推(push):
这种形式大家比拟常见,就是通过 @RabbitListener
注解去标记消费者,如下:
@Component
public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle(String msg) {System.out.println("msg =" + msg);
}
}
当监听的队列中有音讯时,就会触发该办法。
再来看拉(pull):
@Test
public void test01() throws UnsupportedEncodingException {Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);
System.out.println("o =" + new String(((byte[]) o),"UTF-8"));
}
调用 receiveAndConvert 办法,办法参数为队列名称,办法执行实现后,会从 MQ 上拉取一条音讯下来,如果该办法返回值为 null,示意该队列上没有音讯了。receiveAndConvert 办法有一个重载办法,能够在重载办法中传入一个期待超时工夫,例如 3 秒。此时,假如队列中没有音讯了,则 receiveAndConvert 办法会阻塞 3 秒,3 秒内如果队列中有了新音讯就返回,3 秒后如果队列中还是没有新音讯,就返回 null,这个期待超时工夫要是不设置的话,默认为 0。
这是音讯两种不同的生产模式。
如果须要从音讯队列中继续取得音讯,就能够应用推模式;如果只是单纯的生产一条音讯,则应用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅音讯,这会重大影响 RabbitMQ 的性能。
2. 确保生产胜利两种思路
在上篇文章中,咱们想尽办法确保音讯可能发送胜利,对于音讯生产胜利,其实官网提供了相干的机制,咱们一起来看下。
为了保障音讯可能牢靠的达到音讯消费者,RabbitMQ 中提供了音讯生产确认机制。当消费者去生产音讯的时候,能够通过指定 autoAck 参数来示意音讯生产的确认形式。
- 当 autoAck 为 false 的时候,此时即便消费者曾经收到音讯了,RabbitMQ 也不会立马将音讯移除,而是期待消费者显式的回复确认信号后,才会将音讯打上删除标记,而后再删除。
- 当 autoAck 为 true 的时候,此时音讯消费者就会主动把发送进来的音讯设置为确认,而后将音讯移除(从内存或者磁盘中),即便这些音讯并没有达到消费者。
咱们来看一张图:
如上图所示,在 RabbitMQ 的 web 治理页面:
- Ready 示意待生产的音讯数量。
- Unacked 示意曾经发送给消费者然而还没收到消费者 ack 的音讯数量。
这是咱们能够从 UI 层面察看音讯的生产状况确认状况。
当咱们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,生产分成了两个局部:
- 待生产的音讯
- 曾经投递给消费者,然而还没有被消费者确认的音讯
换句话说,当设置 autoAck 为 false 的时候,消费者就变得十分从容了,它将有足够的工夫去解决这条音讯,当音讯失常解决实现后,再手动 ack,此时 RabbitMQ 才会认为这条音讯生产胜利了。如果 RabbitMQ 始终没有收到客户端的反馈,并且此时客户端也曾经断开连接了,那么 RabbitMQ 就会将刚刚的音讯从新放回队列中,期待下一次被生产。
综上所述,确保音讯被胜利生产,无非就是手动 Ack 或者主动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致音讯被反复生产,所以一般来说咱们还须要在解决音讯时,解决幂等性问题。
3. 音讯回绝
当客户端收到音讯时,能够抉择生产这条音讯,也能够抉择回绝这条音讯。咱们来看下回绝的形式:
@Component
public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle(Channel channel, Message message) {
// 获取音讯编号
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 回绝音讯
channel.basicReject(deliveryTag, true);
} catch (IOException e) {e.printStackTrace();
}
}
}
消费者收到音讯之后,能够抉择回绝生产该条音讯,回绝的步骤分两步:
- 获取音讯编号 deliveryTag。
- 调用 basicReject 办法回绝音讯。
调用 basicReject 办法时,第二个参数是 requeue,即是否从新入队。如果第二个参数为 true,则这条被回绝的音讯会从新进入到音讯队列中,期待下一次被生产;如果第二个参数为 false,则这条被回绝的音讯就会被丢掉,不会有新的消费者去生产它了。
须要留神的是,basicReject 办法一次只能回绝一条音讯。
4. 音讯确认
音讯确认分为主动确认和手动确认,咱们别离来看。
4.1 主动确认
先来看看主动确认,在 Spring Boot 中,默认状况下,音讯生产就是主动确认的。
咱们来看如下一个音讯生产办法:
@Component
public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle2(String msg) {System.out.println("msg =" + msg);
int i = 1 / 0;
}
}
通过 @Componet 注解将以后类注入到 Spring 容器中,而后通过 @RabbitListener 注解来标记一个音讯生产办法,默认状况下,音讯生产办法自带事务,即如果该办法在执行过程中抛出异样,那么被生产的音讯会从新回到队列中期待下一次被生产,如果该办法失常执行完没有抛出异样,则这条音讯就算是被生产了。
4.2 手动确认
手动确认我又把它分为两种:推模式手动确认与拉模式手动确认。
4.2.1 推模式手动确认
要开启手动确认,须要咱们首先敞开主动确认,敞开形式如下:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
这个配置示意将音讯的确认模式改为手动确认。
接下来咱们来看下消费者中的代码:
@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle3(Message message,Channel channel) {long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 音讯生产的代码写到这里
String s = new String(message.getBody());
System.out.println("s =" + s);
// 生产实现后,手动 ack
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 手动 nack
try {channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {ex.printStackTrace();
}
}
}
将消费者要做的事件放到一个 try..catch
代码块中。
如果音讯失常生产胜利,则执行 basicAck
实现确认。
如果音讯生产失败,则执行 basicNack
办法,通知 RabbitMQ 音讯生产失败。
这里波及到两个办法:
- basicAck:这个是手动确认音讯曾经胜利生产,该办法有两个参数:第一个参数示意音讯的 id;第二个参数 multiple 如果为 false,示意仅确认以后音讯生产胜利,如果为 true,则示意以后音讯之前所有未被以后消费者确认的音讯都生产胜利。
- basicNack:这个是通知 RabbitMQ 以后音讯未被胜利生产,该办法有三个参数:第一个参数示意音讯的 id;第二个参数 multiple 如果为 false,示意仅回绝以后音讯的生产,如果为 true,则示意回绝以后音讯之前所有未被以后消费者确认的音讯;第三个参数 requeue 含意和后面所说的一样,被回绝的音讯是否从新入队。
当 basicNack 中最初一个参数设置为 false 的时候,还波及到一个死信队列的问题,这个松哥当前再专门写文章和大家细聊。
4.2.2 拉模式手动确认
拉模式手动 ack 比拟麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的办法,所以咱们得用原生的方法,如下:
public void receive2() {Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
long deliveryTag = 0L;
try {GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false);
deliveryTag = getResponse.getEnvelope().getDeliveryTag();
System.out.println("o =" + new String((getResponse.getBody()), "UTF-8"));
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {ex.printStackTrace();
}
}
}
这里波及到的 basicAck 和 basicNack 办法跟后面的一样,我就不再赘述。
5. 幂等性问题
最初咱们再来说说音讯的幂等性问题。
大家构想上面一个场景:
消费者在生产完一条音讯后,向 RabbitMQ 发送一个 ack 确认,此时因为网络断开或者其余起因导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条音讯删除,当从新建设起连贯后,消费者还是会再次收到该条音讯,这就造成了音讯的反复生产。同时,因为相似的起因,音讯在发送的时候,同一条音讯也可能会发送两次(参见四种策略确保 RabbitMQ 音讯发送可靠性!你用哪种?)。种种原因导致咱们在生产音讯时,肯定要解决好幂等性问题。
幂等性问题的解决倒也不难,基本上都是从业务上来解决,我来大略说说思路。
采纳 Redis,在消费者生产音讯之前,现将音讯的 id 放到 Redis 中,存储形式如下:
- id-0(正在执行业务)
- id-1(执行业务胜利)
如果 ack 失败,在 RabbitMQ 将音讯交给其余的消费者时,先执行 setnx,如果 key 曾经存在(阐明之前有人生产过该音讯),获取他的值,如果是 0,以后消费者就什么都不做,如果是 1,间接 ack。
极其状况:第一个消费者在执行业务时,呈现了死锁,在 setnx 的根底上,再给 key 设置一个生存工夫。生产者,发送音讯时,指定 messageId。
当然这只是一个简略思路供大家参考。
松哥在 vhr 我的项目中也解决了音讯幂等性问题,感兴趣的小伙伴能够查看 vhr 源码(https://github.com/lenve/vhr),代码在 mailserver 中。
6. 小结
好啦,明天就和小伙伴们聊了下 RabbitMQ 中和音讯生产相干的几个话题,感兴趣的小伙伴能够实际下哦~
复制文章题目并在公众号后盾回复,能够下载本文案例~