当一个服务领有太多解决逻辑时,会导致代码构造异样的凌乱,很难分辨一段逻辑是在哪个阶段发挥作用的。这时就能够引入状态机模型,帮忙代码构造变得清晰。
一、状态机库概述一)简介状态机由一组状态组成:【初始状态 -> 中间状态 -> 最终状态】。在一个状态机中,每个状态会接管一组特定的事件,依据事件类型进行解决,并转换到下一个状态。当转换到最终状态时则退出。
二)状态转换形式状态间转换会有上面这三种类型:
三)Yarn 状态机类在 Yarn 中提供了一个工厂类 StateMachineFactory 来帮忙定义状态机。如何应用,咱们间接写个 demo。<img src="https://cdn.nlark.com/yuque/0/2022/png/21670600/1665737645204-cf813e7c-20a1-43bf-ac2a-5b65f05ceb7e.png" alt="image.png" style="zoom:67%;" />
二、案例 demo在上一篇文章《Yarn 服务库和事件库》案例根底上进行扩大,减少状态机库的内容。如果还不理解服务库和事件库的同学,倡议先学习下上一篇文章。案例已上传至 github,有帮忙能够点个 ⭐️https://github.com/Simon-Ace/hadoop-yarn-study-demo/tree/master/state-demo
一)状态机实现状态机实现,能够间接嵌入到上篇文章中的 AsyncDispatcher应用。这里仅给出状态机JobStateMachine以及各种事件处理的代码。残缺的代码我的项目执行,请到 github demo 中查看。
import com.shuofxz.event.JobEvent;import com.shuofxz.event.JobEventType;import org.apache.hadoop.yarn.event.EventHandler;import org.apache.hadoop.yarn.state.*;import java.util.EnumSet;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;/** 可参考 Yarn 中实现的状态机对象:* ResourceManager 中的 RMAppImpl、RMApp- AttemptImpl、RMContainerImpl 和 RMNodeImpl,* NodeManager 中 的 ApplicationImpl、 ContainerImpl 和 LocalizedResource,* MRAppMaster 中的 JobImpl、TaskImpl 和 TaskAttemptImpl 等* */@SuppressWarnings({"rawtypes", "unchecked"})public class JobStateMachine implements EventHandler<JobEvent> { private final String jobID; private EventHandler eventHandler; private final Lock writeLock; private final Lock readLock; // 定义状态机 protected static final StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent> stateMachineFactory = new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>(JobStateInternal.NEW) .addTransition(JobStateInternal.NEW, JobStateInternal.INITED, JobEventType.JOB_INIT, new InitTransition()) .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP, JobEventType.JOB_START, new StartTransition()) .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING, JobEventType.JOB_SETUP_COMPLETED, new SetupCompletedTransition()) .addTransition(JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED), JobEventType.JOB_COMPLETED, new JobTasksCompletedTransition()) .installTopology(); private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine; public JobStateMachine(String jobID, EventHandler eventHandler) { this.jobID = jobID; // 多线程异步解决,state 有可能被同时读写,应用读写锁来防止竞争 ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); this.eventHandler = eventHandler; stateMachine = stateMachineFactory.make(this); } protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() { return stateMachine; } public static class InitTransition implements SingleArcTransition<JobStateMachine, JobEvent> { @Override public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) { System.out.println("Receiving event " + jobEvent); // do something... // 实现后发送新的 Event —— JOB_START jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_START)); } } public static class StartTransition implements SingleArcTransition<JobStateMachine, JobEvent> { @Override public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) { System.out.println("Receiving event " + jobEvent); jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_SETUP_COMPLETED)); } } public static class SetupCompletedTransition implements SingleArcTransition<JobStateMachine, JobEvent> { @Override public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) { System.out.println("Receiving event " + jobEvent); jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_COMPLETED)); } } public static class JobTasksCompletedTransition implements MultipleArcTransition<JobStateMachine, JobEvent, JobStateInternal> { @Override public JobStateInternal transition(JobStateMachine jobStateMachine, JobEvent jobEvent) { System.out.println("Receiving event " + jobEvent); // 这是多后果状态局部,因而须要人为制订后续状态 // 这里整个流程完结,设置一下对应的状态 boolean flag = true; if (flag) { return JobStateInternal.SUCCEEDED; } else { return JobStateInternal.KILLED; } } } @Override public void handle(JobEvent jobEvent) { try { // 留神这里为了防止动态条件,应用了读写锁 writeLock.lock(); JobStateInternal oldState = getInternalState(); try { getStateMachine().doTransition(jobEvent.getType(), jobEvent); } catch (InvalidStateTransitionException e) { System.out.println("Can't handle this event at current state!"); } if (oldState != getInternalState()) { System.out.println("Job Transitioned from " + oldState + " to " + getInternalState()); } } finally { writeLock.unlock(); } } public JobStateInternal getInternalState() { readLock.lock(); try { return getStateMachine().getCurrentState(); } finally { readLock.unlock(); } } public enum JobStateInternal { NEW, SETUP, INITED, RUNNING, SUCCEEDED, KILLED }}二)状态机可视化hadoop 中提供了状态机可视化的工具类 VisualizeStateMachine.java,能够拷贝到咱们的工程中应用。依据提醒,运行须要三个参数:
...