尊重原創著作權: https://www.gewuweb.com/hot/16037.html
揭秘Flink四種執行圖(下)——ExecutionGraph和物理執行圖
尊重原創著作權: https://www.gewuweb.com/sitemap.html
在上一篇文章 揭秘Flink四種執行圖(上)——StreamGraph和JobGraph
中,我們已經通過原始碼詳細分析過StreamGraph和JobGraph是如何生成的,本文將繼續深度解讀ExecutionGraph和物理執行圖的生成,
** 1、 ExecutionGraph在JobManager生成 **
client生成JobGraph之后,就通過submitJob提交給JobManager,JobManager 會根據
JobGraph生成對應的ExecutionGraph,
ExecutionGraph 是Flink 作業調度時使用到的核心資料結構,它包含每一個并行的 task、每一個 intermediate stream
以及它們之間的關系,
以per-job模式為例,分析 ExecutionGraph的生成邏輯:
在Dispatcher 創建JobManagerRunner時,呼叫createJobManagerRunner:
=> createJobManagerRunner()
=>new JobManagerRunnerImpl()
=>createJobMasterService()
=> new JobMaster()
在創建JobMaster的時候,創建了Scheduler調度器
=> createScheduler()
=> createInstance()
=> new DefaultScheduler() #調度器
=> createAndRestoreExecutionGraph()
=> createExecutionGraph()
=> ExecutionGraphBuilder.buildGraph()
下面是原始碼詳細地跳轉程序,可以直接略過,看后面buildGraph()方法的分析:
Dispatcher.java
/**
 * Asynchronously builds a {@code JobManagerRunner} for {@code jobGraph} through
 * {@code jobManagerRunnerFactory} and starts it before handing it back.
 * (Article excerpt: the catch clause and the remainder of the call are elided.)
 *
 * @param jobGraph the job to create a runner for
 * @param initializationTimestamp timestamp forwarded unchanged to the runner factory
 * @return a future completing with the started runner
 */
CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
final RpcService rpcService =getRpcService();
return CompletableFuture.supplyAsync(
() -> {
try {
JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler,
initializationTimestamp);
// Start the JobManagerRunner.
runner.start();
return runner;
}
......
}
DefaultJobManagerRunnerFactory.java
/**
 * Factory method that wraps the job in a new {@code JobManagerRunnerImpl}.
 * (Article excerpt: the setup code and the remaining constructor arguments are elided.)
 */
