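Starting with this section, we analyze some common behaviors of the ResourceManager (RM) and look at how specific key operations flow through it. This section digs into the source code to trace the detailed process of launching the ApplicationMaster.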
I. Overall Flow
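This part covers the whole process from application submission to ApplicationMaster launch. It involves several main components: Client, ClientRMService, RMAppManager, RMAppImpl, RMAppAttemptImpl, RMNode, and ResourceScheduler. After the client calls the RPC function ApplicationClientProtocol#submitApplication, the ResourceManager handles the request as shown in the figure below.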
II. Detailed Flow Analysis
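Next, following the flow chart above, we go through the source code to see how each step is executed.
The process begins when the client submits a job with submitApplication(). The call passes through ClientRMService and RMAppManager, which sends an RMAppEventType.START event that RMAppImpl then handles.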
    protected void submitApplication(
        ApplicationSubmissionContext submissionContext, long submitTime,
        String user) throws YarnException {
      ApplicationId applicationId = submissionContext.getApplicationId();

      RMAppImpl application =
          createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
      Credentials credentials = null;
      try {
        credentials = parseCredentials(submissionContext);
        if (UserGroupInformation.isSecurityEnabled()) {
          this.rmContext.getDelegationTokenRenewer()
              .addApplicationAsync(applicationId, credentials,
                  submissionContext.getCancelTokensWhenComplete(),
                  application.getUser());
        } else {
          // Dispatcher is not yet started at this time, so these START events
          // enqueued should be guaranteed to be first processed when dispatcher
          // gets started.
          // The RMAppEventType.START event is sent here
          this.rmContext.getDispatcher().getEventHandler()
              .handle(new RMAppEvent(applicationId, RMAppEventType.START));
        }
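To make the "enqueue an event, let the dispatcher deliver it later" idea concrete, here is a minimal sketch of a single-threaded dispatcher. It is not Hadoop's dispatcher: MiniDispatcher, AppEvent, AppEventType, drain() and demo() are all names invented for this illustration, and the real one delivers events on its own thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy stand-in for an event dispatcher (hypothetical, not a Hadoop class). */
class MiniDispatcher {
    /** Hypothetical event types, loosely modeled on RMAppEventType. */
    enum AppEventType { START, APP_NEW_SAVED }

    /** An event carries the id of the application it targets and its type. */
    static final class AppEvent {
        final String appId;
        final AppEventType type;

        AppEvent(String appId, AppEventType type) {
            this.appId = appId;
            this.type = type;
        }
    }

    private final Queue<AppEvent> queue = new ArrayDeque<>();
    private final Map<AppEventType, List<Consumer<AppEvent>>> handlers = new HashMap<>();

    /** Several handlers may be registered for the same event type. */
    void register(AppEventType type, Consumer<AppEvent> handler) {
        handlers.computeIfAbsent(type, t -> new ArrayList<>()).add(handler);
    }

    /** Enqueue only; nothing runs until drain() is called. */
    void handle(AppEvent event) {
        queue.add(event);
    }

    /** Deliver queued events in FIFO order; handlers may enqueue further events. */
    void drain() {
        AppEvent event;
        while ((event = queue.poll()) != null) {
            List<Consumer<AppEvent>> targets = handlers.getOrDefault(
                event.type, Collections.<Consumer<AppEvent>>emptyList());
            for (Consumer<AppEvent> h : targets) {
                h.accept(event);
            }
        }
    }

    /** Demo: START is queued before the dispatcher runs and is processed first. */
    static List<String> demo() {
        MiniDispatcher d = new MiniDispatcher();
        List<String> log = new ArrayList<>();
        d.register(AppEventType.START, e -> {
            log.add("START:" + e.appId);
            // Handling one event typically produces the next one.
            d.handle(new AppEvent(e.appId, AppEventType.APP_NEW_SAVED));
        });
        d.register(AppEventType.APP_NEW_SAVED, e -> log.add("APP_NEW_SAVED:" + e.appId));
        d.handle(new AppEvent("app_1", AppEventType.START));
        d.drain();
        return log;
    }
}
```

The point of the sketch is the decoupling: the sender only calls handle(), and whoever registered for that event type reacts later, often by sending a new event of its own.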
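RMAppImpl is a state machine: when it receives an event, it changes its own state and runs the corresponding logic.
(If you are not yet familiar with state machines, see my earlier article "2-4 Yarn Base Libraries - State Machine Library".)
Here is part of the state transition code: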
    private static final StateMachineFactory<RMAppImpl,
                                             RMAppState,
                                             RMAppEventType,
                                             RMAppEvent> stateMachineFactory
        = new StateMachineFactory<RMAppImpl,
                                  RMAppState,
                                  RMAppEventType,
                                  RMAppEvent>(RMAppState.NEW)

      // Transitions from NEW state
      .addTransition(RMAppState.NEW, RMAppState.NEW,
          RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
      // On receiving the RMAppEventType.START event
      .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
          RMAppEventType.START, new RMAppNewlySavingTransition())
      .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
              RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
              RMAppState.KILLED, RMAppState.FINAL_SAVING),
          RMAppEventType.RECOVER, new RMAppRecoveredTransition())
      .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
          new AppKilledTransition())
      .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
          RMAppEventType.APP_REJECTED,
          new FinalSavingTransition(new AppRejectedTransition(),
              RMAppState.FAILED))
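If the factory syntax feels dense, the underlying idea is just a lookup table from (current state, event) to next state. The sketch below is a simplified stand-in, not the StateMachineFactory API: MiniAppStateMachine and its methods are invented for this illustration, it carries no transition hooks, and it throws a plain IllegalStateException for an unknown arc. The arcs it registers are the single-arc RMAppImpl transitions that this walkthrough visits.

```java
import java.util.EnumMap;
import java.util.Map;

/** Toy transition table (hypothetical); a simplification of the state machine idea. */
class MiniAppStateMachine {
    enum State { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, KILLED }
    enum Event { START, APP_NEW_SAVED, APP_ACCEPTED, ATTEMPT_REGISTERED, KILL }

    private final Map<State, Map<Event, State>> table =
        new EnumMap<State, Map<Event, State>>(State.class);
    private State current = State.NEW;

    MiniAppStateMachine() {
        // Same argument order as addTransition: pre-state, post-state, event.
        add(State.NEW, State.NEW_SAVING, Event.START);
        add(State.NEW, State.KILLED, Event.KILL);
        add(State.NEW_SAVING, State.SUBMITTED, Event.APP_NEW_SAVED);
        add(State.SUBMITTED, State.ACCEPTED, Event.APP_ACCEPTED);
        add(State.ACCEPTED, State.RUNNING, Event.ATTEMPT_REGISTERED);
    }

    private void add(State from, State to, Event on) {
        table.computeIfAbsent(from, s -> new EnumMap<Event, State>(Event.class))
             .put(on, to);
    }

    /** Looks up the arc for the current state and event, then moves to the post-state. */
    State handle(Event event) {
        Map<Event, State> arcs = table.get(current);
        State next = (arcs == null) ? null : arcs.get(event);
        if (next == null) {
            throw new IllegalStateException(
                "Invalid event " + event + " in state " + current);
        }
        current = next;
        return current;
    }

    State getState() {
        return current;
    }
}
```

Feeding it START, APP_NEW_SAVED, APP_ACCEPTED and ATTEMPT_REGISTERED in that order walks the application from NEW to RUNNING, which is the path the rest of this article follows.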
(1) RMAppImpl - START
After receiving the RMAppEventType.START event, RMAppImpl executes RMAppNewlySavingTransition().
    private static final class RMAppNewlySavingTransition extends RMAppTransition {
      @Override
      public void transition(RMAppImpl app, RMAppEvent event) {

        // If recovery is enabled then store the application information in a
        // non-blocking call so make sure that RM has stored the information
        // needed to restart the AM after RM restart without further client
        // communication
        LOG.info("Storing application with id " + app.applicationId);
        app.rmContext.getStateStore().storeNewApplication(app);
      }
    }
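Following the call further, you will find that it emits an RMStateStoreEventType.STORE_APP event. Looking up the handler for that event in RMStateStore shows that it is a state machine as well: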
    .addTransition(RMStateStoreState.ACTIVE,
        EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
        RMStateStoreEventType.STORE_APP, new StoreAppTransition())
Following StoreAppTransition shows what it does: it stores the application state and then sends an RMAppEventType.APP_NEW_SAVED event.
    private static class StoreAppTransition
        implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
            RMStateStoreState> {
      @Override
      public RMStateStoreState transition(RMStateStore store,
          RMStateStoreEvent event) {
        if (!(event instanceof RMStateStoreAppEvent)) {
          // should never happen
          LOG.error("Illegal event type: " + event.getClass());
          return RMStateStoreState.ACTIVE;
        }
        boolean isFenced = false;
        ApplicationStateData appState =
            ((RMStateStoreAppEvent) event).getAppState();
        ApplicationId appId =
            appState.getApplicationSubmissionContext().getApplicationId();
        LOG.info("Storing info for app: " + appId);
        try {
          store.storeApplicationStateInternal(appId, appState);
          // The RMAppEventType.APP_NEW_SAVED event is sent here
          store.notifyApplication(
              new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED));
        } catch (Exception e) {
          LOG.error("Error storing app: " + appId, e);
          isFenced = store.notifyStoreOperationFailedInternal(e);
        }
        return finalState(isFenced);
      }
    }
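StoreAppTransition is a multiple-arc transition: the post-state is not fixed in the table but chosen by the hook at run time (ACTIVE or FENCED here). A minimal sketch of that pattern follows. MiniStateStore and Backend are hypothetical names, and the sketch assumes that every storage failure fences the store, which is a simplification of what the real failure handler decides.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy store illustrating a transition that picks its own post-state (hypothetical). */
class MiniStateStore {
    enum StoreState { ACTIVE, FENCED }

    /** Hypothetical persistence hook standing in for the real storage backend. */
    interface Backend {
        void save(String appId) throws Exception;
    }

    private final Backend backend;
    private final List<String> notifications = new ArrayList<>();
    private StoreState state = StoreState.ACTIVE;

    MiniStateStore(Backend backend) {
        this.backend = backend;
    }

    /** Persist first, notify second; the return value is the chosen post-state. */
    StoreState storeApp(String appId) {
        if (state != StoreState.ACTIVE) {
            throw new IllegalStateException("store is " + state);
        }
        boolean fenced = false;
        try {
            backend.save(appId);
            // Only a successful save produces the follow-up event.
            notifications.add("APP_NEW_SAVED:" + appId);
        } catch (Exception e) {
            // Assumption for this sketch: any failure fences the store.
            fenced = true;
        }
        state = fenced ? StoreState.FENCED : StoreState.ACTIVE;
        return state;
    }

    List<String> getNotifications() {
        return notifications;
    }
}
```

Note the ordering: the APP_NEW_SAVED notification is only produced after the save returns, which is why the application cannot leave NEW_SAVING before its state is durable.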
(2) RMAppImpl - APP_NEW_SAVED
Back in RMAppImpl, we look up the corresponding state transition.
    // The state was NEW_SAVING; on APP_NEW_SAVED,
    // AddApplicationToSchedulerTransition() runs and the state becomes SUBMITTED
    .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
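AddApplicationToSchedulerTransition() sends a SchedulerEventType.APP_ADDED event, after which RMAppImpl moves to the RMAppState.SUBMITTED state. SchedulerEventType.APP_ADDED is picked up and handled by more than one event handler:
1) The ResourceSchedulerWrapper event handler, which only records the application's queue: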
    } else if (schedulerEvent.getType() == SchedulerEventType.APP_ADDED
        && schedulerEvent instanceof AppAddedSchedulerEvent) {
      AppAddedSchedulerEvent appAddEvent =
          (AppAddedSchedulerEvent) schedulerEvent;
      String queueName = appAddEvent.getQueue();
      appQueueMap.put(appAddEvent.getApplicationId(), queueName);
    }
2) The concrete AbstractYarnScheduler implementations. Taking CapacityScheduler as an example, it calls addApplication():
    case APP_ADDED:
    {
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      String queueName = resolveReservationQueueName(appAddedEvent.getQueue(),
          appAddedEvent.getApplicationId(), appAddedEvent.getReservationID(),
          appAddedEvent.getIsAppRecovering());
      if (queueName != null) {
        if (!appAddedEvent.getIsAppRecovering()) {
          addApplication(appAddedEvent.getApplicationId(), queueName,
              appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
        } else {
          addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName,
              appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
        }
      }
    }
addApplication() submits the application to its queue and sends an RMAppEventType.APP_ACCEPTED event.
    queue.submitApplication(applicationId, user, queueName);
    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
(3) RMAppImpl - APP_ACCEPTED (key step)
Back in RMAppImpl again, StartAppAttemptTransition() runs: it creates a new attempt and sends an RMAppAttemptEventType.START event.
    .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())

    private static final class StartAppAttemptTransition extends RMAppTransition {
      @Override
      public void transition(RMAppImpl app, RMAppEvent event) {
        app.createAndStartNewAttempt(false);
      }
    }

    private void
        createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
      createNewAttempt();
      handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
          transferStateFromPreviousAttempt));
    }
RMAppAttemptImpl catches this event and executes AttemptStartedTransition(), which sends a SchedulerEventType.APP_ATTEMPT_ADDED event that is handled by the AbstractYarnScheduler implementation.
    .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
        RMAppAttemptEventType.START, new AttemptStartedTransition())
In CapacityScheduler, for example, addApplicationAttempt handles it: it submits the ApplicationAttempt and sends an RMAppAttemptEventType.ATTEMPT_ADDED event.
    private synchronized void addApplicationAttempt() {
      // Submit the attempt
      queue.submitApplicationAttempt(attempt, application.getUser());
      // Send the RMAppAttemptEventType.ATTEMPT_ADDED event
      rmContext.getDispatcher().getEventHandler().handle(
          new RMAppAttemptEvent(applicationAttemptId,
              RMAppAttemptEventType.ATTEMPT_ADDED));
    }
RMAppAttemptImpl receives the event and carries on: ScheduleTransition calls allocate to request the resources for the AM container.
    .addTransition(RMAppAttemptState.SUBMITTED,
        EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                   RMAppAttemptState.SCHEDULED),
        RMAppAttemptEventType.ATTEMPT_ADDED,
        new ScheduleTransition())

    // AM resource has been checked when submission
    Allocation amContainerAllocation =
        appAttempt.scheduler.allocate(
            appAttempt.applicationAttemptId,
            Collections.singletonList(appAttempt.amReq),
            EMPTY_CONTAINER_RELEASE_LIST,
            amBlacklist.getBlacklistAdditions(),
            amBlacklist.getBlacklistRemovals(), null, null);
Before the ResourceScheduler hands the resources back to RMAppAttemptImpl, it sends an RMContainerEventType.ACQUIRED event to RMContainerImpl.
When RMContainerImpl receives RMContainerEventType.START, it sends an RMAppAttemptEventType.CONTAINER_ALLOCATED event.
    .addTransition(RMContainerState.NEW, RMContainerState.ALLOCATED,
        RMContainerEventType.START, new ContainerStartedTransition())

    private static final class ContainerStartedTransition extends
        BaseTransition {

      @Override
      public void transition(RMContainerImpl container, RMContainerEvent event) {
        container.eventHandler.handle(new RMAppAttemptEvent(
            container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED));
      }
    }
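Control then returns to the RMAppAttemptImpl state machine, which executes AMContainerAllocatedTransition. This transition calls allocate for the AM a second time, and the call differs from the one in the previous state only in its arguments. The first call carries the AM request, while this one passes empty request and release lists, so it appears to serve only to fetch the container that the scheduler has already assigned. If the allocation still contains no container, the attempt steps back and stays in RMAppAttemptState.SCHEDULED, waiting to fetch the resources again. If a container was obtained, the attempt moves to the RMAppAttemptState.ALLOCATED_SAVING state.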
    .addTransition(RMAppAttemptState.SCHEDULED,
        EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING,
                   RMAppAttemptState.SCHEDULED),
        RMAppAttemptEventType.CONTAINER_ALLOCATED,
        new AMContainerAllocatedTransition())

    Allocation amContainerAllocation =
        appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
            EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
            null, null, null);
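The two allocate calls are easier to reason about with a toy model. The sketch below reflects my reading of the flow rather than the actual scheduler code: it assumes that allocate() only records new requests and returns whatever was assigned since the last call, and that containers are assigned later, on a node heartbeat. ToyScheduler, ToyAttempt and their methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of "ask first, pull later" for the AM container (hypothetical). */
class AmContainerFetch {
    enum AttemptState { SUBMITTED, SCHEDULED, ALLOCATED_SAVING }

    /** Requests are recorded immediately; containers appear on a later heartbeat. */
    static final class ToyScheduler {
        private int pendingRequests = 0;
        private int nextId = 1;
        private final List<String> newlyAllocated = new ArrayList<>();

        /** Records new asks, then hands over (and clears) what was assigned so far. */
        List<String> allocate(int newRequests) {
            pendingRequests += newRequests;
            List<String> pulled = new ArrayList<>(newlyAllocated);
            newlyAllocated.clear();
            return pulled;
        }

        /** A node heartbeat satisfies at most one pending request. */
        void nodeHeartbeat() {
            if (pendingRequests > 0) {
                pendingRequests--;
                newlyAllocated.add("container_" + nextId++);
            }
        }
    }

    static final class ToyAttempt {
        private final ToyScheduler scheduler;
        AttemptState state = AttemptState.SUBMITTED;
        String amContainer;

        ToyAttempt(ToyScheduler scheduler) {
            this.scheduler = scheduler;
        }

        /** Plays the role of ScheduleTransition: submit the AM ask. */
        void onAttemptAdded() {
            scheduler.allocate(1);
            state = AttemptState.SCHEDULED;
        }

        /** Plays the role of AMContainerAllocatedTransition: pull, or stay SCHEDULED. */
        void onContainerAllocated() {
            List<String> containers = scheduler.allocate(0);
            if (containers.isEmpty()) {
                state = AttemptState.SCHEDULED; // nothing to pull yet, retry later
                return;
            }
            amContainer = containers.get(0);
            state = AttemptState.ALLOCATED_SAVING;
        }
    }
}
```

In this model the second call with an empty request is not a new ask at all. It is the only way for the attempt to take ownership of a container that was assigned in the background, which would explain why the transition can also end up back in SCHEDULED.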
Once the attempt state has been recorded, RMStateStore sends an RMAppAttemptEventType.ATTEMPT_NEW_SAVED event to RMAppAttemptImpl. RMAppAttemptImpl then sends an AMLauncherEventType.LAUNCH event to ApplicationMasterLauncher (the actual work is done in AMLauncher) and moves from ALLOCATED_SAVING to ALLOCATED.
    .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
        RMAppAttemptState.ALLOCATED,
        RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
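When ApplicationMasterLauncher receives the AMLauncherEventType.LAUNCH event, it puts the event on an event queue, where it waits to be handled by a thread from the AMLauncher thread pool. That thread communicates with the corresponding NodeManager to launch the ApplicationMaster and, once the launch succeeds, sends an RMAppAttemptEventType.LAUNCHED event to RMAppAttemptImpl.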
    public void run() {
      switch (eventType) {
      case LAUNCH:
        try {
          LOG.info("Launching master" + application.getAppAttemptId());
          launch();
          handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
              RMAppAttemptEventType.LAUNCHED));
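The queue-plus-thread-pool arrangement described above can be sketched as follows. This is a simplified illustration, not the ApplicationMasterLauncher implementation: MiniAmLauncher and its methods are hypothetical, the launch itself is an empty placeholder, and queued work is handed to the pool by an explicit drainToPool() call rather than by a dedicated thread.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy launcher: events are queued, pool threads do the slow work (hypothetical). */
class MiniAmLauncher {
    private final BlockingQueue<String> launchQueue = new LinkedBlockingQueue<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final List<String> sentEvents = new CopyOnWriteArrayList<>();

    /** Called by the event handler: only enqueues, never waits for the launch. */
    void handleLaunch(String attemptId) {
        launchQueue.add(attemptId);
    }

    /** Hands every queued attempt to a pool thread. */
    void drainToPool() {
        String attemptId;
        while ((attemptId = launchQueue.poll()) != null) {
            final String id = attemptId;
            pool.execute(() -> {
                launch(id);
                // Report success back, as the LAUNCHED event does.
                sentEvents.add("LAUNCHED:" + id);
            });
        }
    }

    /** Placeholder for the NodeManager call that starts the AM container. */
    private void launch(String attemptId) {
        // Intentionally empty in this sketch.
    }

    /** Stops accepting work and waits for in-flight launches to finish. */
    List<String> shutdownAndGetEvents() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sentEvents;
    }
}
```

Keeping the remote call off the event-handling thread matters because a slow NodeManager would otherwise stall every other event waiting in the dispatcher.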
On receiving RMAppAttemptEventType.LAUNCHED, RMAppAttemptImpl registers with AMLivelinessMonitor so that its liveness is monitored, and its state moves from ALLOCATED to LAUNCHED.
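Afterwards, the NodeManager reports through its heartbeat that the Container hosting the ApplicationMaster has started successfully. On receiving this, the ResourceScheduler sends an RMContainerEventType.LAUNCHED event, and when RMContainerImpl receives it, the container is removed from the ContainerAllocationExpirer monitoring list.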
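Both AMLivelinessMonitor and ContainerAllocationExpirer follow the same pattern: register an id, refresh or remove it when the expected thing happens, and expire whatever has been silent for too long. Here is a minimal sketch of that pattern. MiniLivenessMonitor is a hypothetical class, and the caller passes the current time in explicitly so the example stays deterministic, whereas the real monitors run a background thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy expiry monitor with an explicit clock (hypothetical). */
class MiniLivenessMonitor {
    private final long expiryMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    MiniLivenessMonitor(long expiryMillis) {
        this.expiryMillis = expiryMillis;
    }

    /** Start watching an id, e.g. an attempt that has just been launched. */
    void register(String id, long nowMillis) {
        lastSeen.put(id, nowMillis);
    }

    /** Stop watching an id, e.g. a container that has reported as launched. */
    void unregister(String id) {
        lastSeen.remove(id);
    }

    /** Refresh the timestamp of a watched id; unknown ids are ignored. */
    void ping(String id, long nowMillis) {
        if (lastSeen.containsKey(id)) {
            lastSeen.put(id, nowMillis);
        }
    }

    /** Removes and returns every id that has been silent longer than the limit. */
    List<String> expire(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > expiryMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

In the flow above, the attempt corresponds to a register() call after LAUNCHED, and the AM container corresponds to an unregister() call once the NodeManager reports that it is running.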
The launched ApplicationMaster registers with the ResourceManager through the RPC function ApplicationMasterProtocol#registerApplicationMaster. When the ApplicationMasterService in the ResourceManager receives the request, it sends an RMAppAttemptEventType.REGISTERED event.
    // ApplicationMasterService#registerApplicationMaster
    LOG.info("AM registration " + applicationAttemptId);
    this.rmContext
        .getDispatcher()
        .getEventHandler()
        .handle(
            // The RMAppAttemptEventType.REGISTERED event is sent here
            new RMAppAttemptRegistrationEvent(applicationAttemptId, request
                .getHost(), request.getRpcPort(), request.getTrackingUrl()));
On receiving this event, RMAppAttemptImpl first saves the ApplicationMaster's basic information (such as its host and the RPC port it uses), then sends an RMAppEventType.ATTEMPT_REGISTERED event to RMAppImpl. The state of RMAppAttemptImpl moves from LAUNCHED to RUNNING.
    .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
        RMAppAttemptEventType.REGISTERED, REGISTERED_TRANSITION)

    // AMRegisteredTransition
    appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
        .getAppAttemptId().getApplicationId(),
        RMAppEventType.ATTEMPT_REGISTERED));
(4) RMAppImpl - ATTEMPT_REGISTERED
On receiving the RMAppEventType.ATTEMPT_REGISTERED event, RMAppImpl moves from ACCEPTED to RUNNING.
    .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
        RMAppEventType.ATTEMPT_REGISTERED,
        new RMAppStateUpdateTransition(YarnApplicationState.RUNNING))
That completes the walkthrough of the overall ApplicationMaster launch flow.
III. Summary
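This article walked through the whole process from application submission to ApplicationMaster launch. The step-by-step detail can feel tedious, but once you grasp the core idea it is easy to sort out. The key pieces are event handling and state machines, and once you understand these two, the flow of the program is easy to follow.
The actual logic is simply several services sending events to one another: on receiving an event, a service starts something, records state, or monitors liveness, and then sends a new event.
None of it is hard in itself, but it takes patience to work through it bit by bit.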