关于消息队列:mq从零开始实现-mq12消息的批量发送与回执

38次阅读

共计 6080 个字符,预计需要花费 16 分钟才能阅读完成。

前景回顾

【mq】从零开始实现 mq-01- 生产者、消费者启动

【mq】从零开始实现 mq-02- 如何实现生产者调用消费者?

【mq】从零开始实现 mq-03- 引入 broker 中间人

【mq】从零开始实现 mq-04- 启动检测与实现优化

【mq】从零开始实现 mq-05- 实现优雅停机

【mq】从零开始实现 mq-06- 消费者心跳检测 heartbeat

【mq】从零开始实现 mq-07- 负载平衡 load balance

【mq】从零开始实现 mq-08- 配置优化 fluent

【mq】从零开始实现 mq-09- 消费者拉取音讯 pull message

【mq】从零开始实现 mq-10- 消费者拉取音讯回执 pull message ack

【mq】从零开始实现 mq-11- 消费者音讯回执增加分组信息 pull message ack groupName

【mq】从零开始实现 mq-12- 音讯的批量发送与回执

批量音讯

对于音讯的发送,有时候可能须要一次发送多个,比方日志音讯等。

批量操作能够晋升性能。

本节老马就和大家一起增加一点批量个性。

音讯的批量发送

生产者实现

接口定义

/**
 * 同步发送音讯 - 批量
 * @param mqMessageList 音讯类型
 * @return 后果
 * @since 0.1.3
 */
SendBatchResult sendBatch(final List<MqMessage> mqMessageList);

/**
 * 单向发送音讯 - 批量
 * @param mqMessageList 音讯类型
 * @return 后果
 * @since 0.1.3
 */
SendBatchResult sendOneWayBatch(final List<MqMessage> mqMessageList);

一次反对发送多个音讯。

接口实现

生产者实现如下。

@Override
public SendBatchResult sendBatch(List<MqMessage> mqMessageList) {final List<String> messageIdList = this.fillMessageList(mqMessageList);
    final MqMessageBatchReq batchReq = new MqMessageBatchReq();
    batchReq.setMqMessageList(mqMessageList);
    String traceId = IdHelper.uuid32();
    batchReq.setTraceId(traceId);
    batchReq.setMethodType(MethodType.P_SEND_MSG_BATCH);
    return Retryer.<SendBatchResult>newInstance()
            .maxAttempt(maxAttempt)
            .callable(new Callable<SendBatchResult>() {
                @Override
                public SendBatchResult call() throws Exception {return doSendBatch(messageIdList, batchReq, false);
                }
            }).retryCall();}

@Override
public SendBatchResult sendOneWayBatch(List<MqMessage> mqMessageList) {List<String> messageIdList = this.fillMessageList(mqMessageList);
    MqMessageBatchReq batchReq = new MqMessageBatchReq();
    batchReq.setMqMessageList(mqMessageList);
    String traceId = IdHelper.uuid32();
    batchReq.setTraceId(traceId);
    batchReq.setMethodType(MethodType.P_SEND_MSG_ONE_WAY_BATCH);
    return doSendBatch(messageIdList, batchReq, true);
}


private SendBatchResult doSendBatch(List<String> messageIdList,
                               MqMessageBatchReq batchReq,
                               boolean oneWay) {log.info("[Producer] 批量发送音讯 messageIdList: {}, batchReq: {}, oneWay: {}",
            messageIdList, JSON.toJSON(batchReq), oneWay);
    // 以第一个 sharding-key 为准。// 后续的会被疏忽
    MqMessage mqMessage = batchReq.getMqMessageList().get(0);
    Channel channel = getChannel(mqMessage.getShardingKey());
    //one-way
    if(oneWay) {log.warn("[Producer] ONE-WAY send, ignore result");
        return SendBatchResult.of(messageIdList, SendStatus.SUCCESS);
    }
    MqCommonResp resp = callServer(channel, batchReq, MqCommonResp.class);
    if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {return SendBatchResult.of(messageIdList, SendStatus.SUCCESS);
    }
    throw new MqException(ProducerRespCode.MSG_SEND_FAILED);
}

ps: 这里和单个发送有一个区别,那就是对于 channel 的抉择。因为只能抉择一个,所以不能兼顾每一个音讯的 sharding-key。

Broker 的解决

音讯散发

// 生产者音讯发送 - 批量
if(MethodType.P_SEND_MSG_BATCH.equals(methodType)) {return handleProducerSendMsgBatch(channelId, json);
}

// 生产者音讯发送 -ONE WAY- 批量
if(MethodType.P_SEND_MSG_ONE_WAY_BATCH.equals(methodType)) {handleProducerSendMsgBatch(channelId, json);
    return null;
}