public JobManagerRunner createJobManagerRunner(
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
long initializationTimestamp)throws Exception {
... ...
return new JobManagerRunnerImpl(
jobGraph,
jobMasterFactory,
highAvailabilityServices,
... ...
}
JobManagerRunnerImpl.java
/**
 * Creates the runner and, as part of construction, asks {@code jobMasterFactory}
 * to create the {@code JobMasterService} for the job.
 *
 * @param jobGraph the job this runner is responsible for
 * @param jobMasterFactory factory used to create the job master service
 * @param haServices high-availability services
 * @param classLoaderLease lease for the user-code class loader
 * @param executor executor handed to the runner
 * @param fatalErrorHandler handler for unrecoverable errors
 * @param initializationTimestamp timestamp forwarded to the job master service
 * @throws Exception if the job master service cannot be created
 */
public JobManagerRunnerImpl(
        final JobGraph jobGraph,
        final JobMasterServiceFactory jobMasterFactory,
        final HighAvailabilityServices haServices,
        final LibraryCacheManager.ClassLoaderLease classLoaderLease,
        final Executor executor,
        final FatalErrorHandler fatalErrorHandler,
        long initializationTimestamp) throws Exception {
    // NOTE(review): userCodeLoader is not declared in this excerpt; presumably it is
    // resolved from classLoaderLease in code the article omitted — confirm.
    this.jobMasterService = jobMasterFactory.createJobMasterService(
            jobGraph, this, userCodeLoader, initializationTimestamp);
}
DefaultJobManagerRunnerFactory.java
/**
 * Instantiates the {@code JobMaster} for a single job.
 *
 * <p>A fresh {@code ResourceID} and a new {@code DefaultExecutionDeploymentTracker} are
 * created per call; the partition tracker is supplied lazily through a factory lambda
 * bound to the job's ID and the shuffle master.
 *
 * @param jobGraph the job to run
 * @param jobCompletionActions callbacks invoked on job completion
 * @param userCodeClassloader class loader for user code
 * @param initializationTimestamp timestamp forwarded to the job master
 * @return the newly constructed job master
 * @throws Exception if the job master cannot be constructed
 */
public JobMaster createJobMasterService(
        JobGraph jobGraph,
        OnCompletionActions jobCompletionActions,
        ClassLoader userCodeClassloader,
        long initializationTimestamp) throws Exception {

    final ResourceID jobMasterResourceId = ResourceID.generate();
    final DefaultExecutionDeploymentTracker deploymentTracker = new DefaultExecutionDeploymentTracker();

    return new JobMaster(
            rpcService,
            jobMasterConfiguration,
            jobMasterResourceId,
            jobGraph,
            haServices,
            slotPoolFactory,
            jobManagerSharedServices,
            heartbeatServices,
            jobManagerJobMetricGroupFactory,
            jobCompletionActions,
            fatalErrorHandler,
            userCodeClassloader,
            schedulerNGFactory,
            shuffleMaster,
            taskExecutorLookup -> new JobMasterPartitionTrackerImpl(
                    jobGraph.getJobID(), shuffleMaster, taskExecutorLookup),
            deploymentTracker,
            DefaultExecutionDeploymentReconciler::new,
            initializationTimestamp);
}
JobMaster.java
/**
 * Constructs the job master; among other (elided) setup, it creates the scheduler
 * via {@code createScheduler}.
 * (Article excerpt: everything except the scheduler creation is elided.)
 *
 * @throws Exception if construction fails
 */
public JobMaster(
        RpcService rpcService,
        JobMasterConfiguration jobMasterConfiguration,
        ResourceID resourceId,
        JobGraph jobGraph,
        HighAvailabilityServices highAvailabilityService,
        SlotPoolFactory slotPoolFactory,
        JobManagerSharedServices jobManagerSharedServices,
        HeartbeatServices heartbeatServices,
        JobManagerJobMetricGroupFactory jobMetricGroupFactory,
        OnCompletionActions jobCompletionActions,
        FatalErrorHandler fatalErrorHandler,
        ClassLoader userCodeLoader,
        SchedulerNGFactory schedulerNGFactory,
        ShuffleMaster<?> shuffleMaster,
        PartitionTrackerFactory partitionTrackerFactory,
        ExecutionDeploymentTracker executionDeploymentTracker,
        ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory,
        long initializationTimestamp) throws Exception {
    ... ...
    // The scheduler is created here, during JobMaster construction.
    this.schedulerNG = createScheduler(executionDeploymentTracker, jobManagerJobMetricGroup);
    ... ...
}
private SchedulerNG createScheduler(ExecutionDeploymentTracker executionDeploymentTracker,
final JobManagerJobMetricGroup jobManager JobMetricGroup) throws Exception {
return schedulerNGFactory.createInstance(
log,
jobGraph,
backPressureStatsTracker,
scheduledExecutorService,
jobMasterConfiguration.getConfiguration(),
slotPool,
scheduledExecutorService,
userCodeLoader,
highAvailabilityServices.getCheckpointRecoveryFactory(),
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
jobMasterConfiguration.getSlotRequestTimeout(),
shuffleMaster,
partitionTracker,
executionDeploymentTracker,
initializationTimestamp);
}
DefaultSchedulerFactory.java
/**
 * Creates a {@code DefaultScheduler} from the supplied services.
 * (Article excerpt: the code that prepares {@code schedulerComponents} and
 * {@code restartBackoffTimeStrategy} is elided.)
 *
 * @return the new scheduler
 * @throws Exception if the scheduler cannot be constructed
 */
public SchedulerNG createInstance(
        final Logger log,
        final JobGraph jobGraph,
        final BackPressureStatsTracker backPressureStatsTracker,
        final Executor ioExecutor,
        final Configuration jobMasterConfiguration,
        final SlotPool slotPool,
        final ScheduledExecutorService futureExecutor,
        final ClassLoader userCodeLoader,
        final CheckpointRecoveryFactory checkpointRecoveryFactory,
        final Time rpcTimeout,
        final BlobWriter blobWriter,
        final JobManagerJobMetricGroup jobManagerJobMetricGroup,
        final Time slotRequestTimeout,
        final ShuffleMaster<?> shuffleMaster,
        final JobMasterPartitionTracker partitionTracker,
        final ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp) throws Exception {
    ... ...
    return new DefaultScheduler(
            log,
            jobGraph,
            backPressureStatsTracker,
            ioExecutor,
            jobMasterConfiguration,
            schedulerComponents.getStartUpAction(),
            futureExecutor,
            new ScheduledExecutorServiceAdapter(futureExecutor),
            userCodeLoader,
            checkpointRecoveryFactory,
            rpcTimeout,
            blobWriter,
            jobManagerJobMetricGroup,
            shuffleMaster,
            partitionTracker,
            schedulerComponents.getSchedulingStrategyFactory(),
            FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
            restartBackoffTimeStrategy,
            new DefaultExecutionVertexOperations(),
            new ExecutionVertexVersioner(),
            schedulerComponents.getAllocatorFactory(),
            executionDeploymentTracker,
            initializationTimestamp);
}
SchedulerBase.java
/**
 * Base scheduler constructor; among other (elided) setup, it builds the execution
 * graph through {@code createAndRestoreExecutionGraph}.
 *
 * @throws Exception if the execution graph cannot be created
 */
public SchedulerBase(
        final Logger log,
        final JobGraph jobGraph,
        final BackPressureStatsTracker backPressureStatsTracker,
        final Executor ioExecutor,
        final Configuration jobMasterConfiguration,
        final SlotProvider slotProvider,
        final ScheduledExecutorService futureExecutor,
        final ClassLoader userCodeLoader,
        final CheckpointRecoveryFactory checkpointRecoveryFactory,
        final Time rpcTimeout,
        final RestartStrategyFactory restartStrategyFactory,
        final BlobWriter blobWriter,
        final JobManagerJobMetricGroup jobManagerJobMetricGroup,
        final Time slotRequestTimeout,
        final ShuffleMaster<?> shuffleMaster,
        final JobMasterPartitionTracker partitionTracker,
        final ExecutionVertexVersioner executionVertexVersioner,
        final ExecutionDeploymentTracker executionDeploymentTracker,
        final boolean legacyScheduling,
        long initializationTimestamp) throws Exception {
    ... ...
    // The three collaborators are null-checked before the graph is built.
    this.executionGraph = createAndRestoreExecutionGraph(
            jobManagerJobMetricGroup,
            checkNotNull(shuffleMaster),
            checkNotNull(partitionTracker),
            checkNotNull(executionDeploymentTracker),
            initializationTimestamp);
    ... ...
}
/**
 * Creates the execution graph for this scheduler by delegating to
 * {@code createExecutionGraph} with the same arguments.
 * (Article excerpt: the remaining steps and the return statement are elided.)
 */
private ExecutionGraph createAndRestoreExecutionGraph(
JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp) throws Exception {
ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup,shuffleMaster, partitionTracker, executionDeploymentTracker,initializationTimestamp);
... ...
}
/**
 * Builds the execution graph by calling {@code ExecutionGraphBuilder.buildGraph}
 * with this scheduler's job graph, executors and strategies.
 * (Article excerpt: the code that prepares the listeners and strategies is elided.)
 */
