本篇我們將使用Java語言來實作Flink的單詞統計,
代碼開發
環境準備
匯入Flink 1.9 pom依賴
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.7</version> </dependency> </dependencies>
構建Flink流處理環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
自定義source
每秒生成一行文本
DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() { private boolean isCanal = false; private String[] words = { "important oracle jdk license update", "the oracle jdk license has changed for releases starting april 16 2019", "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ", "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ", "downloading and using this product an faq is available here ", "commercial license and support is available with a low cost java se subscription", "oracle also provides the latest openjdk release under the open source gpl license at jdk java net" }; @Override public void run(SourceContext<String> ctx) throws Exception { // 每秒發送一行文本 while (!isCanal) { int randomIndex = RandomUtils.nextInt(0, words.length); ctx.collect(words[randomIndex]); Thread.sleep(1000); } } @Override public void cancel() { isCanal = true; } });
單詞計算
// 3. 單詞統計 // 3.1 將文本行切分成一個個的單詞 SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> { // 切分單詞 Arrays.stream(line.split(" ")).forEach(word -> { ctx.collect(word); }); }).returns(Types.STRING); //3.2 將單詞轉換為一個個的元組 SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS .map(word -> Tuple2.of(word, 1)) .returns(Types.TUPLE(Types.STRING, Types.INT)); // 3.3 按照單詞進行分組 KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0); // 3.4 對每組單詞數量進行累加 SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS .timeWindow(Time.seconds(3)) .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1)); resultDS.print();
參考代碼
public class WordCount { public static void main(String[] args) throws Exception { // 1. 構建Flink流式初始化環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 自定義source - 每秒發送一行文本 DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() { private boolean isCanal = false; private String[] words = { "important oracle jdk license update", "the oracle jdk license has changed for releases starting april 16 2019", "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ", "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ", "downloading and using this product an faq is available here ", "commercial license and support is available with a low cost java se subscription", "oracle also provides the latest openjdk release under the open source gpl license at jdk java net" }; @Override public void run(SourceContext<String> ctx) throws Exception { // 每秒發送一行文本 while (!isCanal) { int randomIndex = RandomUtils.nextInt(0, words.length); ctx.collect(words[randomIndex]); Thread.sleep(1000); } } @Override public void cancel() { isCanal = true; } }); // 3. 單詞統計 // 3.1 將文本行切分成一個個的單詞 SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> { // 切分單詞 Arrays.stream(line.split(" ")).forEach(word -> { ctx.collect(word); }); }).returns(Types.STRING); //3.2 將單詞轉換為一個個的元組 SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS .map(word -> Tuple2.of(word, 1)) .returns(Types.TUPLE(Types.STRING, Types.INT)); // 3.3 按照單詞進行分組 KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0); // 3.4 對每組單詞數量進行累加 SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS .timeWindow(Time.seconds(3)) .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1)); resultDS.print(); env.execute("app"); } }
Flink對Java Lambda運算式支持情況
Flink支持Java API所有運算子使用Lambda運算式,但是,但Lambda運算式使用Java泛型時,就需要宣告型別資訊,
我們來看下上述的這段代碼:
SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> { // 切分單詞 Arrays.stream(line.split(" ")).forEach(word -> { ctx.collect(word); }); }).returns(Types.STRING);
之所以這里將所有的型別資訊,因為Flink無法正確自動推斷出來Collector中帶的泛型,我們來看一下FlatMapFuntion的源代碼
@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {
/**
* The core method of the FlatMapFunction. Takes an element from the input data set and transforms
* it into zero, one, or more elements.
*
* @param value The input value.
* @param out The collector for returning result values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
void flatMap(T value, Collector<O> out) throws Exception;
}
我們發現 flatMap的第二個引數是Collector<O>,是一個帶引數的泛型,Java編譯器編譯該代碼時會進行引數型別擦除,所以Java編譯器會變成成:
void flatMap(T value, Collector out)
這種情況,Flink將無法自動推斷型別資訊,如果我們沒有顯示地提供型別資訊,將會出現以下錯誤:
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
這種情況下,必須要顯示指定型別資訊,否則輸出將回傳值視為Object型別,這將導致Flink無法正確序列化,
所以,我們需要顯示地指定Lambda運算式的引數型別資訊,并通過returns方法顯示指定輸出的型別資訊
我們再看一段代碼:
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS .map(word -> Tuple2.of(word, 1)) .returns(Types.TUPLE(Types.STRING, Types.INT));
為什么map后面也需要指定型別呢?
因為此處map回傳的是Tuple2型別,Tuple2是帶有泛型引數,在編譯的時候同樣會被查出泛型引數資訊,導致Flink無法正確推斷,
更多關于對Java Lambda運算式的支持請參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/32512.html
標籤:大數據
