前景回顾
【mq】从零开始实现 mq-01-生产者、消费者启动
【mq】从零开始实现 mq-02-如何实现生产者调用消费者?
【mq】从零开始实现 mq-03-引入 broker 中间人
【mq】从零开始实现 mq-04-启动检测与实现优化
【mq】从零开始实现 mq-05-实现优雅停机
【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat
【mq】从零开始实现 mq-07-负载平衡 load balance
【mq】从零开始实现 mq-08-配置优化 fluent
【mq】从零开始实现 mq-09-消费者拉取音讯 pull message
【mq】从零开始实现 mq-10-消费者拉取音讯回执 pull message ack
【mq】从零开始实现 mq-11-消费者音讯回执增加分组信息 pull message ack groupName
状态回执
上一节咱们实现了音讯的回执,然而存在一个问题。
同一个音讯,能够被不同的 groupName 进行生产,所以回执是须要依据 groupName 进行离开的,这个上一节中脱漏了。
Broker 推送音讯的调整
以前推送音讯是间接推送,然而短少 groupName 信息。
订阅列表获取
获取订阅列表的实现调整如下:
public List<ChannelGroupNameDto> getPushSubscribeList(MqMessage mqMessage) { final String topicName = mqMessage.getTopic(); Set<ConsumerSubscribeBo> set = pushSubscribeMap.get(topicName); if(CollectionUtil.isEmpty(set)) { return Collections.emptyList(); } //2. 获取匹配的 tag 列表 final List<String> tagNameList = mqMessage.getTags(); Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>(); for(ConsumerSubscribeBo bo : set) { String tagRegex = bo.getTagRegex(); if(RegexUtil.hasMatch(tagNameList, tagRegex)) { String groupName = bo.getGroupName(); MapUtil.putToListMap(groupMap, groupName, bo); } } //3. 依照 groupName 分组之后,每一组只随机返回一个。最好应该调整为以 shardingkey 抉择 final String shardingKey = mqMessage.getShardingKey(); List<ChannelGroupNameDto> channelGroupNameList = new ArrayList<>(); for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) { List<ConsumerSubscribeBo> list = entry.getValue(); ConsumerSubscribeBo bo = RandomUtils.loadBalance(loadBalance, list, shardingKey); final String channelId = bo.getChannelId(); BrokerServiceEntryChannel entryChannel = registerMap.get(channelId); if(entryChannel == null) { log.warn("channelId: {} 对应的通道信息为空", channelId); continue; } final String groupName = entry.getKey(); ChannelGroupNameDto channelGroupNameDto = ChannelGroupNameDto.of(groupName, entryChannel.getChannel()); channelGroupNameList.add(channelGroupNameDto); } return channelGroupNameList;}
ChannelGroupNameDto 的定义如下:
public class ChannelGroupNameDto { /** * 分组名称 */ private String consumerGroupName; /** * 通道 */ private Channel channel; //get & set}
音讯被动推送
咱们调整一下音讯推送,每次推送实现,依据 groupName 进行状态的更新:
for(final ChannelGroupNameDto channelGroupNameDto : channelList) { final Channel channel = channelGroupNameDto.getChannel(); final String consumerGroupName =channelGroupNameDto.getConsumerGroupName(); try { // 更新状态为生产解决中 mqBrokerPersist.updateStatus(messageId, consumerGroupName, MessageStatusConst.TO_CONSUMER_PROCESS); String channelId = ChannelUtil.getChannelId(channel); log.info("开始解决 channelId: {}", channelId); //1. 调用 mqMessage.setMethodType(MethodType.B_MESSAGE_PUSH); // 重试推送 MqConsumerResultResp resultResp = Retryer.<MqConsumerResultResp>newInstance() .maxAttempt(pushMaxAttempt) .callable(new Callable<MqConsumerResultResp>() { @Override public MqConsumerResultResp call() throws Exception { MqConsumerResultResp resp = callServer(channel, mqMessage, MqConsumerResultResp.class, invokeService, responseTime); // 失败校验 if(resp == null || !ConsumerStatus.SUCCESS.getCode() .equals(resp.getConsumerStatus())) { throw new MqException(BrokerRespCode.MSG_PUSH_FAILED); } return resp; } }).retryCall(); //2. 更新状态 //2.1 解决胜利,取 push 生产状态 if(MqCommonRespCode.SUCCESS.getCode().equals(resultResp.getRespCode())) { mqBrokerPersist.updateStatus(messageId, consumerGroupName, resultResp.getConsumerStatus()); } else { // 2.2 解决失败 log.error("生产失败:{}", JSON.toJSON(resultResp)); mqBrokerPersist.updateStatus(messageId, consumerGroupName, MessageStatusConst.TO_CONSUMER_FAILED); } log.info("实现解决 channelId: {}", channelId); } catch (Exception exception) { log.error("解决异样"); mqBrokerPersist.updateStatus(messageId, consumerGroupName, MessageStatusConst.TO_CONSUMER_FAILED); }}
音讯消费者状态回执
ps: 这里 V0.1.1 分支漏写了,不过前面 v0.1.2 分支修改了。
public MqCommonResp consumerStatusAck(String messageId, ConsumerStatus consumerStatus) { final MqConsumerUpdateStatusReq req = new MqConsumerUpdateStatusReq(); req.setMessageId(messageId); req.setMessageStatus(consumerStatus.getCode()); final String traceId = IdHelper.uuid32(); req.setTraceId(traceId); req.setMethodType(MethodType.C_CONSUMER_STATUS); // 增加 groupName req.setConsumerGroupName(groupName); // 重试 return Retryer.<MqCommonResp>newInstance() .maxAttempt(consumerStatusMaxAttempt) .callable(new Callable<MqCommonResp>() { @Override public MqCommonResp call() throws Exception { Channel channel = getChannel(null); MqCommonResp resp = callServer(channel, req, MqCommonResp.class); if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) { throw new MqException(ConsumerRespCode.CONSUMER_STATUS_ACK_FAILED); } return resp; } }).retryCall();}
音讯状态回执时, req.setConsumerGroupName(groupName);
增加 groupName 信息。
小结
音讯状态的回执准确到 groupName 之后,不同的 groupName 生产就能够互相独立,适用性更强更广。
心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。
我是老马,期待与你的下次重逢。
开源地址
The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq
拓展浏览
rpc-从零开始实现 rpc https://github.com/houbb/rpc