Flink原始碼閱讀
- 無知不可怕,驕傲才可怕
- 1 WordCount代碼
- 2,Source
- 3,Transformation
- 4,Sink
- 總結
無知不可怕,驕傲才可怕
1 WordCount代碼
package org.apache.flink.streaming.examples.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
public class WordCount {
// *************************************************************************
// 先大致講述一下代碼,必須要有一個思路的轉換,Flink與Spark一致,在沒有執行executor之前,是不會執行的
// 相當于是一個組態檔,最后執行了一下Start;
// *************************************************************************
public static void main(String[] args) throws Exception {
// Checking input parameters 這個工具類是Flink獨有的,不過多講解,非常好用簡單
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
// set up the execution environment 創建全域應用環境背景關系,類似于SparkContext和SpringBoot的Context
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 將決議的引數加入conf
env.getConfig().setGlobalJobParameters(params);
// 添加一個Source,Flink總共有三大件Source,Transform,Sink
DataStream<String> text = null;
if (params.has("input")) {
// union all the inputs from text files
for (String input : params.getMultiParameterRequired("input")) {
if (text == null) {
text = env.readTextFile(input);
} else {
text = text.union(env.readTextFile(input));
}
}
Preconditions.checkNotNull(text, "Input DataStream should not be null.");
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
// get default test text data
text = env.fromElements(WordCountData.WORDS);
}
// 添加一個Transform,很簡單,類似于Spark
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(value -> value.f0)
.sum(1);
// 添加一個Sink
if (params.has("output")) {
counts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
// execute program
env.execute("Streaming WordCount"); //這里這個引數指的是Flink的JOBName,展示在Web頁面的
System.out.println(env.getExecutionPlan()); 列印執行計劃
}
// *************************************************************************
// USER FUNCTIONS Flink支持用戶創建類實作的方式來完成函式定義
// *************************************************************************
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
2,Source
以text = env.fromElements(WordCountData.WORDS)為例;
原始碼注釋里是這樣說的:創建一個有限的資料流,資料必須是同一型別,例如全部是Integer或全部是String型別,Flink會嘗試從資料本身推斷型別(實際上是根據第一個元素推斷)。
/**
 * Creates a bounded data stream from the given elements.
 *
 * <p>All elements must be of the same type. The element type is extracted from the
 * first element, and the call is then delegated to
 * {@code fromCollection(Collection, TypeInformation)}.
 *
 * @param data the elements of the stream; at least one element is required and the
 *     first element must not be null, since it drives type extraction
 * @param <OUT> the element type
 * @return a source stream emitting the given elements
 * @throws IllegalArgumentException if {@code data} is empty or its first element is null
 * @throws RuntimeException if no TypeInformation can be derived from the first element
 */
@SafeVarargs
public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
    if (data.length == 0) {
        throw new IllegalArgumentException(
                "fromElements needs at least one element as argument");
    }

    // The first element drives type extraction. Reject null up front: otherwise type
    // extraction fails, and the error handler below throws a NullPointerException on
    // data[0].getClass(), hiding the real problem.
    final OUT first = data[0];
    if (first == null) {
        throw new IllegalArgumentException(
                "fromElements does not accept null elements (the first element is null)");
    }

    TypeInformation<OUT> typeInfo;
    try {
        typeInfo = TypeExtractor.getForObject(first);
    } catch (Exception e) {
        // Point users to the overload that actually accepts an explicit TypeInformation.
        throw new RuntimeException(
                "Could not create TypeInformation for type "
                        + first.getClass().getName()
                        + "; please specify the TypeInformation manually via "
                        + "StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)",
                e);
    }

    return fromCollection(Arrays.asList(data), typeInfo);
}
該方法的作用是:從給定的非空集合創建資料流。
/**
 * Creates a bounded data stream from a non-empty collection.
 *
 * <p>The collection is validated (no null elements, all elements of the declared
 * type class), wrapped in a FromElementsFunction, and registered as a BOUNDED
 * source that always runs with parallelism 1.
 *
 * @param data the elements to emit; must not be null
 * @param typeInfo type information of the elements
 * @param <OUT> the element type
 * @return a source stream emitting the collection's elements
 */
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) {
    Preconditions.checkNotNull(data, "Collection must not be null");

    // Every element must be non-null and of the declared type class.
    FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());

    final SourceFunction<OUT> elementsSource;
    try {
        // The function is given a serializer built from this environment's config.
        elementsSource = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
    } catch (IOException ioe) {
        throw new RuntimeException(ioe.getMessage(), ioe);
    }

    // Register as a BOUNDED source, then pin its parallelism to 1.
    final DataStreamSource<OUT> source =
            addSource(elementsSource, "添加了一個source", typeInfo, Boundedness.BOUNDED);
    return source.setParallelism(1);
}
/**
 * Wraps a SourceFunction in a StreamSource operator and returns it as a DataStreamSource.
 *
 * @param function the user source function; must not be null
 * @param sourceName name of the source; must not be null
 * @param typeInfo explicit output type, or null to let getTypeInfo resolve it
 * @param boundedness boundedness of the source; must not be null
 * @param <OUT> the element type
 * @return the new source stream
 */
