乐趣区

关于mq:mq从零开始实现-mq05实现优雅停机

前景回顾

【mq】从零开始实现 mq-01- 生产者、消费者启动

【mq】从零开始实现 mq-02- 如何实现生产者调用消费者?

【mq】从零开始实现 mq-03- 引入 broker 中间人

【mq】从零开始实现 mq-04- 启动检测与实现优化

【mq】从零开始实现 mq-05- 实现优雅停机

为什么须要优雅敞开?

我记得多年前,那个时候 rpc 框架支流用的还是 dubbo,每次都是中午还是上线,上线上完根本都是凌晨 2-3 点。

为什么要中午上线呢?

因为这个时候个别业务流量最低。

还有就是上线公布,每次都要人工期待一段几分钟。

因为 rpc 调用入口曾经敞开了,然而自身可能还没有解决完。

那么有没有办法能够让服务的敞开更加优雅,而不是人工期待呢?

实现思路

人工期待几分钟的形式个别能够解决问题,然而大部分状况是无用功,还比拟浪费时间。

比拟天然的一种形式是引入钩子函数。

当利用筹备敞开时,首先判断是否存在解决中的申请,不存在则间接敞开;存在,则期待申请实现再敞开。

实现

生产者和消费者是相似的,咱们以生产者为例。

启动实现的调整

@Override
public synchronized void run() {this.paramCheck();
    // 启动服务端
    log.info("MQ 生产者开始启动客户端 GROUP: {} brokerAddress: {}",
            groupName, brokerAddress);
    try {
        //0. 配置信息
        ProducerBrokerConfig config = ProducerBrokerConfig.newInstance()
                .groupName(groupName)
                .brokerAddress(brokerAddress)
                .check(check)
                .respTimeoutMills(respTimeoutMills)
                .invokeService(invokeService)
                .statusManager(statusManager);

        //1. 初始化
        this.producerBrokerService.initChannelFutureList(config);
        //2. 连贯到服务端
        this.producerBrokerService.registerToBroker();

        //3. 标识为可用
        statusManager.status(true);

        //4. 增加钩子函数
        final DefaultShutdownHook rpcShutdownHook = new DefaultShutdownHook();
        rpcShutdownHook.setStatusManager(statusManager);
        rpcShutdownHook.setInvokeService(invokeService);
        rpcShutdownHook.setWaitMillsForRemainRequest(waitMillsForRemainRequest);
        rpcShutdownHook.setDestroyable(this.producerBrokerService);
        ShutdownHooks.rpcShutdownHook(rpcShutdownHook);
        log.info("MQ 生产者启动实现");
    } catch (Exception e) {log.error("MQ 生产者启动遇到异样", e);
        throw new MqException(ProducerRespCode.RPC_INIT_FAILED);
    }
}

状态治理类

这里咱们引入 statusManager 治理整体的状态。

默认的如下:

public class StatusManager implements IStatusManager {

    private boolean status;

    @Override
    public boolean status() {return this.status;}

    @Override
    public IStatusManager status(boolean status) {
        this.status = status;

        return this;
    }

}

就是对一个是否可用的状态进行保护,而后在 channel 获取等中央便于判断以后服务的状态。

钩子函数

DefaultShutdownHook 实现如下:

public class DefaultShutdownHook extends AbstractShutdownHook {

    /**
     * 调用治理类
     * @since 0.0.5
     */
    private IInvokeService invokeService;

    /**
     * 销毁治理类
     * @since 0.0.5
     */
    private Destroyable destroyable;

    /**
     * 状态治理类
     * @since 0.0.5
     */
    private IStatusManager statusManager;

    /**
     * 为残余的申请等待时间
     * @since 0.0.5
     */
    private long waitMillsForRemainRequest = 60 * 1000;

    //get & set

    /**
     *(1)设置 status 状态为期待敞开
     *(2)查看是否 {@link IInvokeService#remainsRequest()} 是否蕴含申请
     *(3)超时检测 - 能够不增加,如果难以敞开胜利,间接强制敞开即可。*(4)敞开所有线程池资源信息
     *(5)设置状态为胜利敞开
     */
    @Override
    protected void doHook() {statusManager.status(false);
        // 设置状态为期待敞开
        logger.info("[Shutdown] set status to wait for shutdown.");

        // 循环期待以后执行的申请执行实现
        long startMills = System.currentTimeMillis();
        while (invokeService.remainsRequest()) {long currentMills = System.currentTimeMillis();
            long costMills = currentMills - startMills;
            if(costMills >= waitMillsForRemainRequest) {logger.warn("[Shutdown] still remains request, but timeout, break.");
                break;
            }

            logger.debug("[Shutdown] still remains request, wait for a while.");
            DateUtil.sleep(10);
        }

        // 销毁
        destroyable.destroyAll();

        // 设置状态为敞开胜利
        statusManager.status(false);
        logger.info("[Shutdown] set status to shutdown success.");
    }

}

(1)进行敞开前,首先判断通过 invokeService.remainsRequest() 判断是否有未解决完的音讯,有则进行期待。

(2)当然,咱们还须要思考网络音讯失落的场景,不可能始终期待。

所以引入了超时中断,最大等待时间也是能够自行定义的。

if(costMills >= waitMillsForRemainRequest) {logger.warn("[Shutdown] still remains request, but timeout, break.");
    break;
}

(3)敞开之后

将 status 设置为 false,标识以后服务不可用。

小结

随着 rpc 技术的成熟,优雅敞开曾经成为一个很根本的性能点。

一个小小的改变,能够节约生产公布工夫,早点上班陪陪家人。

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq

拓展浏览

rpc- 从零开始实现 rpc https://github.com/houbb/rpc

退出移动版