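Chapter 1 Introduction
Continuing from the previous article, which started the TaskManager (TM), this article explains how the TaskManager registers its slots with the ResourceManager (RM) and then offers them to the JobManager (JM).
Chapter 2 Detailed Steps
2.1 Starting the TaskExecutor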
org.apache.flink.runtime.taskexecutor.TaskExecutor#startTaskExecutorServices
private void startTaskExecutorServices() throws Exception {
    try {
        // start by connecting to the ResourceManager
        // TODO start retrieving the ResourceManager leader; the TM connects to the RM once the leader is known
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
        // tell the task slot table who's responsible for the task slot actions
        taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
        // start the job leader service
        jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
        fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
    } catch (Exception e) {
        handleStartTaskExecutorServicesException(e);
    }
}
2.2 The TM Establishes a Connection with the RM
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService#start
Here we look at the ZooKeeperLeaderRetrievalService implementation:
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService#start
@Override
public void start(LeaderRetrievalListener listener) throws Exception {
    Preconditions.checkNotNull(listener, "Listener must not be null.");
    Preconditions.checkState(leaderListener == null, "ZooKeeperLeaderRetrievalService can " +
            "only be started once.");
    LOG.info("Starting ZooKeeperLeaderRetrievalService {}.", retrievalPath);
    synchronized (lock) {
        leaderListener = listener;
        // TODO register the listeners
        client.getUnhandledErrorListenable().addListener(this);
        cache.getListenable().addListener(this);
        cache.start();
        client.getConnectionStateListenable().addListener(connectionStateListener);
        running = true;
    }
}
When the watched leader node in ZooKeeper changes, the service eventually executes org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService#retrieveLeaderInformationFromZooKeeper
private void retrieveLeaderInformationFromZooKeeper() {
    synchronized (lock) {
        if (running) {
            try {
                // ...
                // TODO notify the listener of the leader address
                notifyIfNewLeaderAddress(leaderAddress, leaderSessionID);
            } catch (Exception e) {
                leaderListener.handleError(new Exception("Could not handle node changed event.", e));
                ExceptionUtils.checkInterrupted(e);
            }
        } else {
            LOG.debug("Ignoring node change notification since the service has already been stopped.");
        }
    }
}
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService#notifyIfNewLeaderAddress
@GuardedBy("lock")
private void notifyIfNewLeaderAddress(String newLeaderAddress, UUID newLeaderSessionID) {
    if (!(Objects.equals(newLeaderAddress, lastLeaderAddress) &&
            Objects.equals(newLeaderSessionID, lastLeaderSessionID))) {
        // ...
        // TODO notify the listener of the new leader address
        leaderListener.notifyLeaderAddress(newLeaderAddress, newLeaderSessionID);
    }
}
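notifyIfNewLeaderAddress forwards a notification only when the leader address or the leader session ID differs from the last one seen, so repeated ZooKeeper events for the same leader are swallowed. The following is a minimal sketch of that rule, not Flink code: LeaderDeduplicator and onNodeChanged are hypothetical names, the addresses are placeholder strings, and the sketch assumes the last-seen values are updated before notifying (that part is hidden behind the `// ...` in the excerpt).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for the de-duplication done by notifyIfNewLeaderAddress.
class LeaderDeduplicator {
    private String lastLeaderAddress;
    private UUID lastLeaderSessionId;
    // Every notification that would be forwarded to the LeaderRetrievalListener.
    final List<String> notified = new ArrayList<>();

    synchronized void onNodeChanged(String newAddress, UUID newSessionId) {
        // Skip only when BOTH the address and the session ID are unchanged.
        if (!(Objects.equals(newAddress, lastLeaderAddress)
                && Objects.equals(newSessionId, lastLeaderSessionId))) {
            lastLeaderAddress = newAddress;
            lastLeaderSessionId = newSessionId;
            notified.add(newAddress + " / " + newSessionId);
        }
    }
}

class LeaderDeduplicatorDemo {
    public static void main(String[] args) {
        LeaderDeduplicator dedup = new LeaderDeduplicator();
        UUID session = UUID.randomUUID();
        dedup.onNodeChanged("rm-1", session);
        dedup.onNodeChanged("rm-1", session);            // same leader again: ignored
        dedup.onNodeChanged("rm-1", UUID.randomUUID());  // new session: forwarded
        System.out.println(dedup.notified.size());       // prints 2
    }
}
```

A change in the session ID alone is enough to trigger a notification, because a re-elected leader at the same address is still a new leadership term.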
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener#notifyLeaderAddress
The implementation that is ultimately called lives in TaskExecutor:
org.apache.flink.runtime.taskexecutor.TaskExecutor.ResourceManagerLeaderListener#notifyLeaderAddress
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
    // TODO received a new RM leader address
    runAsync(
        () -> notifyOfNewResourceManagerLeader(
            leaderAddress,
            ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
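notifyLeaderAddress is invoked from the leader retrieval service's callback, not from the TaskExecutor's own main thread, so it does not touch TaskExecutor state directly. Instead, runAsync hands the work to the endpoint's single main thread. The following minimal sketch shows that hand-off with a single-threaded ExecutorService. MainThreadEndpoint and currentAddress are hypothetical names, and Flink routes runAsync through its RPC service rather than through an ExecutorService.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical endpoint whose mutable state is only touched by one "main" thread.
class MainThreadEndpoint {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private String resourceManagerAddress; // confined to the main thread

    // Analogue of runAsync: enqueue the action instead of running it on the caller's thread.
    void runAsync(Runnable action) {
        mainThread.execute(action);
    }

    // Called from a foreign thread, e.g. a leader-retrieval callback.
    void notifyLeaderAddress(String leaderAddress) {
        runAsync(() -> resourceManagerAddress = leaderAddress);
    }

    // Reads the state on the main thread; queued tasks run in FIFO order.
    String currentAddress() {
        try {
            return mainThread.submit(() -> resourceManagerAddress).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        mainThread.shutdown();
    }
}

class MainThreadEndpointDemo {
    public static void main(String[] args) throws InterruptedException {
        MainThreadEndpoint endpoint = new MainThreadEndpoint();
        Thread callbackThread = new Thread(() -> endpoint.notifyLeaderAddress("rm-2"));
        callbackThread.start();
        callbackThread.join(); // the update is now queued on the main thread
        System.out.println(endpoint.currentAddress()); // prints rm-2
        endpoint.shutdown();
    }
}
```

Because every state change goes through the same single thread, no locks are needed around fields such as resourceManagerAddress.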
org.apache.flink.runtime.taskexecutor.TaskExecutor#notifyOfNewResourceManagerLeader
private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
    resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
    // TODO (re)connect to the RM
    reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
}
org.apache.flink.runtime.taskexecutor.TaskExecutor#reconnectToResourceManager
private void reconnectToResourceManager(Exception cause) {
    closeResourceManagerConnection(cause);
    startRegistrationTimeout();
    // TODO try to connect to the RM
    tryConnectToResourceManager();
}
org.apache.flink.runtime.taskexecutor.TaskExecutor#tryConnectToResourceManager
private void tryConnectToResourceManager() {
    if (resourceManagerAddress != null) {
        // TODO connect to the RM
        connectToResourceManager();
    }
}
org.apache.flink.runtime.taskexecutor.TaskExecutor#connectToResourceManager
private void connectToResourceManager() {
    assert(resourceManagerAddress != null);
    assert(establishedResourceManagerConnection == null);
    assert(resourceManagerConnection == null);
    log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
    final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
        getAddress(),
        getResourceID(),
        unresolvedTaskManagerLocation.getDataPort(),
        JMXService.getPort().orElse(-1),
        hardwareDescription,
        memoryConfiguration,
        taskManagerConfiguration.getDefaultSlotResourceProfile(),
        taskManagerConfiguration.getTotalResourceProfile()
    );
    // TODO note: once registration succeeds, the callback onRegistrationSuccess in TaskExecutorToResourceManagerConnection is invoked
    resourceManagerConnection =
        new TaskExecutorToResourceManagerConnection(
            log,
            getRpcService(),
            taskManagerConfiguration.getRetryingRegistrationConfiguration(),
            resourceManagerAddress.getAddress(),
            resourceManagerAddress.getResourceManagerId(),
            getMainThreadExecutor(),
            new ResourceManagerRegistrationListener(),
            taskExecutorRegistration);
    // TODO start connecting
    resourceManagerConnection.start();
}
start() kicks off the RPC registration with the RM. Once the registration succeeds, the onRegistrationSuccess callback in TaskExecutorToResourceManagerConnection is executed:
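The connection is created with taskManagerConfiguration.getRetryingRegistrationConfiguration(), so the registration is retried rather than attempted only once, and the success callback fires only after the RM accepts it. The following minimal sketch shows a retry loop with exponential backoff and a success callback. RetryingRegistrationSketch, its parameters, and the delays are hypothetical. Flink's own registration runs asynchronously and follows its configured timeouts and delays, not this blocking loop.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, blocking analogue of a retrying registration with exponential backoff.
class RetryingRegistrationSketch {
    /**
     * Calls attempt until it returns a non-null registration id, sleeping between tries and
     * doubling the delay up to maxDelayMillis. Returns the number of attempts used.
     */
    static int register(Supplier<String> attempt, Consumer<String> onRegistrationSuccess,
                        long initialDelayMillis, long maxDelayMillis, int maxAttempts) {
        long delay = initialDelayMillis;
        for (int i = 1; i <= maxAttempts; i++) {
            String registrationId = attempt.get();
            if (registrationId != null) {
                onRegistrationSuccess.accept(registrationId); // the success callback
                return i;
            }
            sleep(delay);
            delay = Math.min(delay * 2, maxDelayMillis);
        }
        throw new IllegalStateException("Registration failed after " + maxAttempts + " attempts");
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

class RetryingRegistrationDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical RM that refuses the first two registration attempts.
        Supplier<String> flakyResourceManager = () -> ++calls[0] < 3 ? null : "registration-42";
        int attempts = RetryingRegistrationSketch.register(
                flakyResourceManager, id -> System.out.println("registered as " + id), 10, 100, 5);
        System.out.println(attempts); // prints 3
    }
}
```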
org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection#onRegistrationSuccess
@Override
protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
    log.info("Successful registration at resource manager {} under registration id {}.",
        getTargetAddress(), success.getRegistrationId());
    // TODO after successful registration, notify the registration listener
    registrationListener.onRegistrationSuccess(this, success);
}
The implementation of org.apache.flink.runtime.registration.RegistrationConnectionListener#onRegistrationSuccess is ResourceManagerRegistrationListener, which is actually an inner class of TaskExecutor:
org.apache.flink.runtime.taskexecutor.TaskExecutor.ResourceManagerRegistrationListener#onRegistrationSuccess
@Override
public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
    final ResourceID resourceManagerId = success.getResourceManagerId();
    final InstanceID taskExecutorRegistrationId = success.getRegistrationId();
    final ClusterInformation clusterInformation = success.getClusterInformation();
    final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();
    runAsync(
        () -> {
            // filter out outdated connections
            //noinspection ObjectEquality
            if (resourceManagerConnection == connection) {
                try {
                    // TODO the TM establishes its connection with the RM
                    establishResourceManagerConnection(
                        resourceManagerGateway,
                        resourceManagerId,
                        taskExecutorRegistrationId,
                        clusterInformation);
                } catch (Throwable t) {
                    log.error("Establishing Resource Manager connection in Task Executor failed", t);
                }
            }
        });
}
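The `resourceManagerConnection == connection` check is a reference comparison, which is why it carries the `//noinspection ObjectEquality` marker. It drops callbacks from a connection that has already been replaced, for example when the RM leader changed while a registration was still in flight. The following minimal sketch shows that guard. RmConnection and StaleCallbackGuard are hypothetical names.

```java
// Hypothetical connection object: the guard relies on identity, not equals().
class RmConnection {
    final String target;
    RmConnection(String target) { this.target = target; }
}

class StaleCallbackGuard {
    private RmConnection current;       // analogue of resourceManagerConnection
    private String establishedTarget;   // only set by an up-to-date callback

    RmConnection connect(String target) {
        current = new RmConnection(target); // replaces any older attempt
        return current;
    }

    // Analogue of ResourceManagerRegistrationListener#onRegistrationSuccess.
    boolean onRegistrationSuccess(RmConnection connection) {
        if (current == connection) {    // reference comparison on purpose
            establishedTarget = connection.target;
            return true;
        }
        return false;                   // outdated connection: drop the callback
    }

    String establishedTarget() { return establishedTarget; }
}

class StaleCallbackGuardDemo {
    public static void main(String[] args) {
        StaleCallbackGuard taskExecutor = new StaleCallbackGuard();
        RmConnection old = taskExecutor.connect("rm-1");
        RmConnection fresh = taskExecutor.connect("rm-2"); // leader changed meanwhile
        System.out.println(taskExecutor.onRegistrationSuccess(old));   // prints false
        System.out.println(taskExecutor.onRegistrationSuccess(fresh)); // prints true
        System.out.println(taskExecutor.establishedTarget());          // prints rm-2
    }
}
```

Identity is used instead of comparing target addresses because two attempts to the same address are still different connections.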
2.3 Registering Slots with the RM
org.apache.flink.runtime.taskexecutor.TaskExecutor#establishResourceManagerConnection
private void establishResourceManagerConnection(
        ResourceManagerGateway resourceManagerGateway,
        ResourceID resourceManagerResourceId,
        InstanceID taskExecutorRegistrationId,
        ClusterInformation clusterInformation) {
    // TODO send the slot report (this TM's slot information) to the RM
    final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
        getResourceID(),
        taskExecutorRegistrationId,
        taskSlotTable.createSlotReport(getResourceID()),
        taskManagerConfiguration.getTimeout());
    // ...
}
The implementation of org.apache.flink.runtime.resourcemanager.ResourceManagerGateway#sendSlotReport:
org.apache.flink.runtime.resourcemanager.ResourceManager#sendSlotReport
@Override
public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
    final WorkerRegistration<WorkerType> workerTypeWorkerRegistration = taskExecutors.get(taskManagerResourceId);
    if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
        // TODO the RM's slotManager registers the TM
        if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
            onWorkerRegistered(workerTypeWorkerRegistration.getWorker());
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        return FutureUtils.completedExceptionally(new ResourceManagerException(String.format("Unknown TaskManager registration id %s.", taskManagerRegistrationId)));
    }
}
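sendSlotReport accepts a report only when its registration id matches the InstanceID that the RM handed out for that TaskManager's resource ID. Otherwise, the returned future completes exceptionally. The following minimal sketch shows the check with CompletableFuture. SlotReportGate is a hypothetical name, ResourceID and InstanceID are simplified to strings, and the sketch also treats an unknown resource ID as a failure.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical simplification: ResourceID and InstanceID are plain strings here.
class SlotReportGate {
    // resourceId -> instanceId handed out when the TaskExecutor registered
    private final Map<String, String> registeredInstanceIds = new HashMap<>();

    void registerTaskExecutor(String resourceId, String instanceId) {
        registeredInstanceIds.put(resourceId, instanceId);
    }

    CompletableFuture<String> sendSlotReport(String resourceId, String instanceId, int slotCount) {
        String expected = registeredInstanceIds.get(resourceId); // null if unknown
        if (instanceId.equals(expected)) {
            // This is where the real RM calls slotManager.registerTaskManager(...).
            return CompletableFuture.completedFuture("ack for " + slotCount + " slots");
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
                "Unknown TaskManager registration id " + instanceId + "."));
        return failed;
    }
}

class SlotReportGateDemo {
    public static void main(String[] args) {
        SlotReportGate rm = new SlotReportGate();
        rm.registerTaskExecutor("tm-1", "instance-A");
        System.out.println(rm.sendSlotReport("tm-1", "instance-A", 4).join()); // prints ack for 4 slots
        System.out.println(rm.sendSlotReport("tm-1", "instance-old", 4).isCompletedExceptionally()); // prints true
    }
}
```

A report carrying an old instance id, for example one sent by a TM process that registered before a restart, is therefore rejected instead of overwriting the current registration.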
The implementation of org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager#registerTaskManager:
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl#registerTaskManager
@Override
public boolean registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
    checkInit();
    LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID().getStringWithMetadata(), taskExecutorConnection.getInstanceID());
    // we identify task managers by their instance id
    if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
        reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
        return false;
    } else {
        // ...
        // next register the new slots
        for (SlotStatus slotStatus : initialSlotReport) {
            // TODO register each slot
            registerSlot(
                slotStatus.getSlotID(),
                slotStatus.getAllocationID(),
                slotStatus.getJobID(),
                slotStatus.getResourceProfile(),
                taskExecutorConnection);
        }
        return true;
    }
}
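registerTaskManager keys TaskManagers by InstanceID. A second report from an already known instance is treated as a slot status update (reportSlotStatus) and returns false. A new instance gets each of its slots registered and returns true, which is what lets sendSlotReport call onWorkerRegistered. The following minimal sketch shows that branch. SlotManagerSketch, its string ids, and the slotStatusReports counter are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, string-based model of SlotManagerImpl#registerTaskManager.
class SlotManagerSketch {
    // instanceId -> slot ids registered for that TaskManager
    final Map<String, List<String>> taskManagerRegistrations = new HashMap<>();
    int slotStatusReports = 0;

    /** Returns true only when the TaskManager instance was not known before. */
    boolean registerTaskManager(String instanceId, List<String> slotReport) {
        if (taskManagerRegistrations.containsKey(instanceId)) {
            slotStatusReports++;        // analogue of reportSlotStatus(...)
            return false;
        }
        List<String> slots = new ArrayList<>();
        for (String slotId : slotReport) {
            slots.add(slotId);          // analogue of registerSlot(...) for each SlotStatus
        }
        taskManagerRegistrations.put(instanceId, slots);
        return true;
    }
}

class SlotManagerSketchDemo {
    public static void main(String[] args) {
        SlotManagerSketch slotManager = new SlotManagerSketch();
        List<String> report = Arrays.asList("tm-1_0", "tm-1_1");
        System.out.println(slotManager.registerTaskManager("instance-A", report)); // prints true
        System.out.println(slotManager.registerTaskManager("instance-A", report)); // prints false
        System.out.println(slotManager.slotStatusReports);                        // prints 1
    }
}
```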
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl#registerSlot
private void registerSlot(
        SlotID slotId,
        AllocationID allocationId,
        JobID jobId,
        ResourceProfile resourceProfile,
        TaskExecutorConnection taskManagerConnection) {
    // TODO if the slot is already known, first remove the old slot by its slotId
    if (slots.containsKey(slotId)) {
        // remove the old slot first
        removeSlot(
            slotId,
            new SlotManagerException(
                String.format(
                    "Re-registration of slot %s. This indicates that the TaskExecutor has re-connected.",
                    slotId)));
    }
    // TODO create and register the new slot
    final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);
    final PendingTaskManagerSlot pendingTaskManagerSlot;
    if (allocationId == null) {
        // TODO free slot: look for a pending TaskManager slot with exactly the same resource profile
        pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
    } else {
        pendingTaskManagerSlot = null;
    }
    if (pendingTaskManagerSlot == null) {
        // TODO update the slot state
        updateSlot(slotId, allocationId, jobId);
    } else {
        pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
        final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest();
        // TODO no pending slot request is assigned to this pending slot
        if (assignedPendingSlotRequest == null) {
            // TODO treat it as a free slot
            handleFreeSlot(slot);
        } else {
            // TODO detach the pending TaskManager slot from the request
            assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
            // TODO allocate the slot to the request
            allocateSlot(slot, assignedPendingSlotRequest);
        }
    }
}
At this point, the TM has finished registering its slots with the RM.
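After the new TaskManagerSlot has been created, registerSlot comes down to a three-way decision. Suppose a slot is free (no allocationId) and exactly matches a pending TaskManager slot. It then either serves the request already assigned to that pending slot (allocateSlot) or is handled as a free slot (handleFreeSlot). Every other slot just has its state updated (updateSlot). The following minimal sketch condenses those branches. RegisterSlotDecision, its Action enum, and the boolean parameters are hypothetical.

```java
// Hypothetical condensation of the branches in SlotManagerImpl#registerSlot.
class RegisterSlotDecision {
    enum Action { UPDATE_SLOT, HANDLE_FREE_SLOT, ALLOCATE_SLOT }

    /**
     * @param allocationId       allocation reported by the TM, or null if the slot is free
     * @param matchesPendingSlot whether a pending TaskManager slot with exactly the same
     *                           resource profile exists (only consulted for free slots)
     * @param pendingHasRequest  whether that pending slot already has a slot request assigned
     */
    static Action decide(String allocationId, boolean matchesPendingSlot, boolean pendingHasRequest) {
        boolean pendingSlotFound = allocationId == null && matchesPendingSlot;
        if (!pendingSlotFound) {
            return Action.UPDATE_SLOT;       // updateSlot(slotId, allocationId, jobId)
        }
        return pendingHasRequest
                ? Action.ALLOCATE_SLOT       // allocateSlot(slot, assignedPendingSlotRequest)
                : Action.HANDLE_FREE_SLOT;   // handleFreeSlot(slot)
    }

    public static void main(String[] args) {
        System.out.println(decide("alloc-1", true, true)); // prints UPDATE_SLOT
        System.out.println(decide(null, false, false));    // prints UPDATE_SLOT
        System.out.println(decide(null, true, false));     // prints HANDLE_FREE_SLOT
        System.out.println(decide(null, true, true));      // prints ALLOCATE_SLOT
    }
}
```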
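2.4 The RM Notifies the TM
Once the RM has registered the slots and assigned a slot to a pending slot request, it calls the TM back over RPC (requestSlot), telling it which job the slot is for and the address of that job's JobManager: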
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl#allocateSlot
private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
    // ...
    // TODO look up this TM among all currently registered TMs
    TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
    if (taskManagerRegistration == null) {
        throw new IllegalStateException("Could not find a registered task manager for instance id " +
            instanceID + '.');
    }
    // TODO mark the TM as used
    taskManagerRegistration.markUsed();
    // RPC call to the task manager
    // TODO ask the TM to allocate the slot so it can be offered to the JM for running the job
    CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
        slotId,
        pendingSlotRequest.getJobId(),
        allocationId,
        pendingSlotRequest.getResourceProfile(),
        pendingSlotRequest.getTargetAddress(),
        resourceManagerId,
        taskManagerRequestTimeout);
    // ...
}
The implementation of org.apache.flink.runtime.taskexecutor.TaskExecutorGateway#requestSlot:
org.apache.flink.runtime.taskexecutor.TaskExecutor#requestSlot
@Override
public CompletableFuture<Acknowledge> requestSlot(
        final SlotID slotId,
        final JobID jobId,
        final AllocationID allocationId,
        final ResourceProfile resourceProfile,
        final String targetAddress,
        final ResourceManagerId resourceManagerId,
        final Time timeout) {
    // ...
    try {
        // TODO following the RM's instruction, allocate the slot in this TM's own slot table
        allocateSlot(
            slotId,
            jobId,
            allocationId,
            resourceProfile);
    } catch (SlotAllocationException sae) {
        return FutureUtils.completedExceptionally(sae);
    }
    // ...
    if (job.isConnected()) {
        // TODO offer the slots to the JobManager
        offerSlotsToJobManager(jobId);
    }
    return CompletableFuture.completedFuture(Acknowledge.get());
}
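After receiving the request from the RM, the TM first performs the corresponding allocation in its own slot table. Then, if it is already connected to the job's JobManager, it offers the slots to the JM.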
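On the TM side, requestSlot reserves the slot in its own slot table for the given job and allocation, and offers it right away only if a JobManager connection for that job already exists. Otherwise, the slot stays reserved, presumably to be offered once the connection is established (that path is elided in the excerpt). The following minimal sketch shows this reserve-then-offer flow. TaskExecutorSketch, its maps, and the jobManagerConnected flag are hypothetical, and returning false stands in for the SlotAllocationException path.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of TaskExecutor#requestSlot: reserve locally, then offer if connected.
class TaskExecutorSketch {
    private final Map<Integer, String> slotTable = new HashMap<>();            // slot index -> allocation id
    private final Map<String, Set<String>> allocationsByJob = new HashMap<>(); // job id -> allocation ids
    final Set<String> offeredToJobManager = new LinkedHashSet<>();
    boolean jobManagerConnected;

    boolean requestSlot(int slotIndex, String jobId, String allocationId) {
        String existing = slotTable.get(slotIndex);
        if (existing != null && !existing.equals(allocationId)) {
            return false; // occupied by another allocation (the real code fails the future)
        }
        slotTable.put(slotIndex, allocationId);
        allocationsByJob.computeIfAbsent(jobId, id -> new LinkedHashSet<>()).add(allocationId);
        if (jobManagerConnected) {
            // analogue of offerSlotsToJobManager(jobId): offer every slot reserved for the job
            offeredToJobManager.addAll(allocationsByJob.get(jobId));
        }
        return true;
    }
}

class TaskExecutorSketchDemo {
    public static void main(String[] args) {
        TaskExecutorSketch tm = new TaskExecutorSketch();
        tm.requestSlot(0, "job-1", "alloc-1");           // reserved, JM not connected yet
        System.out.println(tm.offeredToJobManager);      // prints []
        tm.jobManagerConnected = true;
        tm.requestSlot(1, "job-1", "alloc-2");
        System.out.println(tm.offeredToJobManager);      // prints [alloc-1, alloc-2]
        System.out.println(tm.requestSlot(1, "job-2", "alloc-3")); // prints false
    }
}
```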
2.5 The TM Offers Slots to the JM
org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager
private void offerSlotsToJobManager(final JobID jobId) {
    jobTable
        .getConnection(jobId)
        .ifPresent(this::internalOfferSlotsToJobManager);
}
org.apache.flink.runtime.taskexecutor.TaskExecutor#internalOfferSlotsToJobManager
private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnection) {
    final JobID jobId = jobManagerConnection.getJobId();
    if (taskSlotTable.hasAllocatedSlots(jobId)) {
        // ...
        // TODO call the JobMaster (JobManager) to offer the slots
        CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
            getResourceID(),
            reservedSlots,
            taskManagerConfiguration.getTimeout());
        acceptedSlotsFuture.whenCompleteAsync(
            handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots),
            getMainThreadExecutor());
    } else {
        log.debug("There are no unassigned slots for the job {}.", jobId);
    }
}
This is where the TM sends an RPC request to the JM to offer it the slots.
The implementation of org.apache.flink.runtime.jobmaster.JobMasterGateway#offerSlots:
org.apache.flink.runtime.jobmaster.JobMaster#offerSlots
@Override
public CompletableFuture<Collection<SlotOffer>> offerSlots(
        final ResourceID taskManagerId,
        final Collection<SlotOffer> slots,
        final Time timeout) {
    // ...
    // TODO hand the offered slots to the JobManager's slotPool
    return CompletableFuture.completedFuture(
        slotPool.offerSlots(
            taskManagerLocation,
            rpcTaskManagerGateway,
            slots));
}
The slots are then offered to the slotPool in the JM.
The implementation of org.apache.flink.runtime.jobmaster.slotpool.SlotPool#offerSlots:
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl#offerSlots
@Override
public Collection<SlotOffer> offerSlots(
        TaskManagerLocation taskManagerLocation,
        TaskManagerGateway taskManagerGateway,
        Collection<SlotOffer> offers) {
    ArrayList<SlotOffer> result = new ArrayList<>(offers.size());
    // TODO offer each slot and keep the ones that are accepted
    for (SlotOffer offer : offers) {
        if (offerSlot(
                taskManagerLocation,
                taskManagerGateway,
                offer)) {
            result.add(offer);
        }
    }
    return result;
}
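SlotPoolImpl#offerSlots returns only the offers for which offerSlot returned true. That collection is what the TaskExecutor receives through acceptedSlotsFuture, so anything it offered but does not get back counts as rejected. Such slots are presumably released by handleAcceptedSlotOffers, whose body is not shown in this article. The following minimal sketch shows that round trip. SlotOfferSketch, the string offers, and the acceptance predicate are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical offer/accept round trip between a TaskExecutor and a JobMaster's slot pool.
class SlotOfferSketch {
    // JM side: mirrors SlotPoolImpl#offerSlots, returning only the accepted offers.
    static List<String> offerSlots(Collection<String> offers, Predicate<String> offerSlot) {
        List<String> result = new ArrayList<>(offers.size());
        for (String offer : offers) {
            if (offerSlot.test(offer)) {
                result.add(offer);
            }
        }
        return result;
    }

    // TM side: whatever was offered but not accepted counts as rejected.
    static List<String> rejected(Collection<String> offered, Collection<String> accepted) {
        List<String> rejected = new ArrayList<>(offered);
        rejected.removeAll(accepted);
        return rejected;
    }

    public static void main(String[] args) {
        List<String> reserved = Arrays.asList("alloc-1", "alloc-2", "alloc-3");
        // Hypothetical acceptance rule: the JM refuses alloc-2.
        List<String> accepted = offerSlots(reserved, offer -> !offer.equals("alloc-2"));
        System.out.println(accepted);                      // prints [alloc-1, alloc-3]
        System.out.println(rejected(reserved, accepted));  // prints [alloc-2]
    }
}
```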
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl#offerSlot
boolean offerSlot(
        final TaskManagerLocation taskManagerLocation,
        final TaskManagerGateway taskManagerGateway,
        final SlotOffer slotOffer) {
    // ...
    // TODO wrap the offer in an AllocatedSlot
    final AllocatedSlot allocatedSlot = new AllocatedSlot(
        allocationID,
        taskManagerLocation,
        slotOffer.getSlotIndex(),
        slotOffer.getResourceProfile(),
        taskManagerGateway);
    // use the slot to fulfill pending request, in requested order
    // TODO use the slot to fulfill a pending request
    tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
    // we accepted the request in any case. slot will be released after it idled for
    // too long and timed out
    return true;
}
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl#tryFulfillSlotRequestOrMakeAvailable
private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
    Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");
    // TODO find a matching pending request
    final PendingRequest pendingRequest = findMatchingPendingRequest(allocatedSlot);
    if (pendingRequest != null) {
        log.debug("Fulfilling pending slot request [{}] with slot [{}]",
            pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
        // remove the pending request
        removePendingRequest(pendingRequest.getSlotRequestId());
        // TODO record the slot as allocated to the pending request
        allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
        pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
        // this allocation may become orphan once its corresponding request is removed
        // TODO get the AllocationId: it is generated in the JM, registered with the RM, and passed on by the RM to the TM; it distinguishes individual allocations
        final Optional<AllocationID> allocationIdOfRequest = pendingRequest.getAllocationId();
        // the allocation id can be null if the request was fulfilled by a slot directly offered
        // by a reconnected TaskExecutor before the ResourceManager is connected
        if (allocationIdOfRequest.isPresent()) {
            maybeRemapOrphanedAllocation(allocationIdOfRequest.get(), allocatedSlot.getAllocationId());
        }
    } else {
        log.debug("Adding slot [{}] to available slots", allocatedSlot.getAllocationId());
        availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
    }
}
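The JM thus checks and records the slots offered by the TM: each slot either fulfills a pending slot request or is added to the available slots.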
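tryFulfillSlotRequestOrMakeAvailable either hands the new slot to a matching pending request by completing that request's future, or parks it in availableSlots. The following minimal sketch shows that pattern with CompletableFuture. SlotPoolSketch and PendingRequest are hypothetical names. Slots are identified by an allocation-id string, and the "first pending request, in requested order, whose required size fits" rule is a hypothetical stand-in for findMatchingPendingRequest.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of SlotPoolImpl#tryFulfillSlotRequestOrMakeAvailable.
class SlotPoolSketch {
    static final class PendingRequest {
        final int requiredSize;
        final CompletableFuture<String> allocatedSlotFuture = new CompletableFuture<>();
        PendingRequest(int requiredSize) { this.requiredSize = requiredSize; }
    }

    final Deque<PendingRequest> pendingRequests = new ArrayDeque<>(); // in requested order
    final List<String> availableSlots = new ArrayList<>();

    PendingRequest requestSlot(int requiredSize) {
        PendingRequest request = new PendingRequest(requiredSize);
        pendingRequests.addLast(request);
        return request;
    }

    void tryFulfillSlotRequestOrMakeAvailable(String allocationId, int slotSize) {
        Iterator<PendingRequest> it = pendingRequests.iterator();
        while (it.hasNext()) {
            PendingRequest request = it.next();
            if (request.requiredSize <= slotSize) {
                it.remove();                                        // removePendingRequest(...)
                request.allocatedSlotFuture.complete(allocationId); // hand the slot to the requester
                return;
            }
        }
        availableSlots.add(allocationId); // no taker: keep it as an available slot
    }

    public static void main(String[] args) {
        SlotPoolSketch pool = new SlotPoolSketch();
        PendingRequest request = pool.requestSlot(2);
        pool.tryFulfillSlotRequestOrMakeAvailable("alloc-1", 1); // too small: becomes available
        pool.tryFulfillSlotRequestOrMakeAvailable("alloc-2", 4); // fulfills the pending request
        System.out.println(request.allocatedSlotFuture.getNow(null)); // prints alloc-2
        System.out.println(pool.availableSlots);                    // prints [alloc-1]
    }
}
```

Completing the future is what wakes up the scheduler code that was waiting for the slot, which is why the offer path needs no further notification step.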
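At this point, the entire slot registration and offering process is complete. Once the slots are registered, the next step is for the JM to submit the job to the TM for execution, which will be covered in the next article!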
Please credit the source when reposting. Article link: https://www.uj5u.com/qita/388417.html