private ExecutionGraph createExecutionGraph(
JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
ShuffleMaster<?> shuffleMaster,
final JobMasterPartitionTracker partitionTracker,
ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp) throws JobExecutionException, JobException {
... ...
return ExecutionGraphBuilder.buildGraph(
// First argument is the prior execution graph; null is passed here.
null,
jobGraph,
jobMasterConfiguration,
futureExecutor,
ioExecutor,
slotProvider,
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
restartStrategy,
currentJobManagerJobMetricGroup,
blobWriter,
slotRequestTimeout,
log,
shuffleMaster,
partitionTracker,
failoverStrategy,
executionDeploymentListener,
executionStateUpdateListener,
initializationTimestamp);
}
接下來,分析生成ExecutionGraph的核心邏輯:
ExecutionGraphBuilder.java
/**
 * Builds the {@code ExecutionGraph} for a {@code JobGraph}.
 *
 * <p>Steps visible in this excerpt: create a new execution graph unless {@code prior}
 * is given, attach the JSON plan, run the master-side initialization of every job
 * vertex, sort the job vertices topologically and attach them to the execution graph.
 * (Article excerpt: the remainder of the method, including the return, is elided.)
 *
 * @param prior an existing execution graph to reuse, or {@code null} to create a new one
 * @param jobGraph the job graph to convert; must not be {@code null}
 * @throws JobExecutionException if a vertex has no invokable class or fails to initialize
 * @throws JobException if the execution graph cannot be created
 */
