Ignorance is nothing to fear; it is pride that ruins you.
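1. Example
When using Flink, have you ever wondered how the execution plan is generated (much as you might ask how the directed acyclic graph behind Spark's RDD topology is built) and how the printed plan should be read? Let's start with an example. Running System.out.println(env.getExecutionPlan()); prints something like the following: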
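{
"nodes" : [ {
"id" : 1, // node ID in the graph, i.e. the ID of the transformation
"type" : "Source: 添加了一個source", // node name shown in the graph (here the source's name, "added a source")
"pact" : "Data Source", // kind of node: a data source
"contents" : "Source: 添加了一個source", // description
"parallelism" : 1 // parallelism
}, {
"id" : 2,
"type" : "Flat Map",
"pact" : "Operator",
"contents" : "Flat Map",
"parallelism" : 8,
"predecessors" : [ { // the upstream node(s) feeding this node
"id" : 1,
"ship_strategy" : "REBALANCE", // how records are shipped from the upstream node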
"side" : "second"
} ]
}, {
"id" : 4,
"type" : "Keyed Aggregation",
"pact" : "Operator",
"contents" : "Keyed Aggregation",
"parallelism" : 8,
"predecessors" : [ {
"id" : 2,
"ship_strategy" : "HASH",
"side" : "second"
} ]
}, {
"id" : 5,
"type" : "Sink: Print to Std. Out",
"pact" : "Data Sink",
"contents" : "Sink: Print to Std. Out",
"parallelism" : 8,
"predecessors" : [ {
"id" : 4,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
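To make the plan easier to read, the "predecessors" entries can be turned into a list of edges. The following is a minimal sketch in plain Java, not part of Flink: the classes PlanNode and Predecessor and the method names are hypothetical, and the sample data is transcribed by hand from the JSON above. In this plan, REBALANCE appears where the parallelism changes from 1 to 8, HASH on the input of the keyed aggregation, and FORWARD where both sides run with the same parallelism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PlanEdgeSketch {
    /** Hypothetical stand-in for one entry of a node's "predecessors" array. */
    static final class Predecessor {
        final int id;
        final String shipStrategy;

        Predecessor(int id, String shipStrategy) {
            this.id = id;
            this.shipStrategy = shipStrategy;
        }
    }

    /** Hypothetical stand-in for one entry of the top-level "nodes" array. */
    static final class PlanNode {
        final int id;
        final List<Predecessor> predecessors;

        PlanNode(int id, Predecessor... predecessors) {
            this.id = id;
            this.predecessors = Arrays.asList(predecessors);
        }
    }

    /** The four nodes of the example plan, transcribed by hand. */
    static List<PlanNode> samplePlan() {
        return Arrays.asList(
                new PlanNode(1),
                new PlanNode(2, new Predecessor(1, "REBALANCE")),
                new PlanNode(4, new Predecessor(2, "HASH")),
                new PlanNode(5, new Predecessor(4, "FORWARD")));
    }

    /** Renders every predecessor entry as "upstream -[strategy]-> downstream". */
    static List<String> describeEdges(List<PlanNode> nodes) {
        List<String> edges = new ArrayList<>();
        for (PlanNode node : nodes) {
            for (Predecessor p : node.predecessors) {
                edges.add(p.id + " -[" + p.shipStrategy + "]-> " + node.id);
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        for (String edge : describeEdges(samplePlan())) {
            System.out.println(edge);
        }
    }
}
```

Read this way, the plan is a simple chain: source (1), flat map (2), keyed aggregation (4), sink (5).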
2. Code analysis
public String getExecutionPlan() {
return getStreamGraph(getJobName(), false).getStreamingPlanAsJSON();
}
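Start with the method above. To produce an execution plan, Flink first has to build the directed acyclic graph (Spark and Hive work in a similar way, and if you are building data lineage for Flink, their approaches are worth borrowing). Once the StreamGraph object has been obtained, a getter returns a JSON string, and that string is the execution plan.
3. Drilling into the code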
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
// This method is a thin wrapper: it generates the graph and wraps the if block that clears the transformations
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
if (clearTransformations) {
this.transformations.clear();
}
return streamGraph;
}
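// This method creates the stream graph generator
private StreamGraphGenerator getStreamGraphGenerator() {
if (transformations.size() <= 0) { // Check that at least one operator is defined; this ties back to how sources, sinks, and transforms are attached (covered in the previous chapter)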
throw new IllegalStateException(
"No operators defined in streaming topology. Cannot execute.");
}
final RuntimeExecutionMode executionMode = configuration.get(ExecutionOptions.RUNTIME_MODE);
// A long list of properties is set here; step into each setter if you want to understand them one by one
return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration())
.setRuntimeExecutionMode(executionMode)
.setStateBackend(defaultStateBackend)
.setChaining(isChainingEnabled)
.setUserArtifacts(cacheFile)
.setTimeCharacteristic(timeCharacteristic)
.setDefaultBufferTimeout(bufferTimeout);
}
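// The key part: the method that actually builds the execution graph
public StreamGraph generate() {
// A new StreamGraph object is created directly here
streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
// Decide whether the job should run in batch mode
shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
// Apply the remaining settings to the graph
configureStreamGraph(streamGraph);
// Create a HashMap of transformations already handled; the recursion below relies on it to stop
alreadyTransformed = new HashMap<>();
// Translate each registered transformation
for (Transformation<?> transformation : transformations) {
transform(transformation); // Pay attention to this call: it is what adds nodes and their properties to the graph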
}
final StreamGraph builtStreamGraph = streamGraph;
alreadyTransformed.clear();
alreadyTransformed = null;
streamGraph = null;
return builtStreamGraph;
}
private Collection<Integer> transform(Transformation<?> transform) {
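// The map created in generate() is used to check whether this transformation has already been processed
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
// Log the transformation that has not been added yet
LOG.debug("Transforming " + transform);
// Read the max parallelism (not the parallelism); enter the if block when it has not been set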
if (transform.getMaxParallelism() <= 0) {
// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from the ExecutionConfig.
int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}
// call at least once to trigger exceptions about MissingTypeInfo
transform.getOutputType(); // this call is only a validation step
@SuppressWarnings("unchecked")
// Look up the translator that does the actual work for this transformation's class
final TransformationTranslator<?, Transformation<?>> translator =
(TransformationTranslator<?, Transformation<?>>)
translatorMap.get(transform.getClass());
// Use the translator if one was found
Collection<Integer> transformedIds;
if (translator != null) {
// Step into this method; it performs the graph translation
transformedIds = translate(translator, transform);
} else {
transformedIds = legacyTransform(transform);
}
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
return transformedIds;
}
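The lookup translatorMap.get(transform.getClass()) followed by a fallback to legacyTransform is a dispatch-by-class pattern. The following is a minimal sketch in plain Java, assuming the map is an ordinary map keyed by Class as the lookup suggests. None of it is Flink code: OneInputLike, CustomOneInput, IterationLike, and the returned labels are hypothetical. One detail it illustrates: because the key is the exact runtime class, a subclass of a registered type is not matched in this toy and falls through to the fallback branch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class TranslatorLookupSketch {
    // Hypothetical transformation types, for illustration only.
    static class OneInputLike {}
    static class CustomOneInput extends OneInputLike {}
    static class IterationLike {}

    // Toy counterpart of translatorMap: exact class -> translator function.
    private static final Map<Class<?>, Function<Object, String>> TRANSLATORS = new HashMap<>();

    static {
        TRANSLATORS.put(OneInputLike.class, t -> "translator");
    }

    /** Returns which branch would handle the given transformation object. */
    static String route(Object transformation) {
        Function<Object, String> translator = TRANSLATORS.get(transformation.getClass());
        // No translator registered for this exact class: take the fallback branch.
        return translator != null ? translator.apply(transformation) : "legacy";
    }

    public static void main(String[] args) {
        System.out.println(route(new OneInputLike()));
        System.out.println(route(new CustomOneInput()));
        System.out.println(route(new IterationLike()));
    }
}
```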
private Collection<Integer> translate(
final TransformationTranslator<?, Transformation<?>> translator,
final Transformation<?> transform) {
checkNotNull(translator);
checkNotNull(transform);
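// Nodes are added recursively: if an input (upstream) transformation has not been translated yet, it is translated first.
// For example, when the first registered transformation is the flat map, its upstream source has not been translated yet.
final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());
// the recursive call might have already transformed this; if so, return the ids registered for it
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
// Determine the name of the slot sharing group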
final String slotSharingGroup =
determineSlotSharingGroup(
transform.getSlotSharingGroup(),
allInputIds.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList()));
final TransformationTranslator.Context context =
new ContextImpl(this, streamGraph, slotSharingGroup, configuration);
return shouldExecuteInBatchMode
? translator.translateForBatch(transform, context)
: translator.translateForStreaming(transform, context);
}
private List<Collection<Integer>> getParentInputIds(
@Nullable final Collection<Transformation<?>> parentTransformations) {
final List<Collection<Integer>> allInputIds = new ArrayList<>();
if (parentTransformations == null) {
return allInputIds;
}
for (Transformation<?> transformation : parentTransformations) {
// The key point of this method: it calls transform() recursively
allInputIds.add(transform(transformation));
}
return allInputIds;
}
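The interplay of transform(), translate(), and getParentInputIds() amounts to a memoized depth-first walk: inputs are translated before the node that consumes them, and the alreadyTransformed map keeps any node from being translated twice. The following is a minimal sketch in plain Java, not Flink code. The Node class and method names are hypothetical, the keyed aggregation is linked directly to the flat map as a simplification, and, following the remark above, the source is assumed to be reachable only through the flat map's inputs rather than being in the registered list itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RecursiveTransformSketch {
    /** Hypothetical, stripped-down stand-in for a transformation. */
    static final class Node {
        final int id;
        final String name;
        final List<Node> inputs;

        Node(int id, String name, Node... inputs) {
            this.id = id;
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    private final Map<Node, Collection<Integer>> alreadyTransformed = new HashMap<>();
    private final List<String> order = new ArrayList<>();

    private Collection<Integer> transform(Node node) {
        // Memoization: a node that was already translated is returned as-is.
        Collection<Integer> known = alreadyTransformed.get(node);
        if (known != null) {
            return known;
        }
        // Translate every upstream input first (the recursive step).
        for (Node input : node.inputs) {
            transform(input);
        }
        // "Translate" the node itself; here we only record the order.
        order.add(node.name);
        Collection<Integer> ids = Collections.singleton(node.id);
        alreadyTransformed.put(node, ids);
        return ids;
    }

    /** Walks the registered nodes in order and returns the translation order. */
    static List<String> translationOrder(List<Node> registered) {
        RecursiveTransformSketch sketch = new RecursiveTransformSketch();
        for (Node node : registered) {
            sketch.transform(node);
        }
        return sketch.order;
    }

    /** Registered list without the source, which is only reachable as an input. */
    static List<Node> sampleRegistered() {
        Node source = new Node(1, "source");
        Node flatMap = new Node(2, "flatMap", source);
        Node keyedAggregation = new Node(4, "keyedAggregation", flatMap);
        Node sink = new Node(5, "sink", keyedAggregation);
        return Arrays.asList(flatMap, keyedAggregation, sink);
    }

    public static void main(String[] args) {
        // prints [source, flatMap, keyedAggregation, sink]
        System.out.println(translationOrder(sampleRegistered()));
    }
}
```

Even though the loop starts at the flat map, the source is translated first, and the flat map is not translated again when the keyed aggregation asks for its inputs.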
// This method is called to add the node to the StreamGraph
protected Collection<Integer> translateInternal(
final Transformation<OUT> transformation,
final StreamOperatorFactory<OUT> operatorFactory,
final TypeInformation<IN> inputType,
@Nullable final KeySelector<IN, ?> stateKeySelector,
@Nullable final TypeInformation<?> stateKeyType,
final Context context) {
checkNotNull(transformation);
checkNotNull(operatorFactory);
checkNotNull(inputType);
checkNotNull(context);
final StreamGraph streamGraph = context.getStreamGraph();
final String slotSharingGroup = context.getSlotSharingGroup();
final int transformationId = transformation.getId();
final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
streamGraph.addOperator(
transformationId,
slotSharingGroup,
transformation.getCoLocationGroupKey(),
operatorFactory,
inputType,
transformation.getOutputType(),
transformation.getName());
if (stateKeySelector != null) {
TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
}
int parallelism =
transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
? transformation.getParallelism()
: executionConfig.getParallelism();
streamGraph.setParallelism(transformationId, parallelism);
streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
final List<Transformation<?>> parentTransformations = transformation.getInputs();
checkState(
parentTransformations.size() == 1,
"Expected exactly one input transformation but found "
+ parentTransformations.size());
for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
streamGraph.addEdge(inputId, transformationId, 0);
}
return Collections.singleton(transformationId);
}
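The parallelism written into each node follows the rule in translateInternal above: an operator's own setting wins, and the job-wide value from the ExecutionConfig is used only when the operator still carries the default marker. The following is a minimal sketch in plain Java, not Flink code: the class and method names are hypothetical, and the constant -1 stands in for ExecutionConfig.PARALLELISM_DEFAULT. The inputs in main are one plausible reading of the example plan (a source fixed at 1, everything else inheriting a job-wide 8), which is an assumption, not something the plan itself states.

```java
final class ParallelismSketch {
    // Stand-in for ExecutionConfig.PARALLELISM_DEFAULT (the "not set" marker).
    static final int PARALLELISM_DEFAULT = -1;

    /** Operator-level parallelism wins; otherwise fall back to the job-wide value. */
    static int resolve(int transformationParallelism, int jobParallelism) {
        return transformationParallelism != PARALLELISM_DEFAULT
                ? transformationParallelism
                : jobParallelism;
    }

    public static void main(String[] args) {
        // Assumed inputs: a source fixed at 1, an operator left at the default.
        System.out.println(resolve(1, 8));                   // prints 1
        System.out.println(resolve(PARALLELISM_DEFAULT, 8)); // prints 8
    }
}
```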
private <IN, OUT> void addOperator(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName,
Class<? extends AbstractInvokable> invokableClass) {
addNode(
vertexID,
slotSharingGroup,
coLocationGroup,
invokableClass,
operatorFactory,
operatorName);
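setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));
// (the rest of the method is not shown in this excerpt)
}
Once the methods above have run, the StreamGraph object is fully populated. getStreamingPlanAsJSON() is then called to produce the final JSON string, which is assembled by the getJSON() method below: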
public String getJSON() {
ObjectNode json = mapper.createObjectNode();
ArrayNode nodes = mapper.createArrayNode();
json.put("nodes", nodes);
List<Integer> operatorIDs = new ArrayList<>(streamGraph.getVertexIDs());
Comparator<Integer> operatorIDComparator =
Comparator.comparingInt(
(Integer id) -> streamGraph.getSinkIDs().contains(id) ? 1 : 0)
.thenComparingInt(id -> id);
operatorIDs.sort(operatorIDComparator);
visit(nodes, operatorIDs, new HashMap<>());
return json.toPrettyString();
}
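The comparator in getJSON() decides the order in which nodes are visited: non-sink vertices come first, sinks last, and ties are broken by ascending ID. The following is a minimal sketch in plain Java that reuses the same comparator expression on made-up data. The class name, method name, and the sample IDs are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

final class SinkLastOrderSketch {
    /** Sorts vertex IDs so that sinks come last, each group in ascending ID order. */
    static List<Integer> order(Collection<Integer> vertexIds, Set<Integer> sinkIds) {
        List<Integer> ids = new ArrayList<>(vertexIds);
        Comparator<Integer> comparator =
                Comparator.comparingInt((Integer id) -> sinkIds.contains(id) ? 1 : 0)
                        .thenComparingInt(id -> id);
        ids.sort(comparator);
        return ids;
    }

    public static void main(String[] args) {
        // Made-up graph in which the sink (3) has a lower ID than another operator (4).
        System.out.println(order(Arrays.asList(4, 3, 1, 2), Collections.singleton(3)));
        // prints [1, 2, 4, 3]
    }
}
```

In the example plan from section 1 the sink already has the highest ID (5), so the output order there is simply 1, 2, 4, 5.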
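That completes the generation of Flink's execution plan. You can paste the JSON into https://flink.apache.org/visualizer/ to visualize the plan and inspect your own pipeline. As a competent big data developer, you should know how to read an execution plan: Spark, Hive, and Flink all expose one, and it tells you whether your SQL or API code is well written.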
