当一个服务领有太多解决逻辑时,会导致代码构造异样的凌乱,很难分辨一段逻辑是在哪个阶段发挥作用的。
这时就能够引入状态机模型,帮忙代码构造变得清晰。
一、状态机库概述
一)简介
状态机由一组状态组成:
【初始状态 -> 中间状态 -> 最终状态】。
在一个状态机中,每个状态会接管一组特定的事件,依据事件类型进行解决,并转换到下一个状态。当转换到最终状态时则退出。
二)状态转换形式
状态间转换会有上面这三种类型:
三)Yarn 状态机类
在 Yarn 中提供了一个工厂类 StateMachineFactory
来帮忙定义状态机。如何应用,咱们间接写个 demo。
<img src=”https://cdn.nlark.com/yuque/0/2022/png/21670600/1665737645204-cf813e7c-20a1-43bf-ac2a-5b65f05ceb7e.png” alt=”image.png” style=”zoom:67%;” />
二、案例 demo
在上一篇文章《Yarn 服务库和事件库》案例根底上进行扩大,减少状态机库的内容。如果还不理解服务库和事件库的同学,倡议先学习下上一篇文章。
案例已上传至 github,有帮忙能够点个 ⭐️
https://github.com/Simon-Ace/hadoop-yarn-study-demo/tree/master/state-demo
一)状态机实现
状态机实现,能够间接嵌入到上篇文章中的 AsyncDispatcher
应用。
这里仅给出状态机 JobStateMachine
以及各种事件处理的代码。残缺的代码我的项目执行,请到 github demo 中查看。
import com.shuofxz.event.JobEvent;
import com.shuofxz.event.JobEventType;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
import java.util.EnumSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/*
* 可参考 Yarn 中实现的状态机对象:* ResourceManager 中的 RMAppImpl、RMApp- AttemptImpl、RMContainerImpl 和 RMNodeImpl,* NodeManager 中 的 ApplicationImpl、ContainerImpl 和 LocalizedResource,* MRAppMaster 中的 JobImpl、TaskImpl 和 TaskAttemptImpl 等
* */
@SuppressWarnings({"rawtypes", "unchecked"})
public class JobStateMachine implements EventHandler<JobEvent> {
private final String jobID;
private EventHandler eventHandler;
private final Lock writeLock;
private final Lock readLock;
// 定义状态机
protected static final StateMachineFactory<JobStateMachine, JobStateInternal,
JobEventType, JobEvent>
stateMachineFactory = new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>(JobStateInternal.NEW)
.addTransition(JobStateInternal.NEW, JobStateInternal.INITED, JobEventType.JOB_INIT, new InitTransition())
.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP, JobEventType.JOB_START, new StartTransition())
.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING, JobEventType.JOB_SETUP_COMPLETED, new SetupCompletedTransition())
.addTransition(JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED), JobEventType.JOB_COMPLETED, new JobTasksCompletedTransition())
.installTopology();
private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
public JobStateMachine(String jobID, EventHandler eventHandler) {
this.jobID = jobID;
// 多线程异步解决,state 有可能被同时读写,应用读写锁来防止竞争
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.eventHandler = eventHandler;
stateMachine = stateMachineFactory.make(this);
}
protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {return stateMachine;}
public static class InitTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
@Override
public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {System.out.println("Receiving event" + jobEvent);
// do something...
// 实现后发送新的 Event —— JOB_START
jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_START));
}
}
public static class StartTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
@Override
public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {System.out.println("Receiving event" + jobEvent);
jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_SETUP_COMPLETED));
}
}
public static class SetupCompletedTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
@Override
public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {System.out.println("Receiving event" + jobEvent);
jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_COMPLETED));
}
}
public static class JobTasksCompletedTransition implements MultipleArcTransition<JobStateMachine, JobEvent, JobStateInternal> {
@Override
public JobStateInternal transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {System.out.println("Receiving event" + jobEvent);
// 这是多后果状态局部,因而须要人为制订后续状态
// 这里整个流程完结,设置一下对应的状态
boolean flag = true;
if (flag) {return JobStateInternal.SUCCEEDED;} else {return JobStateInternal.KILLED;}
}
}
@Override
public void handle(JobEvent jobEvent) {
try {
// 留神这里为了防止动态条件,应用了读写锁
writeLock.lock();
JobStateInternal oldState = getInternalState();
try {getStateMachine().doTransition(jobEvent.getType(), jobEvent);
} catch (InvalidStateTransitionException e) {System.out.println("Can't handle this event at current state!");
}
if (oldState != getInternalState()) {System.out.println("Job Transitioned from" + oldState + "to" + getInternalState());
}
} finally {writeLock.unlock();
}
}
public JobStateInternal getInternalState() {readLock.lock();
try {return getStateMachine().getCurrentState();} finally {readLock.unlock();
}
}
public enum JobStateInternal {
NEW,
SETUP,
INITED,
RUNNING,
SUCCEEDED,
KILLED
}
}
二)状态机可视化
hadoop 中提供了状态机可视化的工具类 VisualizeStateMachine.java
,能够拷贝到咱们的工程中应用。
依据提醒,运行须要三个参数:
Usage: %s <GraphName> <class[,class[,...]]> <OutputFile>%n
运行后会在我的项目根目录生成图文件 jsm.gv
。
须要应用 graphviz
工具将 gv 文件转换成 png 文件:
# linux 装置
yum install graphviz
# mac 装置
brew install graphviz
转换:
dot -Tpng jsm.gv > jsm.png
可视化状态机展现:
<img src=”https://cdn.nlark.com/yuque/0/2022/png/21670600/1665736293215-5272544b-b42c-461f-b51e-54cb4a8fb227.png” alt=”image.png” style=”zoom: 25%;” />
再应用这个工具对 Yarn 中的 Application 状态进行展现:
<img src=”https://cdn.nlark.com/yuque/0/2022/png/21670600/1665736370375-47dca89b-11b3-4a89-8610-4d425b2bd90b.png” alt=”image.png” style=”zoom:33%;” />
三)如果不必状态机库
【思考】
如果不必状态机,代码构造会是什么样呢?
上面这样的代码,如果要减少或批改逻辑可能就是很苦楚的一件事件了。
// 一堆的函数调用
// 一堆的 if 嵌套
// 或者 switch case
三、总结
本节对 Yarn 状态机库进行了介绍。理论应用时会联合事件库、服务库一起应用。
状态机库的应用帮忙代码构造更加的清晰,新增状态解决逻辑只须要减少一个状态类别,或者减少一个办法解决对应类型的事件即可。将整个解决逻辑进行了拆分,便于编写和保护。
参考文章:
源码 |Yarn 的事件驱动模型与状态机