乐趣区

关于消息队列:mq从零开始实现-mq10消费者拉取消息回执-pull-message-ack

前景回顾

【mq】从零开始实现 mq-01- 生产者、消费者启动

【mq】从零开始实现 mq-02- 如何实现生产者调用消费者?

【mq】从零开始实现 mq-03- 引入 broker 中间人

【mq】从零开始实现 mq-04- 启动检测与实现优化

【mq】从零开始实现 mq-05- 实现优雅停机

【mq】从零开始实现 mq-06- 消费者心跳检测 heartbeat

【mq】从零开始实现 mq-07- 负载平衡 load balance

【mq】从零开始实现 mq-08- 配置优化 fluent

【mq】从零开始实现 mq-09- 消费者拉取音讯 pull message

【mq】从零开始实现 mq-10- 消费者拉取音讯回执 pull message ack

状态回执

大家好,我是老马。

上一节咱们只实现了拉取音讯的实现,然而短少了生产状态回执。

这一节咱们一起来学习下如何实现状态回执。

代码实现

回执状态的设计

咱们规定如下几种回执状态:

package com.github.houbb.mq.common.constant;

/**
 * @author binbin.hou
 * @since 0.0.3
 */
public final class MessageStatusConst {private MessageStatusConst(){}

    /**
     * 待生产
     * ps: 生产者推送到 broker 的初始化状态
     */
    public static final String WAIT_CONSUMER = "W";

    /**
     * 推送给生产端解决中
     * ps: broker 筹备推送时,首先将状态更新为 P,期待推送后果
     * @since 0.1.0
     */
    public static final String TO_CONSUMER_PROCESS = "TCP";

    /**
     * 推送给生产端胜利
     * @since 0.1.0
     */
    public static final String TO_CONSUMER_SUCCESS = "TCS";

    /**
     * 推送给生产端失败
     * @since 0.1.0
     */
    public static final String TO_CONSUMER_FAILED = "TCF";

    /**
     * 生产实现
     */
    public static final String CONSUMER_SUCCESS = "CS";

    /**
     * 生产失败
     */
    public static final String CONSUMER_FAILED = "CF";

    /**
     * 稍后生产
     * @since 0.1.0
     */
    public static final String CONSUMER_LATER = "CL";

}

消费者状态回执

咱们在生产之后,增加状态回执:

for(MqMessage mqMessage : mqMessageList) {IMqConsumerListenerContext context = new MqConsumerListenerContext();
    final String messageId = mqMessage.getTraceId();
    ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context);
    log.info("音讯:{} 生产后果 {}", messageId, consumerStatus);

    // 状态同步更新
    MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus);
    log.info("音讯:{} 状态回执后果 {}", messageId, JSON.toJSON(ackResp));
}

回执实现,依据 messageId 更新对应的音讯生产状态。

public MqCommonResp consumerStatusAck(String messageId, ConsumerStatus consumerStatus) {final MqConsumerUpdateStatusReq req = new MqConsumerUpdateStatusReq();
    req.setMessageId(messageId);
    req.setMessageStatus(consumerStatus.getCode());
    final String traceId = IdHelper.uuid32();
    req.setTraceId(traceId);
    req.setMethodType(MethodType.C_CONSUMER_STATUS);

    // 重试
    return Retryer.<MqCommonResp>newInstance()
            .maxAttempt(consumerStatusMaxAttempt)
            .callable(new Callable<MqCommonResp>() {
                @Override
                public MqCommonResp call() throws Exception {Channel channel = getChannel(null);
                    MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
                    if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {throw new MqException(ConsumerRespCode.CONSUMER_STATUS_ACK_FAILED);
                    }
                    return resp;
                }
            }).retryCall();}

Broker 回执解决

音讯散发

// 消费者生产状态 ACK
if(MethodType.C_CONSUMER_STATUS.equals(methodType)) {MqConsumerUpdateStatusReq req = JSON.parseObject(json, MqConsumerUpdateStatusReq.class);
    final String messageId = req.getMessageId();
    final String messageStatus = req.getMessageStatus();
    return mqBrokerPersist.updateStatus(messageId, messageStatus);
}

简略实现

这里是基于本地 map 更新状态的,性能比拟差。

后续会以 mysql 实现。

public MqCommonResp updateStatus(String messageId, String status) {
    // 这里性能比拟差,所以不能够用于生产。仅作为测试验证
    for(List<MqMessagePersistPut> list : map.values()) {for(MqMessagePersistPut put : list) {MqMessage mqMessage = put.getMqMessage();
            if(mqMessage.getTraceId().equals(messageId)) {put.setMessageStatus(status);
                break;
            }
        }
    }

    MqCommonResp commonResp = new MqCommonResp();
    commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
    commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
    return commonResp;
}

小结

对于音讯状态的细化,更加便于咱们后续的治理,和问题的定位。

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq

拓展浏览

rpc- 从零开始实现 rpc https://github.com/houbb/rpc

退出移动版