共计 3287 个字符,预计需要花费 9 分钟才能阅读完成。
前景回顾
【mq】从零开始实现 mq-01- 生产者、消费者启动
【mq】从零开始实现 mq-02- 如何实现生产者调用消费者?
【mq】从零开始实现 mq-03- 引入 broker 中间人
【mq】从零开始实现 mq-04- 启动检测与实现优化
【mq】从零开始实现 mq-05- 实现优雅停机
为什么须要优雅敞开?
我记得多年前,那个时候 rpc 框架支流用的还是 dubbo,每次都是中午还是上线,上线上完根本都是凌晨 2-3 点。
为什么要中午上线呢?
因为这个时候个别业务流量最低。
还有就是上线公布,每次都要人工期待一段几分钟。
因为 rpc 调用入口曾经敞开了,然而自身可能还没有解决完。
那么有没有办法能够让服务的敞开更加优雅,而不是人工期待呢?
实现思路
人工期待几分钟的形式个别能够解决问题,然而大部分状况是无用功,还比拟浪费时间。
比拟天然的一种形式是引入钩子函数。
当利用筹备敞开时,首先判断是否存在解决中的申请,不存在则间接敞开;存在,则期待申请实现再敞开。
实现
生产者和消费者是相似的,咱们以生产者为例。
启动实现的调整
@Override
public synchronized void run() {this.paramCheck();
// 启动服务端
log.info("MQ 生产者开始启动客户端 GROUP: {} brokerAddress: {}",
groupName, brokerAddress);
try {
//0. 配置信息
ProducerBrokerConfig config = ProducerBrokerConfig.newInstance()
.groupName(groupName)
.brokerAddress(brokerAddress)
.check(check)
.respTimeoutMills(respTimeoutMills)
.invokeService(invokeService)
.statusManager(statusManager);
//1. 初始化
this.producerBrokerService.initChannelFutureList(config);
//2. 连贯到服务端
this.producerBrokerService.registerToBroker();
//3. 标识为可用
statusManager.status(true);
//4. 增加钩子函数
final DefaultShutdownHook rpcShutdownHook = new DefaultShutdownHook();
rpcShutdownHook.setStatusManager(statusManager);
rpcShutdownHook.setInvokeService(invokeService);
rpcShutdownHook.setWaitMillsForRemainRequest(waitMillsForRemainRequest);
rpcShutdownHook.setDestroyable(this.producerBrokerService);
ShutdownHooks.rpcShutdownHook(rpcShutdownHook);
log.info("MQ 生产者启动实现");
} catch (Exception e) {log.error("MQ 生产者启动遇到异样", e);
throw new MqException(ProducerRespCode.RPC_INIT_FAILED);
}
}
状态治理类
这里咱们引入 statusManager 治理整体的状态。
默认的如下:
public class StatusManager implements IStatusManager {
private boolean status;
@Override
public boolean status() {return this.status;}
@Override
public IStatusManager status(boolean status) {
this.status = status;
return this;
}
}
就是对一个是否可用的状态进行保护,而后在 channel 获取等中央便于判断以后服务的状态。
钩子函数
DefaultShutdownHook 实现如下:
public class DefaultShutdownHook extends AbstractShutdownHook {
/**
* 调用治理类
* @since 0.0.5
*/
private IInvokeService invokeService;
/**
* 销毁治理类
* @since 0.0.5
*/
private Destroyable destroyable;
/**
* 状态治理类
* @since 0.0.5
*/
private IStatusManager statusManager;
/**
* 为残余的申请等待时间
* @since 0.0.5
*/
private long waitMillsForRemainRequest = 60 * 1000;
//get & set
/**
*(1)设置 status 状态为期待敞开
*(2)查看是否 {@link IInvokeService#remainsRequest()} 是否蕴含申请
*(3)超时检测 - 能够不增加,如果难以敞开胜利,间接强制敞开即可。*(4)敞开所有线程池资源信息
*(5)设置状态为胜利敞开
*/
@Override
protected void doHook() {statusManager.status(false);
// 设置状态为期待敞开
logger.info("[Shutdown] set status to wait for shutdown.");
// 循环期待以后执行的申请执行实现
long startMills = System.currentTimeMillis();
while (invokeService.remainsRequest()) {long currentMills = System.currentTimeMillis();
long costMills = currentMills - startMills;
if(costMills >= waitMillsForRemainRequest) {logger.warn("[Shutdown] still remains request, but timeout, break.");
break;
}
logger.debug("[Shutdown] still remains request, wait for a while.");
DateUtil.sleep(10);
}
// 销毁
destroyable.destroyAll();
// 设置状态为敞开胜利
statusManager.status(false);
logger.info("[Shutdown] set status to shutdown success.");
}
}
(1)进行敞开前,首先判断通过 invokeService.remainsRequest()
判断是否有未解决完的音讯,有则进行期待。
(2)当然,咱们还须要思考网络音讯失落的场景,不可能始终期待。
所以引入了超时中断,最大等待时间也是能够自行定义的。
if(costMills >= waitMillsForRemainRequest) {logger.warn("[Shutdown] still remains request, but timeout, break.");
break;
}
(3)敞开之后
将 status 设置为 false,标识以后服务不可用。
小结
随着 rpc 技术的成熟,优雅敞开曾经成为一个很根本的性能点。
一个小小的改变,能够节约生产公布工夫,早点上班陪陪家人。
心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。
我是老马,期待与你的下次重逢。
开源地址
The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq
拓展浏览
rpc- 从零开始实现 rpc https://github.com/houbb/rpc