具体实现

/**
 * 解决生产者发送的音讯
 *
 * @param channelId 通道标识
 * @param json 音讯体
 * @since 0.1.3
 */
private MqCommonResp handleProducerSendMsgBatch(String channelId, String json) {MqMessageBatchReq batchReq = JSON.parseObject(json, MqMessageBatchReq.class);
    final ServiceEntry serviceEntry = registerProducerService.getServiceEntry(channelId);
    List<MqMessagePersistPut> putList = buildPersistPutList(batchReq, serviceEntry);

    MqCommonResp commonResp = mqBrokerPersist.putBatch(putList);

    // 遍历异步推送
    for(MqMessagePersistPut persistPut : putList) {this.asyncHandleMessage(persistPut);
    }
    return commonResp;
}

这里对音讯列表进行长久化保留。

演示的长久化策略如下:

@Override
public MqCommonResp putBatch(List<MqMessagePersistPut> putList) {
    // 构建列表
    for(MqMessagePersistPut put : putList) {this.doPut(put);
    }

    MqCommonResp commonResp = new MqCommonResp();
    commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
    commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
    return commonResp;
}

音讯的批量 ACK

阐明

以前的实现形式是每一个音讯生产实现之后,进行一次 ACK。

对于 pull 策略的音讯生产,咱们能够等以后批次完结,对立进行 ACK 回执。

生产实现

实现调整如下:

for(MqTopicTagDto tagDto : subscribeList) {final String topicName = tagDto.getTopicName();
    final String tagRegex = tagDto.getTagRegex();
    MqConsumerPullResp resp = consumerBrokerService.pull(topicName, tagRegex, size);

    if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {List<MqMessage> mqMessageList = resp.getList();
        if(CollectionUtil.isNotEmpty(mqMessageList)) {List<MqConsumerUpdateStatusDto> statusDtoList = new ArrayList<>(mqMessageList.size());
            for(MqMessage mqMessage : mqMessageList) {IMqConsumerListenerContext context = new MqConsumerListenerContext();
                final String messageId = mqMessage.getTraceId();
                ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context);
                log.info("音讯:{} 生产后果 {}", messageId, consumerStatus);

                // 状态同步更新
                if(!ackBatchFlag) {MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus);
                    log.info("音讯:{} 状态回执后果 {}", messageId, JSON.toJSON(ackResp));
                } else {
                    // 批量
                    MqConsumerUpdateStatusDto statusDto = new MqConsumerUpdateStatusDto();
                    statusDto.setMessageId(messageId);
                    statusDto.setMessageStatus(consumerStatus.getCode());
                    statusDto.setConsumerGroupName(groupName);
                    statusDtoList.add(statusDto);
                }
            }

            // 批量执行
            if(ackBatchFlag) {MqCommonResp ackResp = consumerBrokerService.consumerStatusAckBatch(statusDtoList);
                log.info("音讯:{} 状态批量回执后果 {}", statusDtoList, JSON.toJSON(ackResp));
                statusDtoList = null;
            }
        }
    } else {log.error("拉取音讯失败: {}", JSON.toJSON(resp));
    }
}

如果 ackBatchFlag = false,则解决逻辑和以前一样。

如果 ackBatchFlag = true,则首先把音讯放到 list 中,完结后对立执行。

broker 实现

音讯散发

// 消费者生产状态 ACK- 批量
if(MethodType.C_CONSUMER_STATUS_BATCH.equals(methodType)) {MqConsumerUpdateStatusBatchReq req = JSON.parseObject(json, MqConsumerUpdateStatusBatchReq.class);
    final List<MqConsumerUpdateStatusDto> statusDtoList = req.getStatusList();
    return mqBrokerPersist.updateStatusBatch(statusDtoList);
}

实现

默认的长久化实现,更新如下:

@Override
public MqCommonResp updateStatusBatch(List<MqConsumerUpdateStatusDto> statusDtoList) {for(MqConsumerUpdateStatusDto statusDto : statusDtoList) {this.doUpdateStatus(statusDto.getMessageId(), statusDto.getConsumerGroupName(),
                statusDto.getMessageStatus());
    }

    MqCommonResp commonResp = new MqCommonResp();
    commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
    commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
    return commonResp;
}

遍历每一个元素,进行状态的更新。

小结

异步和批量,是晋升性能最罕用的 2 种形式。

批量的实现相干来说是最简略,也是成果最显著的。

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq

拓展浏览

rpc- 从零开始实现 rpc https://github.com/houbb/rpc

正文完
 0