共计 3008 个字符,预计需要花费 8 分钟才能阅读完成。
前景回顾
【mq】从零开始实现 mq-01- 生产者、消费者启动
【mq】从零开始实现 mq-02- 如何实现生产者调用消费者?
【mq】从零开始实现 mq-03- 引入 broker 中间人
【mq】从零开始实现 mq-04- 启动检测与实现优化
【mq】从零开始实现 mq-05- 实现优雅停机
【mq】从零开始实现 mq-06- 消费者心跳检测 heartbeat
为什么须要心跳?
心跳(heartbeat),顾名思义就是心脏的跳动。
医学上个别通过心跳是否跳动,来判断一个人是否活着。
那么,分布式服务中如何判断一个服务是否还活着呢?
实现思路
比方 mq 中,broker 须要把音讯实时推送给在线的消费者。
那么如何判断一个消费者是否活着呢?
咱们能够让消费者定时,比方每 5 秒钟给 broker 发送一个心跳包,思考到网络提早等,如果间断 1min 都没有收到心跳,咱们则移除这个消费者,认为服务曾经挂了。
消费者实现
上代码!
心跳实现
心跳能够是一个很简略的音讯体。
@Override
public void heartbeat() {final MqHeartBeatReq req = new MqHeartBeatReq();
final String traceId = IdHelper.uuid32();
req.setTraceId(traceId);
req.setMethodType(MethodType.C_HEARTBEAT);
req.setAddress(NetUtil.getLocalHost());
req.setPort(0);
req.setTime(System.currentTimeMillis());
log.debug("[HEARTBEAT] 往服务端发送心跳包 {}", JSON.toJSON(req));
// 告诉全副
for(RpcChannelFuture channelFuture : channelFutureList) {
try {Channel channel = channelFuture.getChannelFuture().channel();
callServer(channel, req, null);
} catch (Exception exception) {log.error("[HEARTBEAT] 往服务端解决异样", exception);
}
}
}
消费者把心跳告诉所有的 broker.
心跳的定时执行
咱们启动一个定时工作,5S 钟执行一次。
/**
* 初始化心跳
* @since 0.0.6
*/
private void initHeartbeat() {
//5S 发一次心跳
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {heartbeat();
}
}, 5, 5, TimeUnit.SECONDS);
}
心跳是在连贯到 broker 之后就开始启动:
@Override
public void initChannelFutureList(ConsumerBrokerConfig config) {
//1. 配置初始化
//...
//2. 初始化
this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress,
initChannelHandler(), check);
//3. 初始化心跳
this.initHeartbeat();}
Broker 实现
消费者定时发送音讯,生产者必定是须要承受的。
接管心跳
为了简略,咱们让心跳是 ONE-WAY 的。
// 消费者心跳
if(MethodType.C_HEARTBEAT.equals(methodType)) {MqHeartBeatReq req = JSON.parseObject(json, MqHeartBeatReq.class);
registerConsumerService.heartbeat(req, channel);
return null;
}
hearbeat 解决
每次收到音讯,咱们把申请的 channelId 记录下来,并保留最新的拜访工夫
@Override
public void heartbeat(MqHeartBeatReq mqHeartBeatReq, Channel channel) {final String channelId = ChannelUtil.getChannelId(channel);
log.info("[HEARTBEAT] 接管消费者心跳 {}, channelId: {}",
JSON.toJSON(mqHeartBeatReq), channelId);
ServiceEntry serviceEntry = new ServiceEntry();
serviceEntry.setAddress(mqHeartBeatReq.getAddress());
serviceEntry.setPort(mqHeartBeatReq.getPort());
BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);
entryChannel.setLastAccessTime(mqHeartBeatReq.getTime());
heartbeatMap.put(channelId, entryChannel);
}
移除消费者
如果一些消费者长时间没有心跳,咱们就认为服务曾经挂了。
在 LocalBrokerConsumerService
服务启动的时候,同时启用一个定时清理工作。
public LocalBrokerConsumerService() {
//120S 扫描一次
final long limitMills = 2 * 60 * 1000;
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {for(Map.Entry<String, BrokerServiceEntryChannel> entry : heartbeatMap.entrySet()) {String key = entry.getKey();
long lastAccessTime = entry.getValue().getLastAccessTime();
long currentTime = System.currentTimeMillis();
if(currentTime - lastAccessTime > limitMills) {removeByChannelId(key);
}
}
}
}, 2 * 60, 2 * 60, TimeUnit.SECONDS);
}
这个工作 2min 执行一次,如果 2min 都没有心跳,这移除对应的消费者。
小结
心跳,是网络传输中验证服务可用性非常简单,然而无效的形式。
心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。
我是老马,期待与你的下次重逢。
开源地址
The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq
拓展浏览
rpc- 从零开始实现 rpc https://github.com/houbb/rpc