乐趣区

关于yarn:深入浅出-Yarn-架构与实现46-RM-行为探究-申请与分配-Container

本大节介绍应用程序的 ApplicationMaster 在 NodeManager 胜利启动并向 ResourceManager 注册后,向 ResourceManager 申请资源(Container)到获取到资源的整个过程,以及 ResourceManager 外部波及的次要工作流程。

一、整体流程

整个过程可看做以下两个阶段的送代循环:

  • 阶段 1 ApplicationMaster 汇报资源需要并支付曾经调配到的资源;
  • 阶段 2 NodeManager 向 ResourceManager 汇报各个 Container 运行状态,如果 ResourceManager 发现它下面有闲暇的资源,则进行一次资源分配,并将调配的资源保留到对应的 应用程序数据结构中,期待下次 ApplicationMaster 发送心跳信息时获取(即阶段 1)。

一)AM 汇报心跳

1、ApplicationMaster 通过 RPC 函数 ApplicationMasterProtocol#allocate 向 ResourceManager 汇报资源需要(因为该函数被周期性调用,咱们通常也称之为“心跳”),包含新的资源需要形容、待开释的 Container 列表、申请退出黑名单的节点列表、申请移除黑名单的节点列表等。

public AllocateResponse allocate(AllocateRequest request) {
    // Send the status update to the appAttempt.
    // 发送 RMAppAttemptEventType.STATUS_UPDATE 事件
    this.rmContext.getDispatcher().getEventHandler().handle(new RMAppAttemptStatusupdateEvent(appAttemptId, request.getProgress()));
    
    // 从 am 心跳 AllocateRequest 中取出新的资源需要形容、待开释的 Container 列表、黑名单列表
    List<ResourceRequest> ask = request.getAskList();
    List<ContainerId> release = request.getReleaseList();
    ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest();

    // 接下来会做一些查看(资源申请量、label、blacklist 等)// 将资源申请宰割(动静调整 container 资源量)// Split Update Resource Requests into increase and decrease.
    // No Exceptions are thrown here. All update errors are aggregated
    // and returned to the AM.
    List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
    List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
    List<UpdateContainerError> updateContainerErrors =
        RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext,
            request, maximumCapacity, increaseResourceReqs,
            decreaseResourceReqs);

    // 调用 ResourceScheduler#allocate 函数,将该 AM 资源需要汇报给 ResourceScheduler
    //(理论是 Capacity、Fair、Fifo 等理论指定的 Scheduler 解决)allocation =
        this.rScheduler.allocate(appAttemptId, ask, release,
            blacklistAdditions, blacklistRemovals,
            increaseResourceReqs, decreaseResourceReqs);
}

2、ResourceManager 中的 ApplicationMasterService#allocate 负责解决来自 AM 的心跳申请,收到该申请后,会发送一个 RMAppAttemptEventType.STATUS_UPDATE 事件,RMAppAttemptImpl 收到该事件后,将更新应用程序执行进度和 AMLivenessMonitor 中记录的应用程序最近更新工夫。
3、调用 ResourceScheduler#allocate 函数,将该 AM 资源需要汇报给 ResourceScheduler,理论是 Capacity、Fair、Fifo 等理论指定的 Scheduler 解决。
CapacityScheduler#allocate 实现为例:

// CapacityScheduler#allocate
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
    List<ResourceRequest> ask, List<ContainerId> release,
    List<String> blacklistAdditions, List<String> blacklistRemovals,
    List<UpdateContainerRequest> increaseRequests,
    List<UpdateContainerRequest> decreaseRequests) {

    // Release containers
    // 发送 RMContainerEventType.RELEASED
    releaseContainers(release, application);

    // update increase requests
    LeafQueue updateDemandForQueue =
        updateIncreaseRequests(increaseRequests, application);

    // Decrease containers
    decreaseContainers(decreaseRequests, application);

    // Sanity check for new allocation requests
    // 会将资源申请进行规范化,限度到最小和最大区间内,并且标准到最小增长量上
    SchedulerUtils.normalizeRequests(ask, getResourceCalculator(), getClusterResource(),
        getMinimumResourceCapability(), getMaximumResourceCapability());

    // Update application requests
    // 将新的资源需要更新到对应的数据结构中
    if (application.updateResourceRequests(ask)
        && (updateDemandForQueue == null)) {updateDemandForQueue = (LeafQueue) application.getQueue();}

    // 获取曾经为该应用程序调配的资源
    allocation = application.getAllocation(getResourceCalculator(),
                   clusterResource, getMinimumResourceCapability());
        
    return allocation;
}

