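Starting with this section, we analyze some common behaviors of the ResourceManager and trace how specific key actions flow through the RM. This section digs into the source code to follow the concrete process of launching an ApplicationMaster.
I. Overall flow
This subsection covers the whole path from application submission to ApplicationMaster launch. The main components involved are Client, ClientRMService, RMAppManager, RMAppImpl, RMAppAttemptImpl, RMNode and ResourceScheduler. After the client calls the RPC function ApplicationClientProtocol#submitApplication, the ResourceManager handles the request as shown in the figure below.
[Figure: ResourceManager-side flow from submitApplication to ApplicationMaster launch]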

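II. Detailed flow analysis
Following the flow chart above, we now go through the source code to see how each step is actually executed.
It starts with the client submitting the job via submitApplication(). The request passes through ClientRMService and RMAppManager, which sends an RMAppEventType.START event that is then handled by RMAppImpl. The excerpt below is from RMAppManager#submitApplication (truncated):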
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
String user) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
RMAppImpl application =
createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
Credentials credentials = null;
try {
credentials = parseCredentials(submissionContext);
if (UserGroupInformation.isSecurityEnabled()) {
this.rmContext.getDelegationTokenRenewer()
.addApplicationAsync(applicationId, credentials,
submissionContext.getCancelTokensWhenComplete(),
application.getUser());
} else {
// Dispatcher is not yet started at this time, so these START events
// enqueued should be guaranteed to be first processed when dispatcher
// gets started.
// The RMAppEventType.START event is sent here
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.START));
}
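RMAppImpl is a state machine: on receiving an event it transitions state by itself and runs the corresponding logic.
(If you are not yet familiar with state machines, see my earlier article "2-4 Yarn 基礎庫 - 狀態機庫", on the state machine library among YARN's base libraries.)

Here is an excerpt of the state transition code: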
private static final StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent> stateMachineFactory
= new StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent>(RMAppState.NEW)
// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
// On receiving the RMAppEventType.START event
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
RMAppState.KILLED, RMAppState.FINAL_SAVING),
RMAppEventType.RECOVER, new RMAppRecoveredTransition())
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
new AppKilledTransition())
.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(new AppRejectedTransition(),
RMAppState.FAILED))
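To make the table-driven idea behind addTransition concrete, here is a minimal sketch of a state machine that walks the RMApp states covered in this article. It is not Hadoop's StateMachineFactory: the class MiniAppStateMachine and its methods are hypothetical names made up for illustration, and only the happy path (NEW to RUNNING) is modeled.

```java
import java.util.EnumMap;
import java.util.Map;

/** Toy table-driven state machine; a sketch, not Hadoop's StateMachineFactory. */
class MiniAppStateMachine {
    enum State { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING }
    enum Event { START, APP_NEW_SAVED, APP_ACCEPTED, ATTEMPT_REGISTERED }

    // Transition table: current state -> (event -> next state)
    private static final Map<State, Map<Event, State>> TABLE =
            new EnumMap<State, Map<Event, State>>(State.class);

    // Hypothetical helper that mirrors the shape of addTransition(from, to, event)
    private static void addTransition(State from, State to, Event on) {
        TABLE.computeIfAbsent(from, s -> new EnumMap<Event, State>(Event.class)).put(on, to);
    }

    static {
        addTransition(State.NEW, State.NEW_SAVING, Event.START);
        addTransition(State.NEW_SAVING, State.SUBMITTED, Event.APP_NEW_SAVED);
        addTransition(State.SUBMITTED, State.ACCEPTED, Event.APP_ACCEPTED);
        addTransition(State.ACCEPTED, State.RUNNING, Event.ATTEMPT_REGISTERED);
    }

    private State current = State.NEW;

    State getState() {
        return current;
    }

    /** Looks up (current state, event) in the table and moves to the next state. */
    State handle(Event event) {
        Map<Event, State> arcs = TABLE.get(current);
        State next = (arcs == null) ? null : arcs.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " in state " + current);
        }
        current = next;
        return current;
    }

    public static void main(String[] args) {
        MiniAppStateMachine app = new MiniAppStateMachine();
        // The enum order happens to match the happy path, so feed the events in order.
        for (Event e : Event.values()) {
            System.out.println(e + " -> " + app.handle(e));
        }
    }
}
```

The real factory additionally attaches a transition hook (such as RMAppNewlySavingTransition) to each arc; the sketch leaves the hooks out to keep the lookup logic visible.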
(1) RMAppImpl - START
After receiving the RMAppEventType.START event, RMAppImpl runs RMAppNewlySavingTransition():
private static final class RMAppNewlySavingTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
// If recovery is enabled then store the application information in a
// non-blocking call so make sure that RM has stored the information
// needed to restart the AM after RM restart without further client
// communication
LOG.info("Storing application with id " + app.applicationId);
app.rmContext.getStateStore().storeNewApplication(app);
}
}
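Following storeNewApplication() down, we find that it emits an RMStateStoreEventType.STORE_APP event. Looking for the matching handler in RMStateStore, we see that it is a state machine as well: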
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.STORE_APP, new StoreAppTransition())
Now look at what StoreAppTransition does: it stores the application state and sends an RMAppEventType.APP_NEW_SAVED event.
private static class StoreAppTransition
implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreAppEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
ApplicationStateData appState =
((RMStateStoreAppEvent) event).getAppState();
ApplicationId appId =
appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Storing info for app: " + appId);
try {
store.storeApplicationStateInternal(appId, appState);
// The RMAppEventType.APP_NEW_SAVED event is sent here
store.notifyApplication(new RMAppEvent(appId,
RMAppEventType.APP_NEW_SAVED));
} catch (Exception e) {
LOG.error("Error storing app: " + appId, e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
};
}
(2) RMAppImpl - APP_NEW_SAVED
Back in RMAppImpl, we find the matching state transition:
// The state was NEW_SAVING; on APP_NEW_SAVED, AddApplicationToSchedulerTransition()
// runs and the state becomes SUBMITTED
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
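AddApplicationToSchedulerTransition() sends a SchedulerEventType.APP_ADDED event, after which RMAppImpl moves to the RMAppState.SUBMITTED state.
SchedulerEventType.APP_ADDED is picked up by more than one event handler:
1) The ResourceSchedulerWrapper handler, which only records the application's queue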
} else if (schedulerEvent.getType() == SchedulerEventType.APP_ADDED
&& schedulerEvent instanceof AppAddedSchedulerEvent) {
AppAddedSchedulerEvent appAddEvent =
(AppAddedSchedulerEvent) schedulerEvent;
String queueName = appAddEvent.getQueue();
appQueueMap.put(appAddEvent.getApplicationId(), queueName);
}
2) The AbstractYarnScheduler implementations. Taking CapacityScheduler as an example,
it calls addApplication():
case APP_ADDED:
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
String queueName = resolveReservationQueueName(appAddedEvent.getQueue(),
appAddedEvent.getApplicationId(), appAddedEvent.getReservationID(),
appAddedEvent.getIsAppRecovering());
if (queueName != null) {
if (!appAddedEvent.getIsAppRecovering()) {
addApplication(appAddedEvent.getApplicationId(), queueName,
appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
} else {
addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName,
appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
}
}
}
addApplication() submits the application to its queue and sends an RMAppEventType.APP_ACCEPTED event:
queue.submitApplication(applicationId, user, queueName);
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
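The chain so far (START, STORE_APP, APP_NEW_SAVED, APP_ADDED, APP_ACCEPTED) is easier to follow as a single trace. The sketch below is a toy, single-threaded dispatcher in which each handler does nothing but post the follow-up event. MiniDispatcher and its methods are hypothetical; the real RM uses separate dispatchers and typed event classes, so treat this only as an illustration of the "handle an event, emit the next one" pattern.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy event dispatcher: a queue plus one handler per event name (illustrative only). */
class MiniDispatcher {
    private final Queue<String> queue = new ArrayDeque<>();
    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private final List<String> trace = new ArrayList<>();

    void register(String eventType, Consumer<String> handler) {
        handlers.put(eventType, handler);
    }

    void post(String eventType) {
        queue.add(eventType);
    }

    /** Delivers queued events in FIFO order until no handler posts anything new. */
    List<String> drain() {
        String event;
        while ((event = queue.poll()) != null) {
            trace.add(event);
            Consumer<String> handler = handlers.get(event);
            if (handler != null) {
                handler.accept(event);
            }
        }
        return trace;
    }

    /** Wires the handlers so that each step emits the next event described in the article. */
    static List<String> runSubmissionChain() {
        MiniDispatcher d = new MiniDispatcher();
        d.register("RMAppEventType.START", e -> d.post("RMStateStoreEventType.STORE_APP"));
        d.register("RMStateStoreEventType.STORE_APP", e -> d.post("RMAppEventType.APP_NEW_SAVED"));
        d.register("RMAppEventType.APP_NEW_SAVED", e -> d.post("SchedulerEventType.APP_ADDED"));
        d.register("SchedulerEventType.APP_ADDED", e -> d.post("RMAppEventType.APP_ACCEPTED"));
        d.post("RMAppEventType.START");
        return d.drain();
    }

    public static void main(String[] args) {
        for (String event : runSubmissionChain()) {
            System.out.println(event);
        }
    }
}
```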
(3) RMAppImpl - APP_ACCEPTED (key step)
Back in RMAppImpl again, StartAppAttemptTransition() runs, creates a new attempt, and sends an RMAppAttemptEventType.START event:
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
private static final class StartAppAttemptTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.createAndStartNewAttempt(false);
};
}
private void
createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
createNewAttempt();
handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
transferStateFromPreviousAttempt));
}
RMAppAttemptImpl handles this event by running AttemptStartedTransition(), which sends a SchedulerEventType.APP_ATTEMPT_ADDED event to be handled by the AbstractYarnScheduler implementation:
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
RMAppAttemptEventType.START, new AttemptStartedTransition())
In CapacityScheduler, for example, addApplicationAttempt handles it: it submits the ApplicationAttempt and sends an RMAppAttemptEventType.ATTEMPT_ADDED event (the method below is abridged; parameters and lookups are omitted):
private synchronized void addApplicationAttempt() {
// Submit the attempt to the queue
queue.submitApplicationAttempt(attempt, application.getUser());
// Send the RMAppAttemptEventType.ATTEMPT_ADDED event
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
}
RMAppAttemptImpl receives the event and continues; in ScheduleTransition it asks the scheduler for the AM container via allocate():
.addTransition(RMAppAttemptState.SUBMITTED,
EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.ATTEMPT_ADDED,
new ScheduleTransition())
// AM resource has been checked when submission
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId,
Collections.singletonList(appAttempt.amReq),
EMPTY_CONTAINER_RELEASE_LIST,
amBlacklist.getBlacklistAdditions(),
amBlacklist.getBlacklistRemovals(), null, null);
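Before ResourceScheduler hands a container back to the attempt from allocate(), it sends RMContainerImpl an RMContainerEventType.ACQUIRED event.
Earlier, at the moment the container is assigned, RMContainerImpl receives RMContainerEventType.START and sends an RMAppAttemptEventType.CONTAINER_ALLOCATED event: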
.addTransition(RMContainerState.NEW, RMContainerState.ALLOCATED,
RMContainerEventType.START, new ContainerStartedTransition())
private static final class ContainerStartedTransition extends
BaseTransition {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
container.eventHandler.handle(new RMAppAttemptEvent(
container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED));
}
}
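Back in the RMAppAttemptImpl state machine, AMContainerAllocatedTransition runs and calls allocate for the AM once more; it differs from the call in the previous state only in its arguments. I did not work out the reason at first. Since the request list is empty this time, the call most likely just picks up the container the scheduler has already assigned rather than asking for more. If the allocation still contains no container, the attempt falls back a step: it stays in RMAppAttemptState.SCHEDULED and waits to fetch the resource again. If a container is obtained, the state moves to RMAppAttemptState.ALLOCATED_SAVING.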
.addTransition(RMAppAttemptState.SCHEDULED,
EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.CONTAINER_ALLOCATED,
new AMContainerAllocatedTransition())
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
null, null, null);
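The two allocate calls can be read as "ask first, collect later". The sketch below models that reading with a toy scheduler: the first call registers the request, a node heartbeat assigns a container, and a later call with an empty request collects it. Everything here (MiniAmAllocation, ToyScheduler, nodeHeartbeat, onContainerAllocated) is hypothetical and rests on the assumption just stated; it is not the CapacityScheduler code.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the "ask, then collect" pattern around allocate(); illustrative only. */
class MiniAmAllocation {
    enum AttemptState { SCHEDULED, ALLOCATED_SAVING }

    /** Toy scheduler: remembers asks, assigns on heartbeat, hands over on the next allocate(). */
    static class ToyScheduler {
        private int pendingAsks = 0;
        private int nextId = 1;
        private final List<String> newlyAllocated = new ArrayList<>();

        /** Registers {@code ask} new requests and returns containers assigned since the last call. */
        List<String> allocate(int ask) {
            pendingAsks += ask;
            List<String> handedOver = new ArrayList<>(newlyAllocated);
            newlyAllocated.clear();
            return handedOver;
        }

        /** Stand-in for a NodeManager heartbeat that lets the scheduler assign one container. */
        void nodeHeartbeat() {
            if (pendingAsks > 0) {
                pendingAsks--;
                newlyAllocated.add("container_" + nextId++);
            }
        }
    }

    /** Mirrors the two possible outcomes of the CONTAINER_ALLOCATED transition. */
    static AttemptState onContainerAllocated(ToyScheduler scheduler) {
        List<String> containers = scheduler.allocate(0); // empty ask: only collect
        return containers.isEmpty() ? AttemptState.SCHEDULED : AttemptState.ALLOCATED_SAVING;
    }

    public static void main(String[] args) {
        ToyScheduler scheduler = new ToyScheduler();
        System.out.println(scheduler.allocate(1));           // ask for the AM container
        System.out.println(onContainerAllocated(scheduler)); // nothing assigned yet
        scheduler.nodeHeartbeat();                           // scheduler assigns a container
        System.out.println(onContainerAllocated(scheduler)); // container collected
    }
}
```

Returning one of two states from a single transition is what the EnumSet in the addTransition call above expresses: the transition itself decides which arc to take.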
Once the attempt has been recorded in the state store, RMStateStore sends RMAppAttemptImpl an RMAppAttemptEventType.ATTEMPT_NEW_SAVED event.
RMAppAttemptImpl then sends an AMLauncherEventType.LAUNCH event to ApplicationMasterLauncher (the actual work is done in AMLauncher) and moves from ALLOCATED_SAVING to ALLOCATED:
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
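On receiving AMLauncherEventType.LAUNCH, ApplicationMasterLauncher puts the event on an event queue, where it waits for a thread from the AMLauncher thread pool. That thread contacts the corresponding NodeManager to start the ApplicationMaster and, once the launch succeeds, sends RMAppAttemptImpl an RMAppAttemptEventType.LAUNCHED event: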
public void run() {
switch (eventType) {
case LAUNCH:
try {
LOG.info("Launching master" + application.getAppAttemptId());
launch();
handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
RMAppAttemptEventType.LAUNCHED));
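The queue-plus-thread-pool arrangement described above can be sketched in a few lines. In this toy version each queued task stands in for one AMLauncher: it pretends to launch the AM and then records a LAUNCHED event. MiniAmLauncher, launchAll and the pool size of 2 are assumptions made for the example; no NodeManager RPC takes place.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch of "queue the LAUNCH event, let a pool thread run it"; illustrative only. */
class MiniAmLauncher {

    /** Queues one launch task per attempt, runs them on a small pool, returns emitted events. */
    static List<String> launchAll(List<String> attemptIds) {
        BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
        List<String> emitted = new CopyOnWriteArrayList<>(); // written from pool threads

        for (String id : attemptIds) {
            masterEvents.add(() -> {
                // Stand-in for launch(): the real code talks to the NodeManager here.
                emitted.add("LAUNCHED:" + id);
            });
        }

        ExecutorService launcherPool = Executors.newFixedThreadPool(2); // size is arbitrary
        Runnable task;
        while ((task = masterEvents.poll()) != null) {
            launcherPool.execute(task);
        }
        launcherPool.shutdown();
        try {
            if (!launcherPool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("launcher pool did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for launches", e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Order of the printed events may vary because two threads run the tasks.
        System.out.println(launchAll(Arrays.asList("attempt_1", "attempt_2", "attempt_3")));
    }
}
```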
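After receiving RMAppAttemptEventType.LAUNCHED, RMAppAttemptImpl registers with AMLivelinessMonitor so that the AM's liveness is monitored, and its state moves from ALLOCATED to LAUNCHED.
Next, the NodeManager reports through its heartbeat that the Container hosting the ApplicationMaster has started successfully. On receiving this, ResourceScheduler sends an RMContainerEventType.LAUNCHED event; when RMContainerImpl receives it, the container is removed from the ContainerAllocationExpirer monitoring list.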
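Both monitors mentioned here follow the same pattern: register an item, refresh its timestamp on each ping, and expire it if it stays silent too long (or unregister it once it no longer needs watching). Below is a minimal sketch of that pattern. MiniLivelinessMonitor is a hypothetical class, and the current time is passed in explicitly, which is an assumption made to keep the example deterministic; a real monitor would run a background thread with a clock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Sketch of a liveliness monitor with an injected clock; illustrative only. */
class MiniLivelinessMonitor {
    private final long expireIntervalMs;
    private final Map<String, Long> lastSeen = new HashMap<>();

    MiniLivelinessMonitor(long expireIntervalMs) {
        this.expireIntervalMs = expireIntervalMs;
    }

    /** Starts watching an item, e.g. an attempt after LAUNCHED. */
    void register(String id, long nowMs) {
        lastSeen.put(id, nowMs);
    }

    /** Refreshes the timestamp of a watched item; unknown items are ignored. */
    void receivedPing(String id, long nowMs) {
        if (lastSeen.containsKey(id)) {
            lastSeen.put(id, nowMs);
        }
    }

    /** Stops watching an item, e.g. a container once it is reported as launched. */
    void unregister(String id) {
        lastSeen.remove(id);
    }

    /** Removes and returns every item that has been silent longer than the interval. */
    List<String> checkExpired(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > expireIntervalMs) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        MiniLivelinessMonitor monitor = new MiniLivelinessMonitor(1000);
        monitor.register("attempt_1", 0);
        monitor.register("container_1", 0);
        monitor.receivedPing("attempt_1", 900);          // AM heartbeat
        monitor.unregister("container_1");               // container launched, stop watching
        System.out.println(monitor.checkExpired(1500));  // attempt_1 still within interval
        System.out.println(monitor.checkExpired(2000));  // attempt_1 silent for 1100 ms
    }
}
```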
The started ApplicationMaster registers with the ResourceManager through the RPC function ApplicationMasterProtocol#registerApplicationMaster. When the ApplicationMasterService inside the ResourceManager receives the request, it sends an RMAppAttemptEventType.REGISTERED event:
// ApplicationMasterService#registerApplicationMaster
LOG.info("AM registration " + applicationAttemptId);
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
// The RMAppAttemptEventType.REGISTERED event is sent here
new RMAppAttemptRegistrationEvent(applicationAttemptId, request
.getHost(), request.getRpcPort(), request.getTrackingUrl()));
On receiving this event, RMAppAttemptImpl first saves the ApplicationMaster's basic information (such as its host and the RPC port it listens on), then sends RMAppImpl an RMAppEventType.ATTEMPT_REGISTERED event, and its own state moves from LAUNCHED to RUNNING:
.addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.REGISTERED, REGISTERED_TRANSITION)
// AMRegisteredTransition
appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
.getAppAttemptId().getApplicationId(),
RMAppEventType.ATTEMPT_REGISTERED));
(4) RMAppImpl - ATTEMPT_REGISTERED
When RMAppImpl receives the RMAppEventType.ATTEMPT_REGISTERED event, it moves from ACCEPTED to RUNNING:
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
YarnApplicationState.RUNNING))
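That completes the walk through the overall flow of launching an ApplicationMaster.
III. Summary
This article traced the whole path from application submission to ApplicationMaster launch. The step-by-step details can feel tedious, but the core is easy to untangle once you focus on two things: event handling and state machines. With those two pieces understood, the flow of the program is easy to follow.
The actual logic is little more than several services sending events to one another; on receiving an event, a service starts something, records state, or sets up monitoring, and then sends a new event.
None of it is hard in itself, but it takes patience to work through piece by piece.
Please credit the source when reposting. Link to this article: https://www.uj5u.com/houduan/545294.html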
