摘要:在深入了解 Flink 實時資料處理程式的開發之前,先通過一個簡單示例來了解使用 Flink 的 DataStream API 構建有狀態流應用程式的程序,
本文分享自華為云社區《Flink 實體:Flink 流處理程式編程模型》,作者:TiAmoZhang ,
在深入了解 Flink 實時資料處理程式的開發之前,先通過一個簡單示例來了解使用 Flink 的 DataStream API 構建有狀態流應用程式的程序,
01、流資料型別
Flink 以一種獨特的方式處理資料型別和序列化,它包含自己的型別描述符、泛型型別提取和型別序列化框架,基于 Java 和 Scala 語言,Flink 實作了一套自己的一套型別系統,它支持很多種類的型別,包括
- 基本型別,
- 陣列型別,
- 復合型別,
- 輔助型別,
- 通用型別,
詳細的 Flink 型別系統如圖 1 所示,
■ 圖 1 Flink 型別系統
Flink 針對 Java 和 Scala 的 DataStream API 要求流資料的內容必須是可序列化的,Flink 內置了以下型別資料的序列化器:
- 基本資料型別:String、Long、Integer、Boolean、Array,
- 復合資料型別:Tuple、POJO、Scala case class,
對于其他型別,Flink 會回傳 Kryo,也可以在 Flink 中使用其他序列化器,Avro 尤其得到了很好的支持,
1.java DataStream API 使用的流資料型別
對于 Java API,Flink 定義了自己的 Tuple1 到 Tuple25 型別來表示元組型別,代碼如下:
Tuple2<String, Integer> person = new Tuple2<>("王老五", 35); //索引基于0 String name = person.f0; Integer age = person.f1;
在 Java 中,POJO(plain old Java Object)是這樣的 Java 類:
- 有一個無參的默認構造器,
- 所有的欄位要么是 public 的,要么有一個默認的 getter 和 setter,
例如,定義一個名為 Person 的 POJO 類,代碼如下:
//定義一個Person POJO類public class Person{ public String name; public Integer age; public Person() {}; public Person(String name, Integer age) { this.name = name; this.age = age; };} //創建一個實體Person person = new Person("王老五", 35);
2.Scala DataStream API 使用的流資料型別
對于元組,使用 Scala 自己的 Tuple 型別就好,代碼如下:
val person = ("王老五", 35) //索引基于1val name = person._1val age = person._2
對于物件型別,使用 case class(相當于 Java 中的 JavaBean),代碼如下:
case class Person(name: String, age:Int) val person = Person("王老五", 35)
3.Flink 型別系統
對于創建的任意一個 POJO 型別,看起來它是一個普通的 Java Bean,在 Java 中,可以使用 Class 來描述該型別,但其實在 Flink 引擎中,它被描述為 PojoTypeInfo,而 PojoTypeInfo 是 TypeInformation 的子類,
TypeInformation 是 Flink 型別系統的核心類,Flink 使用 TypeInformation 來描述所有 Flink 支持的資料型別,就像 Java 中的 Class 型別一樣,每種 Flink 支持的資料型別都對應的是 TypeInformation 的子類,例如 POJO 型別對應的是 PojoTypeInfo、基礎資料型別陣列對應的是 BasicArrayTypeInfo、Map 型別對應的是 MapTypeInfo、值型別對應的是 ValueTypeInfo,
除了對型別的描述,TypeInformation 還提供了序列化的支持,在 TypeInformation 中有一種方法:createSerializer 方法,它用來創建序列化器,序列化器中定義了一系列的方法,其中,通過 serialize 和 deserialize 方法,可以將指定型別進行序列化,并且 Flink 的這些序列化器會以稠密的方式來將物件寫入記憶體中,Flink 中也提供了非常豐富的序列化器,在我們基于 Flink 型別系統支持的資料型別進行編程時,Flink 在運行時會推斷出資料型別的資訊,我們在基于 Flink 編程時,幾乎是不需要關心型別和序列化的,
4.型別與 Lambda 運算式支持
在編譯時,編譯器能夠從 Java 源代碼中讀取完整的型別資訊,并強制執行型別的約束,但生成 class 位元組碼時,會將引數化型別資訊洗掉,這就是型別擦除,型別擦除可以確保不會為泛型創建新的 Java 類,泛型是不會產生額外的開銷的,也就是說,泛型只是在編譯器編譯時能夠理解該型別,但編譯后執行時,泛型是會被擦除掉的,
為了全球說明,請看下面的代碼:
public static <T> boolean hasItems(T [] items, T item){ for (T i : items){ if(i.equals(item)){ return true; } } return false;}
以上是一段 Java 的泛型方法,但在編譯后,編譯器會將未系結型別的 T 擦除掉,替換為 Object,也就是編譯之后的代碼如下:
public static Object boolean hasItems(Object [] items, Object item){ for (Object i : items){ if(i.equals(item)){ return true; } } return false;}
泛型只是能夠防止在運行時出現型別錯誤,但運行時會出現以下例外,而且 Flink 以非常友好的方式提示:
could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
就是因為 Java 編譯器型別擦除的原因,所以 Flink 根本無法推斷出來算子(例如 flatMap)要輸出的型別是什么,所以在 Flink 中使用 Lambda 運算式時,為了防止因型別擦除而出現運行時錯誤,需要指定 TypeInformation 或者 TypeHint,
創建 TypeInformation,代碼如下:
.returns(TypeInformation.of(String.class))
創建 TypeHint,代碼如下:
.returns(new TypeHint<String>() {})
02、流應用程式實作
Flink 程式的基本構建塊是 stream 和 transformation(流和轉換),從概念上講,stream 是資料記錄的流(可能永遠不會結束),transformation 是一個運算,它接受一個或多個流作為輸入,經過處理/計算后生成一個或多個輸出流,
下面實作一個完整的、可作業的 Flink 流應用程式示例,
【示例 1】將有關人員的記錄流作為輸入,并從中篩選出未成年人資訊,
Scala 代碼如下:
(1) 在 IntelliJ IDEA 中創建一個 Flink 專案,使用 flink-quickstart-scala 專案模板
(2) 設定依賴,在 pom.xml 檔案中添加如下依賴內容:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.12</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency>
(3) 創建主程式 StreamingJobDemo1,編輯流處理代碼如下:
import org.apache.flink.streaming.api.scala._ object StreamingJobDemo1 {//定義事件類 case class Person(name:String, age:Integer) def main(args: Array[String]) { //設定流執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment //讀取資料源,構造資料流 val peoples = env.fromElements( Person("張三", 21), Person("李四", 16), Person("王老五", 35) ) //對資料流執行filter轉換 val adults = peoples.filter(_.age>18) //輸出結果 adults.print //執行 env.execute("Flink Streaming Job") }}
執行以上代碼,輸出結果如下:
7> Person(張三,21)1> Person(王老五,35)
Java 代碼如下:
(1) 在 IntelliJ IDEA 中創建一個 Flink 專案,使用 flink-quickstart-Java 專案模板
(2) 設定依賴,在 pom.xml 檔案中添加如下依賴內容: <dependency><groupId>org.apache.flink</groupId> <artifactId>flink-Java</artifactId> <version>1.13.2</version> <scope>provided</scope></dependency>dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-Java_2.12</artifactId> <version>1.13.2</version> <scope>provided</scope></dependency> (3) 創建一個 POJO 類,用來表示流中的資料,代碼如下: //POJO類,表示人員資訊物體public class Person { public String name; //存盤姓名 public Integer age; //存盤年齡 //空構造器 public Person() {}; //構造器,初始化屬性 public Person(String name, Integer age) { this.name = name; this.age = age; }; //用于除錯時輸出資訊 public String toString() { return this.name.toString() + ": age " + this.age.toString(); };} (4) 打開專案中的 StreamingJob 物件檔案,編輯流處理代碼如下: import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.api.common.functions.FilterFunction; public class StreamingJobDemo1 { public static void main(String[] args) throws Exception { //獲得流執行環境 final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(); //讀取資料源,構造DataStream DataStream<Person> personDS = env.fromElements( new Person("張三", 21), new Person("李四", 16), new Person("王老五", 35) ); //執行轉換運算(這里是過濾年齡不小于18歲的人)//注意,這里使用了匿名函式 DataStream<Person> adults = personDS.filter(new FilterFunction<Person>() { @Override public boolean filter(Person person) throws Exception { return person.age >= 18; } }); //將結果輸出到控制臺 adults.print(); //觸發流程式開始執行 env.execute("stream demo"); }}
(5) 執行以上程式,輸出結果如下,
張三: age 21王老五: age 35
注意
Flink 將批處理程式作為流程式的一種特殊情況執行,其中流是有界的(有限數量的元素),DataSet 在內部被視為資料流,因此,上述概念同樣適用于批處理程式,也適用于流程式,只有少數例外:
- 批處理程式的容錯不使用檢查點,錯誤恢復是通過完全重放流實作的,這使恢復的成本更高,但是因為它避免了檢查點,所以使常規處理更輕量,
- DataSet API 中的有狀態運算使用簡化的 in-memory/out-of-核資料結構,而不是 key-value 索引,
- DataSet API 引入了特殊的同步(基于 superstep)迭代,這只可能在有界流上實作,
03、流應用程式剖析
所有的 Flink 應用程式都以特定的步驟來作業,這些作業步驟如圖 2 所示,
■ 圖 2 Flink 應用程式作業步驟
也就是說,每個 Flink 程式都由相同的基本部分組成:
- 獲取一個執行環境,
- 加載/創建初始資料,
- 指定對該資料的轉換,
- 指定計算結果放在哪里,
- 觸發程式執行,
1.獲取一個執行環境
Flink 應用程式從其 main()方法中生成一個或多個 Flink 作業(job),這些作業可以在本地 JVM(LocalEnvironment)中執行,也可以在具有多臺機器的集群的遠程設定中執行(RemoteEnvironment),對于每個程式,ExecutionEnvironment 提供了控制作業執行(例如設定并行性或容錯/檢查點引數)和與外部環境互動(資料訪問)的方法,
每個 Flink 應用程式都需要一個執行環境(本例中為 env),流應用程式需要的執行環境使用的是 StreamExecutionEnvironment,為了開始撰寫 Flink 程式,用戶首先需要獲得一個現有的執行環境,如果沒有,就需要先創建一個,根據目的不同,Flink 支持以下幾種方式:
- 獲得一個已經存在的 Flink 環境,
- 創建本地環境,
- 創建遠程環境,
Flink 流程式的入口點是 StreamExecutionEnvironment 類的一個實體,它定義了程式執行的背景關系,StreamExecutionEnvironment 是所有 Flink 程式的基礎,可以通過一些靜態方法獲得一個 StreamExecutionEnvironment 的實體,代碼如下:
StreamExecutionEnvironment.getExecutionEnvironment()StreamExecutionEnvironment.createLocalEnvironment()StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles)
要獲得執行環境,通常只需呼叫 getExecutionEnvironment()方法,這將根據背景關系選擇正確的執行環境,如果正在 IDE 中的本地環境上執行,則它將啟動一個本地執行環境,如果是從程式中創建了一個 JAR 檔案,并通過命令列呼叫它,則 Flink 集群管理器將執行 main()方法,getExecutionEnvironment()將回傳用于在集群上以分布式方式執行程式的執行環境,
在上面的示例程式中,使用以下陳述句來獲得流程式的執行環境,
Scala 代碼如下:
//設定流執行環境val env = StreamExecutionEnvironment.getExecutionEnvironment
Java 代碼如下:
//獲得流執行環境final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment 包含 ExecutionConfig,可使用它為運行時設定特定于作業的配置值,例如,如果要設定自動水印發送間隔,可以像下面這樣在代碼進行配置,
Scala 代碼如下:
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.getConfig.setAutoWatermarkInterval(long milliseconds)
Java 代碼如下:
final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setAutoWatermarkInterval(long milliseconds);
2.加載/創建初始資料
執行環境可以從多種資料源讀取資料,包括文本檔案、CSV 檔案、Socket 套接字資料等,也可以使用自定義的資料輸入格式,例如,要將文本檔案讀取為行序列,代碼如下:
final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.readTextFile("file://path/to/file");
資料被逐行讀取記憶體后,Flink 會將它們組織到 DataStream 中,這是 Flink 中用來表示流資料的特殊類,
在示例程式【示例 1】中,使用 fromElements()方法讀取集合資料,并將讀取的資料存盤為 DataStream 型別,
Scala 代碼如下:
//讀取資料源,構造資料流val personDS = env.fromElements( Person("張三", 21), Person("李四", 16), Person("王老五", 35) )
Java 代碼如下:
//讀取資料源,構造DataStreamDataStream<Person> personDS = env.fromElements( new Person("張三", 21), new Person("李四", 16), new Person("王老五", 35));
3.對資料進行轉換
每個 Flink 程式都對分布式資料集合執行轉換,Flink 的 DataStream API 提供了多種資料轉換功能,包括過濾、映射、連接、分組和聚合,例如,下面是一個 map 轉換應用,通過將原始集合中的每個字串轉換為整數來創建一個新的 DataStream,代碼如下:
在示例程式【示例 1】中使用了 filter 過濾轉換,將原始資料集轉換為只包含成年人資訊的新 DataStream 流,代碼如下:
DataStream<String> input = env.fromElements("12","3","25","5","32","6"); DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return Integer.parseInt(value); }});
Scala 代碼如下:
//對資料流執行filter轉換val adults = personDS.filter(_.age>18)
Java 代碼如下:
//對資料流執行filter轉換DataStream<Person> adults = flintstones.filter( new FilterFunction<Person>() { @Override public boolean filter(Person person) throws Exception { return person.age >= 18; } });
這里不必了解每個轉換的具體含義,后面我們會詳細介紹它們,需要強調的是,Flink 中的轉換是惰性的,在呼叫 sink 操作之前不會真正執行,
4.指定計算結果放在哪里
一旦有了包含最終結果的 DataStream,就可以通過創建接收器(sink)將其寫入外部系統,例如,將計算結果列印輸出到螢屏上,
Scala 代碼如下:
//輸出結果adults.print
Java 代碼如下:
//輸出結果adults.print();
Flink 中的接收器(sink)操作觸發流的執行,以生成程式所需的結果,例如將結果保存到檔案系統或將其列印到標準輸出,上面的示例使用 adults.print()將結果列印到任務管理器日志中(在 IDE 中運行時,任務管理器日志將顯示在 IDE 的控制臺中),這將對流的每個元素呼叫其 toString()方法,
5.觸發流程式執行
一旦寫好了程式處理邏輯,就需要通過呼叫 StreamExecutionEnvironment 上的 execute()來觸發程式執行,所有的 Flink 程式都是延遲執行的:當程式的主方法執行時,資料加載和轉換不會直接發生,而是創建每個運算并添加到程式的執行計劃中,當執行環境上的 execute()呼叫顯式觸發執行時,這些操作才實際上被執行,程式是在本地執行還是提交到集群中執行取決于 ExecutionEnvironment 的型別,
延遲計算可以讓用戶構建復雜的程式,然后 Flink 將其作為一個整體計劃的單元執行,在示例程式【示例 1】中,使用如下代碼來觸發流處理程式的執行,
Scala 代碼如下:
//觸發流程式執行env.execute("Flink Streaming Job") //引數是程式名稱,會顯示在Web UI界面上
Java 代碼如下:
//觸發流程式執行env.execute("Flink Streaming Job"); //引數是程式名稱,會顯示在Web UI界面上
在應用程式中執行的 DataStream API 呼叫將構建一個附加到 StreamExecutionEnvironment 的作業圖(Job Graph),呼叫 env.execute()時,此圖被打包并發送到 Flink Master,該 Master 并行化作業并將其片段分發給 TaskManagers 以供執行,作業的每個并行片段將在一個 task slot(任務槽)中執行,如圖 3 所示,
■圖 3 Flink 流應用程式執行原理
這個分布式運行時要求 Flink 應用程式是可序列化的,它還要求集群中的每個節點都可以使用所有依賴項,
StreamExecutionEnvironment 上的 execute()方法將等待作業完成,然后回傳一個 JobExecutionResult,其中包含執行時間和累加器結果,注意,如果不呼叫 execute(),應用程式將不會運行,
如果不想等待作業完成,可以通過呼叫 StreamExecutionEnvironment 上的 executeAysnc()來觸發異步作業執行,它將回傳一個 JobClient,可以使用它與剛才提交的作業進行通信,例如,下面的示例代碼演示了如何通過 executeAsync()實作 execute()的語意,
Scala 代碼如下:
val jobClient = evn.executeAsyncval jobExecutionResult =jobClient.getJobExecutionResult(userClassloader).get
Java 代碼如下:
final JobClient jobClient = env.executeAsync();final JobExecutionResult jobExecutionResult =jobClient.getJobExecutionResult(
點擊關注,第一時間了解華為云新鮮技術~
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/554793.html
標籤:其他
上一篇:差分陣列詳解
下一篇:返回列表