public static ExecutionGraph buildGraph(
        @Nullable ExecutionGraph prior,
        JobGraph jobGraph,
        Configuration jobManagerConfig,
        ScheduledExecutorService futureExecutor,
        Executor ioExecutor,
        SlotProvider slotProvider,
        ClassLoader classLoader,
        CheckpointRecoveryFactory recoveryFactory,
        Time rpcTimeout,
        RestartStrategy restartStrategy,
        MetricGroup metrics,
        BlobWriter blobWriter,
        Time allocationTimeout,
        Logger log,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        FailoverStrategy.Factory failoverStrategyFactory,
        ExecutionDeploymentListener executionDeploymentListener,
        ExecutionStateUpdateListener executionStateUpdateListener,
        long initializationTimestamp) throws JobExecutionException, JobException {

    checkNotNull(jobGraph, "job graph cannot be null");

    final String jobName = jobGraph.getName();
    final JobID jobId = jobGraph.getJobID();

    final JobInformation jobInformation = new JobInformation(
            jobId,
            jobName,
            jobGraph.getSerializedExecutionConfig(),
            jobGraph.getJobConfiguration(),
            jobGraph.getUserJarBlobKeys(),
            jobGraph.getClasspaths());

    final int maxPriorAttemptsHistoryLength =
            jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);

    final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
            PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);

    // create a new execution graph, if none exists so far
    final ExecutionGraph executionGraph;
    try {
        executionGraph = (prior != null) ? prior :
                new ExecutionGraph(
                        jobInformation,
                        futureExecutor,
                        ioExecutor,
                        rpcTimeout,
                        restartStrategy,
                        maxPriorAttemptsHistoryLength,
                        failoverStrategyFactory,
                        slotProvider,
                        classLoader,
                        blobWriter,
                        allocationTimeout,
                        partitionReleaseStrategyFactory,
                        shuffleMaster,
                        partitionTracker,
                        jobGraph.getScheduleMode(),
                        executionDeploymentListener,
                        executionStateUpdateListener,
                        initializationTimestamp);
    } catch (IOException e) {
        throw new JobException("Could not create the ExecutionGraph.", e);
    }

    // set the basic properties
    try {
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
    }
    catch (Throwable t) {
        log.warn("Cannot create JSON plan for job", t);
        // give the graph an empty plan
        executionGraph.setJsonPlan("{}");
    }

    // initialize the vertices that have a master initialization hook
    // file output formats create directories here, input formats create splits
    final long initMasterStart = System.nanoTime();
    log.info("Running initialization on master for job {} ({}).", jobName, jobId);

    for (JobVertex vertex : jobGraph.getVertices()) {
        // Defensive check: every job vertex must name the invokable class it runs.
        String executableClass = vertex.getInvokableClassName();
        if (executableClass == null || executableClass.isEmpty()) {
            throw new JobSubmissionException(jobId,
                    "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
        }

        try {
            // Run the vertex's master-side initialization with the given class loader.
            vertex.initializeOnMaster(classLoader);
        }
        catch (Throwable t) {
            throw new JobExecutionException(jobId,
                    "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
        }
    }

    log.info("Successfully ran initialization on master in {} ms.",
            (System.nanoTime() - initMasterStart) / 1_000_000);

    // topologically sort the job vertices and attach the graph to the existing one
    List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
    if (log.isDebugEnabled()) {
        log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
    }
    // Core step: add the topologically sorted job vertices to the execution graph.
    executionGraph.attachJobGraph(sortedTopology);
    ... ...
}
/**
 * Attaches a topologically sorted list of job vertices to this execution graph.
 *
 * <p>For every {@code JobVertex} an {@code ExecutionJobVertex} is created, connected to
 * the intermediate results of its predecessors, and registered together with the
 * intermediate results it produces. Afterwards the execution topology is rebuilt and
 * the failover and partition-release strategies are updated.
 * (Article excerpt: the beginning of the method is elided.)
 *
 * @param topologiallySorted job vertices in topological order
 * @throws JobException if a vertex ID or intermediate data set ID is encountered twice,
 *     or a vertex cannot be connected to its predecessors
 */
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
    ... ...
    // ExecutionJobVertex is the node type of the execution graph.
    final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
    final long createTimestamp = System.currentTimeMillis();

    // Walk over the job vertices.
    for (JobVertex jobVertex : topologiallySorted) {

        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }

        // create the execution job vertex and attach it to the graph
        ExecutionJobVertex ejv = new ExecutionJobVertex(
                this,
                jobVertex,
                1,
                maxPriorAttemptsHistoryLength,
                rpcTimeout,
                globalModVersion,
                createTimestamp);

        // Connect the new ExecutionJobVertex to the IntermediateResults of its predecessors.
        ejv.connectToPredecessors(this.intermediateResults);

        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        if (previousTask != null) {
            throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                    jobVertex.getID(), ejv, previousTask));
        }

        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
            if (previousDataSet != null) {
                throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                        res.getId(), res, previousDataSet));
            }
        }

        // The total vertex count grows by this vertex's parallelism: the execution graph
        // is the parallelized form of the job graph, one node per parallel subtask.
        this.verticesInCreationOrder.add(ejv);
        this.numVerticesTotal += ejv.getParallelism();
        // Remember the vertex as newly added.
        newExecJobVertices.add(ejv);
    }

    // the topology assigning should happen before notifying new vertices to failoverStrategy
    executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);

    failoverStrategy.notifyNewVertices(newExecJobVertices);

    partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(getSchedulingTopology());
}
/**
 * Connects this execution job vertex to the intermediate results it consumes.
 *
 * <p>For every input {@code JobEdge} the matching {@code IntermediateResult} is looked
 * up by ID, added to this vertex's inputs, a consumer is registered on it, and each
 * parallel {@code ExecutionVertex} of this job vertex is wired to it.
 *
 * @param intermediateDataSets intermediate results created so far, keyed by data set ID
 * @throws JobException if an input's intermediate result is not present in the map
 */
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {

    // Input job edges of the underlying job vertex.
    List<JobEdge> inputs = jobVertex.getInputs();

    if (LOG.isDebugEnabled()) {
        LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), inputs.size()));
    }

    // Handle each job edge in turn.
    for (int num = 0; num < inputs.size(); num++) {
        JobEdge edge = inputs.get(num);

        if (LOG.isDebugEnabled()) {
            if (edge.getSource() == null) {
                LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
                        num, jobVertex.getID(), jobVertex.getName(), edge.getSourceId()));
            } else {
                LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",
                        num, jobVertex.getID(), jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName()));
            }
        }

        // fetch the intermediate result via ID. if it does not exist, then it either has not been created, or the order
        // in which this method is called for the job vertices is not a topological order
        IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
        if (ires == null) {
            throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
                    + edge.getSourceId());
        }

        // Record the intermediate result as an input of this ExecutionJobVertex.
        this.inputs.add(ires);

        // Register this vertex as one more consumer of the intermediate result; the
        // returned index identifies this consumer among the result's consumers.
        int consumerIndex = ires.registerConsumer();

        for (int i = 0; i < parallelism; i++) {
            // Every parallel subtask is its own ExecutionVertex, so each one is
            // connected to the upstream intermediate result.
            ExecutionVertex ev = taskVertices[i];
            ev.connectSource(num, ires, edge, consumerIndex);
        }
    }
}
/**
 * Wires this execution vertex to the partitions of one upstream intermediate result.
 *
 * <p>The edges are created according to the job edge's distribution pattern, stored
 * as the input edges for {@code inputNumber}, and then registered as consumers on
 * their source partitions.
 *
 * @param inputNumber index of the input being connected
 * @param source the upstream intermediate result
 * @param edge the job edge describing the connection
 * @param consumerNumber consumer index under which the edges are registered
 * @throws RuntimeException if the distribution pattern is not recognized
 */