private <OUT> DataStreamSource<OUT> addSource(
        final SourceFunction<OUT> function,
        final String sourceName,
        @Nullable final TypeInformation<OUT> typeInfo,
        final Boundedness boundedness) {
    checkNotNull(function);
    checkNotNull(sourceName);
    checkNotNull(boundedness);

    // Resolve the output type from the function and the (possibly null) explicit type.
    final TypeInformation<OUT> outputType =
            getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);

    // Only ParallelSourceFunction implementations keep a parallelism above 1;
    // DataStreamSource forces all other sources to parallelism 1.
    final boolean parallelSource = function instanceof ParallelSourceFunction;

    // Clean the function's closure (invoked for its side effect; the result is unused).
    clean(function);

    final StreamSource<OUT, ?> operator = new StreamSource<>(function);
    return new DataStreamSource<>(
            this, outputType, operator, parallelSource, sourceName, boundedness);
}
/**
 * Creates the DataStreamSource returned by addSource; this is where the Source stage ends.
 *
 * <p>The source operator is wrapped in a LegacySourceTransformation that initially uses
 * the environment's default parallelism. That transformation is passed, together with
 * the environment, to the parent constructor. It is the transformation that later
 * operators build on: doTransform passes this.transformation as the input of the next
 * operator.
 *
 * @param environment the execution environment the stream belongs to
 * @param outTypeInfo type information of the produced elements
 * @param operator the StreamSource operator wrapping the user SourceFunction
 * @param isParallel whether the source function is a ParallelSourceFunction; if false,
 *     the parallelism is fixed to 1
 * @param sourceName name of the source
 * @param boundedness boundedness of the source (e.g. Boundedness.BOUNDED for collection sources)
 */
public DataStreamSource(
        StreamExecutionEnvironment environment,
        TypeInformation<T> outTypeInfo,
        StreamSource<T, ?> operator,
        boolean isParallel,
        String sourceName,
        Boundedness boundedness) {
    super(
            environment,
            new LegacySourceTransformation<>(
                    sourceName,
                    operator,
                    outTypeInfo,
                    environment.getParallelism(),
                    boundedness));

    this.isParallel = isParallel;
    // A non-parallel source is pinned to parallelism 1.
    // NOTE(review): keep this after the isParallel assignment; setParallelism may
    // validate against isParallel. Confirm before reordering.
    if (!isParallel) {
        setParallelism(1);
    }
}
3,Transformation
/**
 * Applies a FlatMapFunction to every element of this stream.
 *
 * <p>The output type is inferred from the cleaned user function and this stream's
 * type. The call is then delegated to the overload taking an explicit output type.
 *
 * @param flatMapper the user function
 * @param <R> the output element type
 * @return the transformed stream
 */
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    final FlatMapFunction<T, R> cleanedMapper = clean(flatMapper);
    final TypeInformation<R> resultType =
            TypeExtractor.getFlatMapReturnTypes(
                    cleanedMapper, getType(), Utils.getCallLocationName(), true);
    return flatMap(flatMapper, resultType);
}
/**
 * Applies a FlatMapFunction with an explicitly given output type.
 *
 * <p>The cleaned user function is wrapped in a StreamFlatMap operator and handed to
 * transform under the operator name "Flat Map".
 *
 * @param flatMapper the user function
 * @param outputType type information of the output elements
 * @param <R> the output element type
 * @return the transformed stream
 */
public <R> SingleOutputStreamOperator<R> flatMap(
        FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
    final String name = "Flat Map";
    final StreamFlatMap<T, R> operator = new StreamFlatMap<>(clean(flatMapper));
    return transform(name, outputType, operator);
}
/**
 * Applies a one-input stream operator to this stream.
 *
 * <p>This is the link between flatMap(flatMapper, outputType) and doTransform.
 * flatMap wraps the user function in a StreamFlatMap operator and calls
 * transform("Flat Map", ...). doTransform expects an operator factory, so the
 * operator is adapted with SimpleOperatorFactory.of.
 *
 * @param operatorName name of the operator, e.g. "Flat Map"
 * @param outTypeInfo type information of the operator's output
 * @param operator the operator to apply to this stream
 * @param <R> the output element type
 * @return the transformed stream
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {
    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
/**
 * Creates the OneInputTransformation for a single-input operator and returns the
 * resulting stream.
 *
 * <p>This is the last step of the flatMap call chain. flatMap calls transform(...);
 * presumably transform then calls this method after wrapping the operator in a
 * factory (the article skips that intermediate method).
 *
 * @param operatorName name of the new operator, e.g. "Flat Map"
 * @param outTypeInfo type information of the operator's output
 * @param operatorFactory factory that creates the stream operator
 * @return a new stream backed by the created OneInputTransformation
 */
protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {
    // read the output type of the input Transform to coax out errors about MissingTypeInfo.
    // The result is intentionally discarded; the call is made only for that check.
    transformation.getOutputType();

    // Build a one-input transformation whose input is this stream's transformation,
    // using the environment's default parallelism.
    OneInputTransformation<T, R> resultTransform =
            new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    operatorFactory,
                    outTypeInfo,
                    environment.getParallelism());

    // Wrap the new transformation in the stream handed back to the caller
    // (raw constructor call, hence the suppressed warnings).
    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);

    // Register the new transformation with the execution environment.
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}
這里有一點需要注意:并不是所有的transformation都會通過addOperator被加入執行環境,比如keyBy算子,也就是分組算子,我們看下效果:
/**
 * Creates a KeyedStream, the result of keyBy.
 *
 * <p>The partitioning step is represented by the given PartitionTransformation, which
 * is handed to the parent constructor. Unlike doTransform or addSink, this constructor
 * does not call getExecutionEnvironment().addOperator(...), so the keyBy step is not
 * registered with the environment here.
 * NOTE(review): presumably it is reached later as the input of downstream
 * transformations (reduce() passes `transformation` as the input of its
 * ReduceTransformation). Confirm.
 *
 * @param stream the upstream stream; only its execution environment is used here
 * @param partitionTransformation the transformation describing the key partitioning
 * @param keySelector extracts the key from each element; cleaned before being stored
 * @param keyType type information of the key; checked via validateKeyType
 */
KeyedStream(
        DataStream<T> stream,
        PartitionTransformation<T> partitionTransformation,
        KeySelector<T, KEY> keySelector,
        TypeInformation<KEY> keyType) {
    super(stream.getExecutionEnvironment(), partitionTransformation);
    this.keySelector = clean(keySelector);
    this.keyType = validateKeyType(keyType);
}
/**
 * Applies a rolling reduce per key.
 *
 * <p>The ReduceTransformation takes this keyed stream's transformation (the keyBy
 * partitioning) as its input and carries the key selector and key type along. Only
 * the reduce transformation is registered with the environment.
 *
 * @param reducer the user reduce function; cleaned before use
 * @return the reduced stream
 */
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
    final int parallelism = environment.getParallelism();
    final ReduceTransformation<T, KEY> keyedReduce =
            new ReduceTransformation<>(
                    "Keyed Reduce",
                    parallelism,
                    transformation,
                    clean(reducer),
                    keySelector,
                    getKeyType());

    final StreamExecutionEnvironment env = getExecutionEnvironment();
    env.addOperator(keyedReduce);
    return new SingleOutputStreamOperator<>(getExecutionEnvironment(), keyedReduce);
}
4,Sink
/**
 * Prints every element of this stream to standard output.
 *
 * <p>This mirrors the source side: the behavior lives in a function object
 * (PrintSinkFunction), which is passed to addSink, and the resulting sink is named.
 *
 * @return the created sink
 */
@PublicEvolving
public DataStreamSink<T> print() {
    final DataStreamSink<T> stdoutSink = addSink(new PrintSinkFunction<>());
    return stdoutSink.name("Print to Std. Out");
}
/**
 * Adds a sink that consumes this stream.
 *
 * <p>This mirrors the source side: the SinkFunction is wrapped in a StreamSink
 * operator, and the resulting DataStreamSink's transformation is registered with the
 * execution environment.
 *
 * @param sinkFunction the user sink function; cleaned before being wrapped
 * @return the created DataStreamSink
 */
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    // configure the type if needed: a sink function implementing InputTypeConfigurable
    // receives this stream's type and the execution config.
    if (sinkFunction instanceof InputTypeConfigurable) {
        ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
    }

    // Wrap the cleaned sink function in a StreamSink operator.
    StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

    // Create the DataStreamSink connecting this stream to the sink operator.
    DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

    // Register the sink's transformation with the execution environment.
    // NOTE(review): presumably needed because no later transformation takes a sink as
    // its input, so it would not be reachable otherwise. Confirm.
    getExecutionEnvironment().addOperator(sink.getTransformation());
    return sink;
}
總結
說實話,前段時間過得特別黑暗,渾渾噩噩,最近終于能靜下心來好好閱讀原始碼、好好學習。以上流程就是Flink的
Source
Transformation
Sink
鏈路生成流程。可以看到,這些步驟并沒有真正執行,資料根本不流動,所以可以把它們看作插件,相當于組態檔,Spark也是如此。學會Flink之后一定要靜下心來總結,開始探索底層。說實話,我當初對Spark的底層只探索了百分之十左右就放棄了,到現在連Scala怎么寫都徹底忘了,這告誡我一定要每日學習、復習。技術猶如逆水行舟,不進則退,所以一定要努力學習。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/282891.html
標籤:其他
上一篇:一張圖搞懂華為介面型別!
下一篇:Hive入門(二)
