本小節介紹應用程式的 ApplicationMaster 在 NodeManager 成功啟動并向 ResourceManager 注冊后,向 ResourceManager 請求資源(Container)到獲取到資源的整個程序,以及 ResourceManager 內部涉及的主要作業流程,
一、整體流程
整個程序可看做以下兩個階段的送代回圈:
- 階段1 ApplicationMaster 匯報資源需求并領取已經分配到的資源;
- 階段2 NodeManager 向 ResourceManager 匯報各個 Container 運行狀態,如果 ResourceManager 發現它上面有空閑的資源,則進行一次資源分配,并將分配的資源保存到對應的 應用程式資料結構中,等待下次 ApplicationMaster 發送心跳資訊時獲取(即階段1),
一)AM 匯報心跳
1、ApplicationMaster 通過 RPC 函式 ApplicationMasterProtocol#allocate 向 ResourceManager 匯報資源需求(由于該函式被周期性呼叫,我們通常也稱之為“心跳”),包括新的資源需求描述、待釋放的 Container 串列、請求加入黑名單的節點串列、請求移除黑名單的節點串列等,
public AllocateResponse allocate(AllocateRequest request) {
// Send the status update to the appAttempt.
// 發送 RMAppAttemptEventType.STATUS_UPDATE 事件
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptStatusupdateEvent(appAttemptId, request.getProgress()));
// 從 am 心跳 AllocateRequest 中取出新的資源需求描述、待釋放的 Container 串列、黑名單串列
List<ResourceRequest> ask = request.getAskList();
List<ContainerId> release = request.getReleaseList();
ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest();
// 接下來會做一些檢查(資源申請量、label、blacklist 等)
// 將資源申請分割(動態調整 container 資源量)
// Split Update Resource Requests into increase and decrease.
// No Exceptions are thrown here. All update errors are aggregated
// and returned to the AM.
List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
List<UpdateContainerError> updateContainerErrors =
RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext,
request, maximumCapacity, increaseResourceReqs,
decreaseResourceReqs);
// 呼叫 ResourceScheduler#allocate 函式,將該 AM 資源需求匯報給 ResourceScheduler
// (實際是 Capacity、Fair、Fifo 等實際指定的 Scheduler 處理)
allocation =
this.rScheduler.allocate(appAttemptId, ask, release,
blacklistAdditions, blacklistRemovals,
increaseResourceReqs, decreaseResourceReqs);
}
2、ResourceManager 中的 ApplicationMasterService#allocate 負責處理來自 AM 的心跳請求,收到該請求后,會發送一個 RMAppAttemptEventType.STATUS_UPDATE 事件,RMAppAttemptImpl 收到該事件后,將更新應用程式執行進度和 AMLivenessMonitor 中記錄的應用程式最近更新時間,
3、呼叫 ResourceScheduler#allocate 函式,將該 AM 資源需求匯報給 ResourceScheduler,實際是 Capacity、Fair、Fifo 等實際指定的 Scheduler 處理,
以 CapacityScheduler#allocate 實作為例:
// CapacityScheduler#allocate
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
List<ResourceRequest> ask, List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals,
List<UpdateContainerRequest> increaseRequests,
List<UpdateContainerRequest> decreaseRequests) {
// Release containers
// 發送 RMContainerEventType.RELEASED
releaseContainers(release, application);
// update increase requests
LeafQueue updateDemandForQueue =
updateIncreaseRequests(increaseRequests, application);
// Decrease containers
decreaseContainers(decreaseRequests, application);
// Sanity check for new allocation requests
// 會將資源請求進行規范化,限制到最小和最大區間內,并且規范到最小增長量上
SchedulerUtils.normalizeRequests(
ask, getResourceCalculator(), getClusterResource(),
getMinimumResourceCapability(), getMaximumResourceCapability());
// Update application requests
// 將新的資源需求更新到對應的資料結構中
if (application.updateResourceRequests(ask)
&& (updateDemandForQueue == null)) {
updateDemandForQueue = (LeafQueue) application.getQueue();
}
// 獲取已經為該應用程式分配的資源
allocation = application.getAllocation(getResourceCalculator(),
clusterResource, getMinimumResourceCapability());
return allocation;
}
4、ResourceScheduler 首先讀取待釋放 Container 串列,向對應的 RMContainerImpl 發送 RMContainerEventType.RELEASED 型別事件,殺死正在運行的 Container;然后將新的資源需求更新到對應的資料結構中,之后獲取已經為該應用程式分配的資源,并回傳給 ApplicationMasterService,
二)NM 匯報心跳
1、NodeManager 將當前節點各種資訊(container 狀況、節點利用率、健康情況等)封裝到 nodeStatus 中,再將標識節點的資訊一起封裝到 request 中,之后通過RPC 函式 ResourceTracker#nodeHeartbeat 向 ResourceManager 匯報這些狀態,
// NodeStatusUpdaterImpl#startStatusUpdater
protected void startStatusUpdater() {
statusUpdaterRunnable = new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
// ...
Set<NodeLabel> nodeLabelsForHeartbeat =
nodeLabelsHandler.getNodeLabelsForHeartbeat();
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus,
NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey(),
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat);
// 發送 nm 的心跳
response = resourceTracker.nodeHeartbeat(request);
2、ResourceManager 中的 ResourceTrackerService 負責處理來自 NodeManager 的請 求,一旦收到該請求,會向 RMNodeImpl 發送一個 RMNodeEventType.STATUS_UPDATE 型別事件,而 RMNodelmpl 收到該事件后,將更新各個 Container 的運行狀態,并進一步向 ResoutceScheduler 發送一個 SchedulerEventType.NODE_UPDATE 型別事件,
// ResourceTrackerService#nodeHeartbeat
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
NodeStatus remoteNodeStatus = request.getNodeStatus();
/**
* Here is the node heartbeat sequence...
* 1. Check if it's a valid (i.e. not excluded) node
* 2. Check if it's a registered node
* 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
* 4. Send healthStatus to RMNode
* 5. Update node's labels if distributed Node Labels configuration is enabled
*/
// 前 3 步都是各種檢查,后面才是重點的邏輯
// Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(
getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
// 這里會 set 待釋放的 container、application 串列
// 思考:為何只有待釋放的串列呢?分配的資源不回傳么? - 分配的資源是和 AM 進行互動的
rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
populateKeys(request, nodeHeartBeatResponse);
ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
rmContext.getSystemCredentialsForApps();
if (!systemCredentials.isEmpty()) {
nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
}
// 4. Send status to RMNode, saving the latest response.
// 發送 RMNodeEventType.STATUS_UPDATE 事件
RMNodeStatusEvent nodeStatusEvent =
new RMNodeStatusEvent(nodeId, remoteNodeStatus);
if (request.getLogAggregationReportsForApps() != null
&& !request.getLogAggregationReportsForApps().isEmpty()) {
nodeStatusEvent.setLogAggregationReportsForApps(request
.getLogAggregationReportsForApps());
}
this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);
3、ResourceScheduler 收到事件后,如果該節點上有可分配的空閑資源,則會將這些資源分配給各個應用程式,而分配后的資源僅是記錄到對應的資料結構中,等待 ApplicationMaster 下次通過心跳機制來領取,(資源分配的具體邏輯,將在后面介紹 Scheduler 的文章中詳細講解),
三、總結
本篇分析了申請與分配 Container 的流程,主要分為兩個階段,
第一階段由 AM 發起,通過心跳向 RM 發起資源請求,
第二階段由 NM 發起,通過心跳向 RM 匯報資源使用情況,
之后就是,RM 根據 AM 資源請求以及 NM 剩余資源進行一次資源分配(具體分配邏輯將在后續文章中介紹),并將分配的資源通過下一次 AM 心跳回傳給 AM,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/545417.html
標籤:其他
上一篇:尚硅谷_每日一考_427
下一篇:編程語言的基本資料型別介紹