public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {

    // NOTE(review): the article states that only forward edges are POINTWISE and all
    // others ALL_TO_ALL; that is not verifiable from this excerpt — confirm.
    final DistributionPattern pattern = edge.getDistributionPattern();
    final IntermediateResultPartition[] sourcePartitions = source.getPartitions();

    ExecutionEdge[] edges;

    switch (pattern) {
        case POINTWISE:
            edges = connectPointwise(sourcePartitions, inputNumber);
            break;

        case ALL_TO_ALL:
            edges = connectAllToAll(sourcePartitions, inputNumber);
            break;

        default:
            throw new RuntimeException("Unrecognized distribution pattern.");
    }

    inputEdges[inputNumber] = edges;

    // add the consumers to the source
    // for now (until the receiver initiated handshake is in place), we need to register the
    // edges as the execution graph
    // The consumer was already registered on the IntermediateResult by the caller; here
    // each ExecutionEdge is registered on its IntermediateResultPartition.
    for (ExecutionEdge ee : edges) {
        ee.getSource().addConsumer(ee, consumerNumber);
    }
}
/**
 * Creates one {@code ExecutionEdge} from every source partition to this vertex.
 *
 * @param sourcePartitions partitions of the upstream intermediate result
 * @param inputNumber index of the input the edges belong to
 * @return an array with one edge per source partition, in partition order
 */
private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
    ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];

    for (int i = 0; i < sourcePartitions.length; i++) {
        IntermediateResultPartition irp = sourcePartitions[i];
        edges[i] = new ExecutionEdge(irp, this, inputNumber);
    }

    return edges;
}
看這個方法之前,需要知道,ExecutionVertex的inputEdges變數,是一個二維陣列,它表示了這個ExecutionVertex上每一個input所包含的ExecutionEdge串列,
即,如果ExecutionVertex有兩個不同的輸入:輸入A和B,其中輸入A的partition=1,
輸入B的partition=8,那么這個二維陣列inputEdges如下(以irp代替IntermediateResultPartition)
[ ExecutionEdge[ A.irp[0]] ]
[ ExecutionEdge[ B.irp[0], B.irp[1], ..., B.irp[7] ] ]
到這里為止,ExecutionGraph就創建完成了,
** 2、 使用Akka **
接著進行調度的原始碼分析:
JobMaster.java
/**
 * Starts executing the job under the given job master ID: starts the job master
 * services, then resets and starts the scheduler.
 * (Article excerpt: the leading checks and the return statement are elided.)
 *
 * @param newJobMasterId the job master ID to run under
 * @throws Exception if the services or the scheduler cannot be started
 */
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
    ... ...
    // Start the JobMaster's services.
    startJobMasterServices();

    log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);

    // Reset the scheduler and begin scheduling.
    resetAndStartScheduler();
    ... ...
}
/**
 * Triggers {@code startScheduling} once {@code schedulerAssignedFuture} completes.
 * (Article excerpt: the code that prepares {@code schedulerAssignedFuture} is elided.)
 *
 * @throws Exception if resetting the scheduler fails
 */
private void resetAndStartScheduler() throws Exception {
    ... ...
    FutureUtils.assertNoException(schedulerAssignedFuture.thenRun(this::startScheduling));
}
/**
 * Registers a fresh job status listener with the scheduler and then starts scheduling.
 * Must only be called while no listener is registered yet.
 */
private void startScheduling() {
    checkState(jobStatusListener == null);

    // The job master observes job status changes through its own listener.
    final JobManagerJobStatusListener statusListener = new JobManagerJobStatusListener();
    this.jobStatusListener = statusListener;
    this.schedulerNG.registerJobStatusListener(statusListener);

    this.schedulerNG.startScheduling();
}
DefaultScheduler.java
/**
 * Logs the scheduling strategy in use, prepares the execution graph and hands control
 * to the scheduling strategy.
 */
protected void startSchedulingInternal() {
    log.info("Starting scheduling with scheduling strategy [{}]", schedulingStrategy.getClass().getName());
    prepareExecutionGraphForNgScheduling();
    schedulingStrategy.startScheduling();
}
PipelinedRegionSchedulingStrategy.java
/**
 * Starts scheduling with the source regions, i.e. the pipelined regions that consume
 * no results.
 */
public void startScheduling() {
    // A region whose consumed-results iterator is empty has no upstream input.
    maybeScheduleRegions(
            IterableUtils
                    .toStream(schedulingTopology.getAllPipelinedRegions())
                    .filter(candidate -> !candidate.getConsumedResults().iterator().hasNext())
                    .collect(Collectors.toSet()));
}
/**
 * Attempts to schedule each of the given regions, visiting them in topological order.
 *
 * @param regions the candidate regions
 */
private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
    final List<SchedulingPipelinedRegion> regionsSorted =
            SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regions);
    for (SchedulingPipelinedRegion region : regionsSorted) {
        maybeScheduleRegion(region);
    }
}
/**
 * Schedules a single region if all of its inputs are consumable; otherwise does nothing.
 *
 * @param region the region to schedule
 * @throws IllegalStateException presumably, via {@code checkState}, if some vertex of
 *     the region is not in CREATED state — confirm against the Preconditions helper
 */
private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
    if (!areRegionInputsAllConsumable(region)) {
        return;
    }

    checkState(areRegionVerticesAllInCreatedState(region), "BUG: trying to schedule a region which is not in CREATED state");

    final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
            SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
                    regionVerticesSorted.get(region),
                    id -> deploymentOption);
    schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
