前景回顾

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

【mq】从零开始实现 mq-04-启动检测与实现优化

【mq】从零开始实现 mq-05-实现优雅停机

【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat

为什么须要心跳?

心跳(heartbeat ),顾名思义就是心脏的跳动。

医学上个别通过心跳是否跳动,来判断一个人是否活着。

那么,分布式服务中如何判断一个服务是否还活着呢?

实现思路

比方 mq 中,broker 须要把音讯实时推送给在线的消费者。

那么如何判断一个消费者是否活着呢?

咱们能够让消费者定时,比方每 5 秒钟给 broker 发送一个心跳包,思考到网络提早等,如果间断 1min 都没有收到心跳,咱们则移除这个消费者,认为服务曾经挂了。

消费者实现

上代码!

心跳实现

心跳能够是一个很简略的音讯体。

@Overridepublic void heartbeat() {    final MqHeartBeatReq req = new MqHeartBeatReq();    final String traceId = IdHelper.uuid32();    req.setTraceId(traceId);    req.setMethodType(MethodType.C_HEARTBEAT);    req.setAddress(NetUtil.getLocalHost());    req.setPort(0);    req.setTime(System.currentTimeMillis());    log.debug("[HEARTBEAT] 往服务端发送心跳包 {}", JSON.toJSON(req));    // 告诉全副    for(RpcChannelFuture channelFuture : channelFutureList) {        try {            Channel channel = channelFuture.getChannelFuture().channel();            callServer(channel, req, null);        } catch (Exception exception) {            log.error("[HEARTBEAT] 往服务端解决异样", exception);        }    }}

消费者把心跳告诉所有的 broker.

心跳的定时执行

咱们启动一个定时工作,5S 钟执行一次。

/** * 初始化心跳 * @since 0.0.6 */private void initHeartbeat() {    //5S 发一次心跳    scheduledExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            heartbeat();        }    }, 5, 5, TimeUnit.SECONDS);}

心跳是在连贯到 broker 之后就开始启动:

@Overridepublic void initChannelFutureList(ConsumerBrokerConfig config) {    //1. 配置初始化    //...    //2. 初始化    this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress,            initChannelHandler(), check);    //3. 初始化心跳    this.initHeartbeat();}

Broker 实现

消费者定时发送音讯,生产者必定是须要承受的。

接管心跳

为了简略,咱们让心跳是 ONE-WAY 的。

// 消费者心跳if(MethodType.C_HEARTBEAT.equals(methodType)) {    MqHeartBeatReq req = JSON.parseObject(json, MqHeartBeatReq.class);    registerConsumerService.heartbeat(req, channel);    return null;}

hearbeat 解决

每次收到音讯,咱们把申请的 channelId 记录下来,并保留最新的拜访工夫

@Overridepublic void heartbeat(MqHeartBeatReq mqHeartBeatReq, Channel channel) {    final String channelId = ChannelUtil.getChannelId(channel);    log.info("[HEARTBEAT] 接管消费者心跳 {}, channelId: {}",            JSON.toJSON(mqHeartBeatReq), channelId);    ServiceEntry serviceEntry = new ServiceEntry();    serviceEntry.setAddress(mqHeartBeatReq.getAddress());    serviceEntry.setPort(mqHeartBeatReq.getPort());    BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);    entryChannel.setLastAccessTime(mqHeartBeatReq.getTime());    heartbeatMap.put(channelId, entryChannel);}

移除消费者

如果一些消费者长时间没有心跳,咱们就认为服务曾经挂了。

LocalBrokerConsumerService 服务启动的时候,同时启用一个定时清理工作。

public LocalBrokerConsumerService() {    //120S 扫描一次    final long limitMills = 2 * 60 * 1000;    scheduledExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            for(Map.Entry<String, BrokerServiceEntryChannel> entry : heartbeatMap.entrySet()) {                String key  = entry.getKey();                long lastAccessTime = entry.getValue().getLastAccessTime();                long currentTime = System.currentTimeMillis();                if(currentTime - lastAccessTime > limitMills) {                    removeByChannelId(key);                }            }        }    }, 2 * 60, 2 * 60, TimeUnit.SECONDS);}

这个工作 2min 执行一次,如果 2min 都没有心跳,这移除对应的消费者。

小结

心跳,是网络传输中验证服务可用性非常简单,然而无效的形式。

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq

拓展浏览

rpc-从零开始实现 rpc https://github.com/houbb/rpc