前景回顾
【mq】从零开始实现 mq-01-生产者、消费者启动
【mq】从零开始实现 mq-02-如何实现生产者调用消费者?
【mq】从零开始实现 mq-03-引入 broker 中间人
【mq】从零开始实现 mq-04-启动检测与实现优化
【mq】从零开始实现 mq-05-实现优雅停机
【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat
【mq】从零开始实现 mq-07-负载平衡 load balance
【mq】从零开始实现 mq-08-配置优化 fluent
【mq】从零开始实现 mq-09-消费者拉取音讯 pull message
【mq】从零开始实现 mq-10-消费者拉取音讯回执 pull message ack
【mq】从零开始实现 mq-11-消费者音讯回执增加分组信息 pull message ack groupName
【mq】从零开始实现 mq-12-音讯的批量发送与回执
批量音讯
对于音讯的发送,有时候可能须要一次发送多个,比方日志音讯等。
批量操作能够晋升性能。
本节老马就和大家一起增加一点批量个性。
音讯的批量发送
生产者实现
接口定义
/** * 同步发送音讯-批量 * @param mqMessageList 音讯类型 * @return 后果 * @since 0.1.3 */SendBatchResult sendBatch(final List<MqMessage> mqMessageList);/** * 单向发送音讯-批量 * @param mqMessageList 音讯类型 * @return 后果 * @since 0.1.3 */SendBatchResult sendOneWayBatch(final List<MqMessage> mqMessageList);
一次反对发送多个音讯。
接口实现
生产者实现如下。
@Overridepublic SendBatchResult sendBatch(List<MqMessage> mqMessageList) { final List<String> messageIdList = this.fillMessageList(mqMessageList); final MqMessageBatchReq batchReq = new MqMessageBatchReq(); batchReq.setMqMessageList(mqMessageList); String traceId = IdHelper.uuid32(); batchReq.setTraceId(traceId); batchReq.setMethodType(MethodType.P_SEND_MSG_BATCH); return Retryer.<SendBatchResult>newInstance() .maxAttempt(maxAttempt) .callable(new Callable<SendBatchResult>() { @Override public SendBatchResult call() throws Exception { return doSendBatch(messageIdList, batchReq, false); } }).retryCall();}@Overridepublic SendBatchResult sendOneWayBatch(List<MqMessage> mqMessageList) { List<String> messageIdList = this.fillMessageList(mqMessageList); MqMessageBatchReq batchReq = new MqMessageBatchReq(); batchReq.setMqMessageList(mqMessageList); String traceId = IdHelper.uuid32(); batchReq.setTraceId(traceId); batchReq.setMethodType(MethodType.P_SEND_MSG_ONE_WAY_BATCH); return doSendBatch(messageIdList, batchReq, true);}private SendBatchResult doSendBatch(List<String> messageIdList, MqMessageBatchReq batchReq, boolean oneWay) { log.info("[Producer] 批量发送音讯 messageIdList: {}, batchReq: {}, oneWay: {}", messageIdList, JSON.toJSON(batchReq), oneWay); // 以第一个 sharding-key 为准。 // 后续的会被疏忽 MqMessage mqMessage = batchReq.getMqMessageList().get(0); Channel channel = getChannel(mqMessage.getShardingKey()); //one-way if(oneWay) { log.warn("[Producer] ONE-WAY send, ignore result"); return SendBatchResult.of(messageIdList, SendStatus.SUCCESS); } MqCommonResp resp = callServer(channel, batchReq, MqCommonResp.class); if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) { return SendBatchResult.of(messageIdList, SendStatus.SUCCESS); } throw new MqException(ProducerRespCode.MSG_SEND_FAILED);}
ps: 这里和单个发送有一个区别,那就是对于 channel 的抉择。因为只能抉择一个,所以不能兼顾每一个音讯的 sharding-key。
Broker 的解决
音讯散发
// 生产者音讯发送-批量if(MethodType.P_SEND_MSG_BATCH.equals(methodType)) { return handleProducerSendMsgBatch(channelId, json);}// 生产者音讯发送-ONE WAY-批量if(MethodType.P_SEND_MSG_ONE_WAY_BATCH.equals(methodType)) { handleProducerSendMsgBatch(channelId, json); return null;}
具体实现
/** * 解决生产者发送的音讯 * * @param channelId 通道标识 * @param json 音讯体 * @since 0.1.3 */private MqCommonResp handleProducerSendMsgBatch(String channelId, String json) { MqMessageBatchReq batchReq = JSON.parseObject(json, MqMessageBatchReq.class); final ServiceEntry serviceEntry = registerProducerService.getServiceEntry(channelId); List<MqMessagePersistPut> putList = buildPersistPutList(batchReq, serviceEntry); MqCommonResp commonResp = mqBrokerPersist.putBatch(putList); // 遍历异步推送 for(MqMessagePersistPut persistPut : putList) { this.asyncHandleMessage(persistPut); } return commonResp;}
这里对音讯列表进行长久化保留。
演示的长久化策略如下:
@Overridepublic MqCommonResp putBatch(List<MqMessagePersistPut> putList) { // 构建列表 for(MqMessagePersistPut put : putList) { this.doPut(put); } MqCommonResp commonResp = new MqCommonResp(); commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return commonResp;}
音讯的批量ACK
阐明
以前的实现形式是每一个音讯生产实现之后,进行一次 ACK。
对于 pull 策略的音讯生产,咱们能够等以后批次完结,对立进行 ACK 回执。
生产实现
实现调整如下:
for(MqTopicTagDto tagDto : subscribeList) { final String topicName = tagDto.getTopicName(); final String tagRegex = tagDto.getTagRegex(); MqConsumerPullResp resp = consumerBrokerService.pull(topicName, tagRegex, size); if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) { List<MqMessage> mqMessageList = resp.getList(); if(CollectionUtil.isNotEmpty(mqMessageList)) { List<MqConsumerUpdateStatusDto> statusDtoList = new ArrayList<>(mqMessageList.size()); for(MqMessage mqMessage : mqMessageList) { IMqConsumerListenerContext context = new MqConsumerListenerContext(); final String messageId = mqMessage.getTraceId(); ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context); log.info("音讯:{} 生产后果 {}", messageId, consumerStatus); // 状态同步更新 if(!ackBatchFlag) { MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus); log.info("音讯:{} 状态回执后果 {}", messageId, JSON.toJSON(ackResp)); } else { // 批量 MqConsumerUpdateStatusDto statusDto = new MqConsumerUpdateStatusDto(); statusDto.setMessageId(messageId); statusDto.setMessageStatus(consumerStatus.getCode()); statusDto.setConsumerGroupName(groupName); statusDtoList.add(statusDto); } } // 批量执行 if(ackBatchFlag) { MqCommonResp ackResp = consumerBrokerService.consumerStatusAckBatch(statusDtoList); log.info("音讯:{} 状态批量回执后果 {}", statusDtoList, JSON.toJSON(ackResp)); statusDtoList = null; } } } else { log.error("拉取音讯失败: {}", JSON.toJSON(resp)); }}
如果 ackBatchFlag = false,则解决逻辑和以前一样。
如果 ackBatchFlag = true,则首先把音讯放到 list 中,完结后对立执行。
broker 实现
音讯散发
//消费者生产状态 ACK-批量if(MethodType.C_CONSUMER_STATUS_BATCH.equals(methodType)) { MqConsumerUpdateStatusBatchReq req = JSON.parseObject(json, MqConsumerUpdateStatusBatchReq.class); final List<MqConsumerUpdateStatusDto> statusDtoList = req.getStatusList(); return mqBrokerPersist.updateStatusBatch(statusDtoList);}
实现
默认的长久化实现,更新如下:
@Overridepublic MqCommonResp updateStatusBatch(List<MqConsumerUpdateStatusDto> statusDtoList) { for(MqConsumerUpdateStatusDto statusDto : statusDtoList) { this.doUpdateStatus(statusDto.getMessageId(), statusDto.getConsumerGroupName(), statusDto.getMessageStatus()); } MqCommonResp commonResp = new MqCommonResp(); commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return commonResp;}
遍历每一个元素,进行状态的更新。
小结
异步和批量,是晋升性能最罕用的 2 种形式。
批量的实现相干来说是最简略,也是成果最显著的。
心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。
我是老马,期待与你的下次重逢。
开源地址
The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq
拓展浏览
rpc-从零开始实现 rpc https://github.com/houbb/rpc