DefaultScheduler.java
/**
 * Allocates slots for the given execution vertices and deploys them once the slots
 * are available.
 *
 * <p>The vertices are validated, their versions recorded, they are transitioned to
 * scheduled, slots are requested, and deployment handles are created before waiting
 * for all slots.
 *
 * @param executionVertexDeploymentOptions the vertices to deploy with their options
 */
public void allocateSlotsAndDeploy(final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
    validateDeploymentOptions(executionVertexDeploymentOptions);

    final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex =
            groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);

    final List<ExecutionVertexID> verticesToDeploy = executionVertexDeploymentOptions.stream()
            .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
            .collect(Collectors.toList());

    // Versions recorded here let a later step detect superseded deployments.
    final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
            executionVertexVersioner.recordVertexModifications(verticesToDeploy);

    transitionToScheduled(verticesToDeploy);

    final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
            allocateSlots(executionVertexDeploymentOptions);

    final List<DeploymentHandle> deploymentHandles = createDeploymentHandles(
            requiredVersionByVertex,
            deploymentOptionsByVertex,
            slotExecutionVertexAssignments);

    waitForAllSlotsAndDeploy(deploymentHandles);
}
/**
 * Assigns resources for all handles and, when that completes, runs the deploy-all
 * step; any exception escaping the chain is treated as fatal.
 *
 * @param deploymentHandles handles of the vertices awaiting deployment
 */
private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
    final CompletableFuture<Void> deployment =
            assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles));
    FutureUtils.assertNoException(deployment);
}
private BiFunction<Void, Throwable, Void> deployAll(final List<DeploymentHandle> deploymentHandles){
return (ignored, throwable) -> {
propagateIfNonNull(throwable);
for (final DeploymentHandledeploymentHandle : deploymentHandles) {
finalSlotExecutionVertexAssignment slotExecutionVertexAssignment =deploymentHandle.getSlotExecutionVertexAssignment();
finalCompletableFuture<LogicalSlot> slotAssigned =slotExecutionVertexAssignment.getLogicalSlotFuture();
checkState(slotAssigned.isDone());
FutureUtils.assertNoException(
slotAssigned.handle(deployOrHandleError(deploymentHandle)));
}
return null;
};
}
private BiFunction<Object, Throwable, Void> deployOrHandleError(final DeploymentHandledeploymentHandle) {
final ExecutionVertexVersionrequiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
final ExecutionVertexIDexecutionVertexId = requiredVertexVersion.getExecutionVertexId();
return (ignored, throwable) -> {
if(executionVertexVersioner.isModified(requiredVertexVersion)) {
log.debug("Refusingto deploy execution vertex {} because this deployment was " +
"supersededby another deployment", executionVertexId);
return null;
}
if (throwable == null) {
deployTaskSafe(executionVertexId);
} else {
handleTaskDeploymentFailure(executionVertexId,throwable);
}
return null;
};
}
private void deployTaskSafe(final ExecutionVertexIDexecutionVertexId) {
try {
// 通過執行圖的節點ID獲取執行圖的節點
final ExecutionVertexexecutionVertex = getExecutionVertex(executionVertexId);
// deploy方法用來部署執行圖節點
executionVertexOperations.deploy(executionVertex);
} catch (Throwable e) {
handleTaskDeploymentFailure(executionVertexId,e);
}
}
DefaultExecutionVertexOperations.java
public void deploy(final ExecutionVertexexecutionVertex) throws JobException {
executionVertex.deploy();
}
ExecutionVertex.java
/**
 * Deploys this vertex by delegating to its current execution attempt
 * ({@code currentExecution}).
 *
 * @throws JobException if the current execution's deployment fails
 */
public void deploy() throws JobException {
currentExecution.deploy();
}
Execution.java
/**
 * Deploys this execution: builds the {@code TaskDeploymentDescriptor} for the vertex
 * and submits it to the task manager gateway asynchronously.
 * (Article excerpt: state checks and the continuation of the submit call are elided.)
 *
 * @throws JobException if the deployment cannot be carried out
 */
public void deploy() throws JobException {
    ... ...
    // Article note: this is where the ExecutionGraph is turned into the physical
    // execution graph, e.g. IntermediateResultPartition becomes ResultPartition and
    // ExecutionEdge becomes InputChannelDeploymentDescriptor (which is eventually
    // turned into an InputGate at runtime).
    final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
            .fromExecutionVertex(vertex, attemptNumber)
            .createDeploymentDescriptor(
                    slot.getAllocationId(),
                    slot.getPhysicalSlotNumber(),
                    taskRestore,
                    producedPartitions.values());
    ... ...
    // We run the submission in the future executor so that the serialization of large TDDs does not block
    // the main thread and sync back to the main thread once submission is completed.
    CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
    ... ...
}
RpcTaskManagerGateway.java
/**
 * Forwards the task deployment descriptor to the task executor gateway, adding this
 * gateway's {@code jobMasterId}.
 *
 * @param tdd descriptor of the task to submit
 * @param timeout timeout passed through to the task executor gateway
 * @return the future returned by the task executor gateway
 */
