前景回顾

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

【mq】从零开始实现 mq-04-启动检测与实现优化

【mq】从零开始实现 mq-05-实现优雅停机

【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat

【mq】从零开始实现 mq-07-负载平衡 load balance

【mq】从零开始实现 mq-08-配置优化 fluent

【mq】从零开始实现 mq-09-消费者拉取音讯 pull message

【mq】从零开始实现 mq-10-消费者拉取音讯回执 pull message ack

【mq】从零开始实现 mq-11-消费者音讯回执增加分组信息 pull message ack groupName

【mq】从零开始实现 mq-12-音讯的批量发送与回执

批量音讯

对于音讯的发送,有时候可能须要一次发送多个,比方日志音讯等。

批量操作能够晋升性能。

本节老马就和大家一起增加一点批量个性。

音讯的批量发送

生产者实现

接口定义

/** * 同步发送音讯-批量 * @param mqMessageList 音讯类型 * @return 后果 * @since 0.1.3 */SendBatchResult sendBatch(final List<MqMessage> mqMessageList);/** * 单向发送音讯-批量 * @param mqMessageList 音讯类型 * @return 后果 * @since 0.1.3 */SendBatchResult sendOneWayBatch(final List<MqMessage> mqMessageList);

一次反对发送多个音讯。

接口实现

生产者实现如下。

