共计 3255 个字符,预计需要花费 9 分钟才能阅读完成。
前景回顾
【mq】从零开始实现 mq-01- 生产者、消费者启动
【mq】从零开始实现 mq-02- 如何实现生产者调用消费者?
【mq】从零开始实现 mq-03- 引入 broker 中间人
【mq】从零开始实现 mq-04- 启动检测与实现优化
【mq】从零开始实现 mq-05- 实现优雅停机
【mq】从零开始实现 mq-06- 消费者心跳检测 heartbeat
【mq】从零开始实现 mq-07- 负载平衡 load balance
【mq】从零开始实现 mq-08- 配置优化 fluent
【mq】从零开始实现 mq-09- 消费者拉取音讯 pull message
【mq】从零开始实现 mq-10- 消费者拉取音讯回执 pull message ack
状态回执
大家好,我是老马。
上一节咱们只实现了拉取音讯的实现,然而短少了生产状态回执。
这一节咱们一起来学习下如何实现状态回执。
代码实现
回执状态的设计
咱们规定如下几种回执状态:
package com.github.houbb.mq.common.constant; | |
/** | |
* @author binbin.hou | |
* @since 0.0.3 | |
*/ | |
public final class MessageStatusConst {private MessageStatusConst(){} | |
/** | |
* 待生产 | |
* ps: 生产者推送到 broker 的初始化状态 | |
*/ | |
public static final String WAIT_CONSUMER = "W"; | |
/** | |
* 推送给生产端解决中 | |
* ps: broker 筹备推送时,首先将状态更新为 P,期待推送后果 | |
* @since 0.1.0 | |
*/ | |
public static final String TO_CONSUMER_PROCESS = "TCP"; | |
/** | |
* 推送给生产端胜利 | |
* @since 0.1.0 | |
*/ | |
public static final String TO_CONSUMER_SUCCESS = "TCS"; | |
/** | |
* 推送给生产端失败 | |
* @since 0.1.0 | |
*/ | |
public static final String TO_CONSUMER_FAILED = "TCF"; | |
/** | |
* 生产实现 | |
*/ | |
public static final String CONSUMER_SUCCESS = "CS"; | |
/** | |
* 生产失败 | |
*/ | |
public static final String CONSUMER_FAILED = "CF"; | |
/** | |
* 稍后生产 | |
* @since 0.1.0 | |
*/ | |
public static final String CONSUMER_LATER = "CL"; | |
} |
消费者状态回执
咱们在生产之后,增加状态回执:
for(MqMessage mqMessage : mqMessageList) {IMqConsumerListenerContext context = new MqConsumerListenerContext(); | |
final String messageId = mqMessage.getTraceId(); | |
ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context); | |
log.info("音讯:{} 生产后果 {}", messageId, consumerStatus); | |
// 状态同步更新 | |
MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus); | |
log.info("音讯:{} 状态回执后果 {}", messageId, JSON.toJSON(ackResp)); | |
} |
回执实现,依据 messageId 更新对应的音讯生产状态。
public MqCommonResp consumerStatusAck(String messageId, ConsumerStatus consumerStatus) {final MqConsumerUpdateStatusReq req = new MqConsumerUpdateStatusReq(); | |
req.setMessageId(messageId); | |
req.setMessageStatus(consumerStatus.getCode()); | |
final String traceId = IdHelper.uuid32(); | |
req.setTraceId(traceId); | |
req.setMethodType(MethodType.C_CONSUMER_STATUS); | |
// 重试 | |
return Retryer.<MqCommonResp>newInstance() | |
.maxAttempt(consumerStatusMaxAttempt) | |
.callable(new Callable<MqCommonResp>() { | |
@Override | |
public MqCommonResp call() throws Exception {Channel channel = getChannel(null); | |
MqCommonResp resp = callServer(channel, req, MqCommonResp.class); | |
if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {throw new MqException(ConsumerRespCode.CONSUMER_STATUS_ACK_FAILED); | |
} | |
return resp; | |
} | |
}).retryCall();} |
Broker 回执解决
音讯散发
// 消费者生产状态 ACK | |
if(MethodType.C_CONSUMER_STATUS.equals(methodType)) {MqConsumerUpdateStatusReq req = JSON.parseObject(json, MqConsumerUpdateStatusReq.class); | |
final String messageId = req.getMessageId(); | |
final String messageStatus = req.getMessageStatus(); | |
return mqBrokerPersist.updateStatus(messageId, messageStatus); | |
} |
简略实现
这里是基于本地 map 更新状态的,性能比拟差。
后续会以 mysql 实现。
public MqCommonResp updateStatus(String messageId, String status) { | |
// 这里性能比拟差,所以不能够用于生产。仅作为测试验证 | |
for(List<MqMessagePersistPut> list : map.values()) {for(MqMessagePersistPut put : list) {MqMessage mqMessage = put.getMqMessage(); | |
if(mqMessage.getTraceId().equals(messageId)) {put.setMessageStatus(status); | |
break; | |
} | |
} | |
} | |
MqCommonResp commonResp = new MqCommonResp(); | |
commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); | |
commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); | |
return commonResp; | |
} |
小结
对于音讯状态的细化,更加便于咱们后续的治理,和问题的定位。
心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。
我是老马,期待与你的下次重逢。
开源地址
The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq
拓展浏览
rpc- 从零开始实现 rpc https://github.com/houbb/rpc
正文完