public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
return taskExecutorGateway.submitTask(tdd, jobMasterId, timeout);
}
TaskExecutor.java
/**
 * Task-executor side of task submission: builds a {@code Task} from the deployment
 * descriptor and the executor's services and, if the task was added, starts its thread.
 * (Article excerpt: validation, the code computing {@code taskAdded}, the catch clause
 * and the return statement are elided.)
 *
 * @param tdd descriptor of the task to run
 * @param jobMasterId ID of the submitting job master
 * @param timeout RPC timeout
 */
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd,
JobMasterId jobMasterId,
Time timeout) {
try {
... ...
// Assemble the Task from the descriptor fields and the task executor's services.
Task task = new Task(
jobInformation,
taskInformation,
tdd.getExecutionAttemptId(),
tdd.getAllocationId(),
tdd.getSubtaskIndex(),
tdd.getAttemptNumber(),
tdd.getProducedPartitions(),
tdd.getInputGates(),
tdd.getTargetSlotNumber(),
memoryManager,
taskExecutorServices.getIOManager(),
taskExecutorServices.getShuffleEnvironment(),
taskExecutorServices.getKvStateService(),
taskExecutorServices.getBroadcastVariableManager(),
taskExecutorServices.getTaskEventDispatcher(),
externalResourceInfoProvider,
taskStateManager,
taskManagerActions,
inputSplitProvider,
checkpointResponder,
taskOperatorEventGateway,
aggregateManager,
classLoaderHandle,
fileCache,
taskManagerConfiguration,
taskMetricGroup,
resultPartitionConsumableNotifier,
partitionStateChecker,
getRpcService().getExecutor());
... ...
if (taskAdded) {
// Start the task's executing thread.
task.startTaskThread();
... ...
}
... ...
}
Task.java
/** Starts the thread that executes this task ({@code executingThread}). */
public void startTaskThread() {
executingThread.start();
}
接下來啟動Task執行執行緒,呼叫Task.run()-> doRun()
/**
 * Core of the task thread: loads and instantiates the task's invokable and invokes it.
 * (Article excerpt: setup, state transitions and error handling are elided.)
 */
private void doRun() {
... ...
// now load and instantiate the task's invokable code
invokable =loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(),nameOfInvokableClass, env);
... ...
// run the invokable
invokable.invoke();
... ...
}
這里的invokable即為operator物件實體,通過反射創建,比如StreamTask,
nameOfInvokableClass在生成StreamGraph的時候,就已經確定了,見3.1.2
中的StreamGraph.addOperator方法:
public <IN,OUT> void addOperator(
Integer vertexID,
@Nullable StringslotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
Class<? extendsAbstractInvokable> invokableClass=
operatorFactory.isStreamSource()? SourceStreamTask.class : OneInputStreamTask.class;
addOperator(vertexID, slotSharingGroup,coLocationGroup, operatorFactory, inTypeInfo,
outTypeInfo,operatorName, invokableClass);
}
這里的OneInputStreamTask.class即為生成的StreamNode的vertexClass,這個值會一直傳遞,當StreamGraph被轉化成JobGraph的時候,這個值會被傳遞到JobVertex的invokableClass,然后當JobGraph被轉成ExecutionGraph的時候,這個值被傳入到ExecutionJobVertex.TaskInformation.invokableClassName中,一直傳到Task中,
繼續看invokable.invoke():
StreamTask.java
/**
 * Runs the stream task: preparation, the mailbox loop that does the actual work,
 * post-processing, and finally cleanup.
 * (Article excerpt: error handling around the try block is elided.)
 *
 * @throws Exception if any phase of the task fails
 */
public final void invoke() throws Exception {
try {
// Preparation before the task runs.
beforeInvoke();
... ...
// let the task do its work
// Key step: run the task.
runMailboxLoop();
... ...
// Clean-up work after the task has run.
afterInvoke();
}
... ...
cleanUpInvoke();
}
/**
 * Runs the task's mailbox loop by delegating to {@code mailboxProcessor}.
 *
 * @throws Exception if the mailbox loop fails
 */
public void runMailboxLoop() throws Exception {
mailboxProcessor.runMailboxLoop();
}
MailboxProcessor.java
public void runMailboxLoop() throws Exception {
final TaskMailbox localMailbox =mailbox;
Preconditions.checkState(
localMailbox.isMailboxThread(),
"Method must be executedby declared mailbox thread!");
assert localMailbox.getState() ==TaskMailbox.State.OPEN : "Mailbox must be opened!";
final MailboxControllerdefaultActionContext = new MailboxController(this);
// 郵箱里有郵件,就進行處理,郵件就是類似于map之類的?任務,
while (isMailboxLoopRunning()) {
// The blocking `processMail`call will not return until default action is available.
processMail(localMailbox,false);
if (isMailboxLoopRunning()) {
mailboxDefaultAction.runDefaultAction(defaultActionContext); //lock is acquired inside default action as needed
}
}
}
runDefaultAction()執行默認操作,通過Control+h查找具體實作,為StreamTask.java中第292行
StreamTask.java
/**
 * Constructs the stream task; among other (elided) setup, it creates the
 * {@code MailboxProcessor} with {@code processInput} as its default action.
 *
 * @param environment the task environment
 * @param timerService timer service, may be {@code null}
 * @param uncaughtExceptionHandler handler for uncaught exceptions
 * @param actionExecutor executor used to run mailbox actions
 * @param mailbox the task's mailbox
 * @throws Exception if construction fails
 */