@Overridepublic SendBatchResult sendBatch(List<MqMessage> mqMessageList) {    final List<String> messageIdList = this.fillMessageList(mqMessageList);    final MqMessageBatchReq batchReq = new MqMessageBatchReq();    batchReq.setMqMessageList(mqMessageList);    String traceId = IdHelper.uuid32();    batchReq.setTraceId(traceId);    batchReq.setMethodType(MethodType.P_SEND_MSG_BATCH);    return Retryer.<SendBatchResult>newInstance()            .maxAttempt(maxAttempt)            .callable(new Callable<SendBatchResult>() {                @Override                public SendBatchResult call() throws Exception {                    return doSendBatch(messageIdList, batchReq, false);                }            }).retryCall();}@Overridepublic SendBatchResult sendOneWayBatch(List<MqMessage> mqMessageList) {    List<String> messageIdList = this.fillMessageList(mqMessageList);    MqMessageBatchReq batchReq = new MqMessageBatchReq();    batchReq.setMqMessageList(mqMessageList);    String traceId = IdHelper.uuid32();    batchReq.setTraceId(traceId);    batchReq.setMethodType(MethodType.P_SEND_MSG_ONE_WAY_BATCH);    return doSendBatch(messageIdList, batchReq, true);}private SendBatchResult doSendBatch(List<String> messageIdList,                               MqMessageBatchReq batchReq,                               boolean oneWay) {    log.info("[Producer] 批量发送音讯 messageIdList: {}, batchReq: {}, oneWay: {}",            messageIdList, JSON.toJSON(batchReq), oneWay);    // 以第一个 sharding-key 为准。    // 后续的会被疏忽    MqMessage mqMessage = batchReq.getMqMessageList().get(0);    Channel channel = getChannel(mqMessage.getShardingKey());    //one-way    if(oneWay) {        log.warn("[Producer] ONE-WAY send, ignore result");        return SendBatchResult.of(messageIdList, SendStatus.SUCCESS);    }    MqCommonResp resp = callServer(channel, batchReq, MqCommonResp.class);    if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {        return SendBatchResult.of(messageIdList, SendStatus.SUCCESS);    }    throw new MqException(ProducerRespCode.MSG_SEND_FAILED);}

ps: 这里和单个发送有一个区别,那就是对于 channel 的抉择。因为只能抉择一个,所以不能兼顾每一个音讯的 sharding-key。

Broker 的解决

音讯散发

// 生产者音讯发送-批量if(MethodType.P_SEND_MSG_BATCH.equals(methodType)) {    return handleProducerSendMsgBatch(channelId, json);}// 生产者音讯发送-ONE WAY-批量if(MethodType.P_SEND_MSG_ONE_WAY_BATCH.equals(methodType)) {    handleProducerSendMsgBatch(channelId, json);    return null;}

具体实现

/** * 解决生产者发送的音讯 * * @param channelId 通道标识 * @param json 音讯体 * @since 0.1.3 */private MqCommonResp handleProducerSendMsgBatch(String channelId, String json) {    MqMessageBatchReq batchReq = JSON.parseObject(json, MqMessageBatchReq.class);    final ServiceEntry serviceEntry = registerProducerService.getServiceEntry(channelId);    List<MqMessagePersistPut> putList = buildPersistPutList(batchReq, serviceEntry);    MqCommonResp commonResp = mqBrokerPersist.putBatch(putList);    // 遍历异步推送    for(MqMessagePersistPut persistPut : putList) {        this.asyncHandleMessage(persistPut);    }    return commonResp;}

这里对音讯列表进行长久化保留。

演示的长久化策略如下:

@Overridepublic MqCommonResp putBatch(List<MqMessagePersistPut> putList) {    // 构建列表    for(MqMessagePersistPut put : putList) {        this.doPut(put);    }    MqCommonResp commonResp = new MqCommonResp();    commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());    commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());    return commonResp;}

音讯的批量ACK

阐明

以前的实现形式是每一个音讯生产实现之后,进行一次 ACK。

对于 pull 策略的音讯生产,咱们能够等以后批次完结,对立进行 ACK 回执。

生产实现

实现调整如下:

for(MqTopicTagDto tagDto : subscribeList) {    final String topicName = tagDto.getTopicName();    final String tagRegex = tagDto.getTagRegex();    MqConsumerPullResp resp = consumerBrokerService.pull(topicName, tagRegex, size);    if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {        List<MqMessage> mqMessageList = resp.getList();        if(CollectionUtil.isNotEmpty(mqMessageList)) {            List<MqConsumerUpdateStatusDto> statusDtoList = new ArrayList<>(mqMessageList.size());            for(MqMessage mqMessage : mqMessageList) {                IMqConsumerListenerContext context = new MqConsumerListenerContext();                final String messageId = mqMessage.getTraceId();                ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context);                log.info("音讯:{} 生产后果 {}", messageId, consumerStatus);                // 状态同步更新                if(!ackBatchFlag) {                    MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus);                    log.info("音讯:{} 状态回执后果 {}", messageId, JSON.toJSON(ackResp));                } else {                    // 批量                    MqConsumerUpdateStatusDto statusDto = new MqConsumerUpdateStatusDto();                    statusDto.setMessageId(messageId);                    statusDto.setMessageStatus(consumerStatus.getCode());                    statusDto.setConsumerGroupName(groupName);                    statusDtoList.add(statusDto);                }            }            // 批量执行            if(ackBatchFlag) {                MqCommonResp ackResp = consumerBrokerService.consumerStatusAckBatch(statusDtoList);                log.info("音讯:{} 状态批量回执后果 {}", statusDtoList, JSON.toJSON(ackResp));                statusDtoList = null;            }        }    } else {        log.error("拉取音讯失败: {}", JSON.toJSON(resp));    }}

如果 ackBatchFlag = false,则解决逻辑和以前一样。

如果 ackBatchFlag = true,则首先把音讯放到 list 中,完结后对立执行。

broker 实现

音讯散发

//消费者生产状态 ACK-批量if(MethodType.C_CONSUMER_STATUS_BATCH.equals(methodType)) {    MqConsumerUpdateStatusBatchReq req = JSON.parseObject(json, MqConsumerUpdateStatusBatchReq.class);    final List<MqConsumerUpdateStatusDto> statusDtoList = req.getStatusList();    return mqBrokerPersist.updateStatusBatch(statusDtoList);}

实现

默认的长久化实现,更新如下:

@Overridepublic MqCommonResp updateStatusBatch(List<MqConsumerUpdateStatusDto> statusDtoList) {    for(MqConsumerUpdateStatusDto statusDto : statusDtoList) {        this.doUpdateStatus(statusDto.getMessageId(), statusDto.getConsumerGroupName(),                statusDto.getMessageStatus());    }    MqCommonResp commonResp = new MqCommonResp();    commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());    commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());    return commonResp;}

遍历每一个元素,进行状态的更新。

小结

异步和批量,是晋升性能最罕用的 2 种形式。

批量的实现相干来说是最简略,也是成果最显著的。

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq

拓展浏览

rpc-从零开始实现 rpc https://github.com/houbb/rpc