4、ResourceScheduler 首先读取待开释 Container 列表,向对应的 RMContainerImpl 发送 RMContainerEventType.RELEASED 类型事件,杀死正在运行的 Container;而后将新的资源需要更新到对应的数据结构中,之后获取曾经为该应用程序调配的资源,并返回给 ApplicationMasterService。

二)NM 汇报心跳

1、NodeManager 将以后节点各种信息(container 情况、节点利用率、衰弱状况等)封装到 nodeStatus 中,再将标识节点的信息一起封装到 request 中,之后通过 RPC 函数 ResourceTracker#nodeHeartbeat 向 ResourceManager 汇报这些状态。

// NodeStatusUpdaterImpl#startStatusUpdater
  protected void startStatusUpdater() {statusUpdaterRunnable = new Runnable() {
      @Override
      @SuppressWarnings("unchecked")
      public void run() {
        // ...
        Set<NodeLabel> nodeLabelsForHeartbeat =
                nodeLabelsHandler.getNodeLabelsForHeartbeat();
        NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);

        NodeHeartbeatRequest request =
            NodeHeartbeatRequest.newInstance(nodeStatus,
                NodeStatusUpdaterImpl.this.context
                    .getContainerTokenSecretManager().getCurrentKey(),
                NodeStatusUpdaterImpl.this.context
                    .getNMTokenSecretManager().getCurrentKey(),
                nodeLabelsForHeartbeat);
          
        // 发送 nm 的心跳
        response = resourceTracker.nodeHeartbeat(request);

2、ResourceManager 中的 ResourceTrackerService 负责解决来自 NodeManager 的请 求,一旦收到该申请,会向 RMNodeImpl 发送一个 RMNodeEventType.STATUS_UPDATE 类型事件,而 RMNodelmpl 收到该事件后,将更新各个 Container 的运行状态,并进一步向 ResoutceScheduler 发送一个 SchedulerEventType.NODE_UPDATE 类型事件。

// ResourceTrackerService#nodeHeartbeat
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnException, IOException {NodeStatus remoteNodeStatus = request.getNodeStatus();
    /**
     * Here is the node heartbeat sequence...
     * 1. Check if it's a valid (i.e. not excluded) node
     * 2. Check if it's a registered node
     * 3. Check if it's a'fresh' heartbeat i.e. not duplicate heartbeat
     * 4. Send healthStatus to RMNode
     * 5. Update node's labels if distributed Node Labels configuration is enabled
     */
      
    // 前 3 步都是各种查看,前面才是重点的逻辑
    // Heartbeat response
    NodeHeartbeatResponse nodeHeartBeatResponse =
        YarnServerBuilderUtils.newNodeHeartbeatResponse(getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
            NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
    // 这里会 set 待开释的 container、application 列表
    // 思考:为何只有待开释的列表呢?调配的资源不返回么?- 调配的资源是和 AM 进行交互的
    rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);

    populateKeys(request, nodeHeartBeatResponse);

    ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
        rmContext.getSystemCredentialsForApps();
    if (!systemCredentials.isEmpty()) {nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
    }

    // 4. Send status to RMNode, saving the latest response.
    // 发送 RMNodeEventType.STATUS_UPDATE 事件
    RMNodeStatusEvent nodeStatusEvent =
        new RMNodeStatusEvent(nodeId, remoteNodeStatus);
    if (request.getLogAggregationReportsForApps() != null
        && !request.getLogAggregationReportsForApps().isEmpty()) {
      nodeStatusEvent.setLogAggregationReportsForApps(request
        .getLogAggregationReportsForApps());
    }
    this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);

3、ResourceScheduler 收到事件后,如果该节点上有可调配的闲暇资源,则会将这些资源分配给各个应用程序,而调配后的资源仅是记录到对应的数据结构中,期待 ApplicationMaster 下次通过心跳机制来支付。(资源分配的具体逻辑,将在前面介绍 Scheduler 的文章中具体解说)。

三、总结

本篇剖析了申请与调配 Container 的流程,次要分为两个阶段。
第一阶段由 AM 发动,通过心跳向 RM 发动资源申请。
第二阶段由 NM 发动,通过心跳向 RM 汇报资源应用状况。
之后就是,RM 依据 AM 资源申请以及 NM 残余资源进行一次资源分配(具体调配逻辑将在后续文章中介绍),并将调配的资源通过下一次 AM 心跳返回给 AM。

退出移动版