protected StreamTask(
        Environment environment,
        @Nullable TimerService timerService,
        Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
        StreamTaskActionExecutor actionExecutor,
        TaskMailbox mailbox) throws Exception {
    ... ...
    // The first constructor argument of MailboxProcessor is the default action.
    this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
    ... ...
}
MailboxProcessor.java查看構造器
/**
 * Creates a mailbox processor. The loop starts in the running state with no
 * suspended default action.
 *
 * @param mailboxDefaultAction action executed when no mail is pending; must not be null
 * @param mailbox the mailbox to process; must not be null
 * @param actionExecutor executor for actions; must not be null
 */
public MailboxProcessor(
        MailboxDefaultAction mailboxDefaultAction,
        TaskMailbox mailbox,
        StreamTaskActionExecutor actionExecutor) {
    this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
    this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
    this.mailbox = Preconditions.checkNotNull(mailbox);
    this.mailboxLoopRunning = true;
    this.suspendedDefaultAction = null;
}
所以執行的默認操作就是processInput():
StreamTask.java
protected void processInput(MailboxDefaultAction.Controllercontroller) throws Exception {
InputStatus status = inputProcessor.processInput();
if (status ==InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
return;
}
if (status == InputStatus.END_OF_INPUT){
controller.allActionsCompleted();
return;
}
CompletableFuture<?> jointFuture= getInputOutputJointFuture(status);
MailboxDefaultAction.SuspensionsuspendedDefaultAction = controller.suspendDefaultAction();
assertNoException(jointFuture.thenRun(suspendedDefaultAction::resume));
}
StreamOneInputProcessor.java
/**
 * Emits the next element of the input into the output and reports the input status.
 * When the input has ended, the end-of-input hook is notified with the one-based
 * input index.
 *
 * @return the status returned by the input
 * @throws Exception if emitting fails
 */
public InputStatus processInput() throws Exception {
    final InputStatus result = input.emitNext(output);

    if (InputStatus.END_OF_INPUT == result) {
        final int oneBasedInputId = input.getInputIndex() + 1;
        endOfInputAware.endInput(oneBasedInputId);
    }

    return result;
}
StreamTaskNetworkInput.java
public InputStatus emitNext(DataOutput<T> output)throws Exception {
while (true) {
// get the stream element fromthe deserializer
if (currentRecordDeserializer!= null) {
DeserializationResultresult = currentRecordDeserializer.getNextRecord(deserializationDelegate);
if(result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer= null;
}
if(result.isFullRecord()) {
processElement(deserializationDelegate.getInstance(),output);
returnInputStatus.MORE_AVAILABLE;
}
}
Optional<BufferOrEvent>bufferOrEvent = checkpointedInputGate.pollNext();
if (bufferOrEvent.isPresent()){
// return to themailbox after receiving a checkpoint barrier to avoid processing of
// data after thebarrier before checkpoint is performed for unaligned checkpoint mode
if(bufferOrEvent.get().isBuffer()) {
processBuffer(bufferOrEvent.get());
} else {
processEvent(bufferOrEvent.get());
returnInputStatus.MORE_AVAILABLE;
}
} else {
if(checkpointedInputGate.isFinished()) {
checkState(checkpointedInputGate.getAvailableFuture().isDone(),"Finished BarrierHandler should be available");
returnInputStatus.END_OF_INPUT;
}
returnInputStatus.NOTHING_AVAILABLE;
}
}
}
/**
 * Dispatches one stream element by kind: records and latency markers go to
 * {@code output}, watermarks and stream statuses go through the status/watermark valve.
 *
 * @param recordOrMark the element to dispatch
 * @param output receiver of the element
 * @throws UnsupportedOperationException if the element is of an unknown kind
 * @throws Exception if the receiver fails
 */
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
    if (recordOrMark.isRecord()) {
        output.emitRecord(recordOrMark.asRecord());
    } else if (recordOrMark.isWatermark()) {
        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel, output);
    } else if (recordOrMark.isLatencyMarker()) {
        output.emitLatencyMarker(recordOrMark.asLatencyMarker());
    } else if (recordOrMark.isStreamStatus()) {
        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel, output);
    } else {
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }
}
如果是map算子,emitRecord應該在OneInputStreamTask.java呼叫
/**
 * Hands one record to the operator: bumps the input-record counter, sets the record as
 * the operator's key context element, then lets the operator process it.
 *
 * @param record the record to process
 * @throws Exception if the operator fails
 */
public void emitRecord(StreamRecord<IN>record) throws Exception {
numRecordsIn.inc();
operator.setKeyContextElement1(record);
operator.processElement(record);
}
如果是map算子,processElement應該在StreamMap.java呼叫
/**
 * Applies the user's map function to the record's value and forwards the result
 * downstream, reusing the incoming record as the carrier.
 *
 * @param element the incoming record
 * @throws Exception if the user function fails
 */
public void processElement(StreamRecord<IN> element) throws Exception {
    // userFunction.map() is the map method of the user-defined MapFunction.
    final IN inputValue = element.getValue();
    // The mapped value replaces the record's payload and is sent on via the collector.
    output.collect(element.replace(userFunction.map(inputValue)));
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/467016.html
標籤:其他
