Apache Flink是一個在無界和有界資料流上進行有狀態計算的框架,Flink提供了不同抽象級別的多個API,并為常見用例提供了專用庫,
在這里,我們介紹Flink易于使用且富有表現力的API和庫,
流媒體應用的構建塊
流處理框架可以構建和執行的應用程式型別取決于該框架對流、狀態和時間的控制程度,在下面,我們將描述流處理應用程式的這些構建塊,并解釋Flink處理它們的方法,
Streams
顯然,流是流處理的一個基本方面,然而,流可以具有不同的特性,這些特性會影響流的處理方式,Flink是一個通用的處理框架,可以處理任何型別的流,
- 有界和無界流:流可以是無界或有界的,即固定大小的資料集,Flink具有處理無限流的復雜功能,但也有專門的操作員來高效處理有界流,
- 實時和記錄流:所有資料都以流的形式生成,有兩種方法來處理資料,在生成流時實時處理它,或將流持久化到存盤系統(例如,檔案系統或物件存盤),然后進行處理,Flink應用程式可以處理記錄或實時流,
state
每個非平凡的流應用程式都是有狀態的,也就是說,只有對單個事件應用轉換的應用程式不需要狀態,任何運行基本業務邏輯的應用程式都需要記住事件或中間結果,以便在稍后的時間點訪問它們,例如,當接收到下一個事件時或在特定的持續時間之后,
應用狀態是Flink中最重要的一個特征,通過查看Flink在狀態處理背景關系中提供的所有特性,可以看出這一點,
- Flink為不同的資料結構(如原子值、串列或映射)提供狀態原語,開發人員可以根據函式的訪問模式選擇最有效的狀態原語,
- 可插拔狀態后端:應用程式狀態由可插拔狀態后端管理和檢查,Flink具有不同的狀態后端,將狀態存盤在記憶體或RocksDB中,RocksDB是一種高效的嵌入式磁盤資料存盤,自定義狀態后端也可以插入,
- 精確一次狀態一致性:Flink的檢查點和恢復演算法保證了在出現故障時應用程式狀態的一致性,因此,故障的處理是透明的,不會影回應用程式的正確性,
- 超大狀態:由于其異步和增量檢查點演算法,Flink能夠保持數TB大小的應用程式狀態,
- 可擴展應用程式:Flink通過將狀態重新分配給更多或更少的作業人員,支持有狀態應用程式的擴展,
時間
時間是流媒體應用程式的另一個重要組成部分,大多數事件流都有內在的時間語意,因為每個事件都是在特定的時間點生成的,此外,許多常見的流計算都是基于時間的,例如windows聚合、會話、模式檢測和基于時間的連接,流處理的一個重要方面是應用程式如何測量時間,即事件時間和處理時間的差異,
Flink提供了一系列豐富的與時間相關的功能,
事件時間模式:使用事件時間語意處理流的應用程式根據事件的時間戳計算結果,因此,無論是處理記錄的還是實時的事件,事件時間處理都允許獲得準確且一致的結果,
水印支持:Flink在事件時間應用程式中使用水印來推理時間,水印也是一種靈活的機制,可以權衡結果的延遲和完整性,
延遲資料處理:在使用水印以事件時間模式處理流時,可能會發生在所有相關事件到達之前計算已經完成的情況,這種事件稱為遲發事件,Flink提供了多個選項來處理延遲事件,例如通過側輸出重新路由事件,以及更新之前完成的結果,
處理時間模式:除了事件時間模式外,Flink還支持處理時間語意,該語意執行由處理器的掛鐘時間觸發的計算,處理時間模式可以適用于某些具有嚴格低延遲要求的應用程式,這些應用程式可以容忍近似的結果,
分層 API
Flink 根據抽象程度分層,提供了三種不同的 API,每一種 API 在簡潔性和表達力上有著不同的側重,并且針對不同的應用場景,
下文中,我們將簡要描述每一種 API 及其應用,并提供相關的代碼示例,
/**
* 將相鄰的 keyed START 和 END 事件相匹配并計算兩者的時間間隔 * 輸入資料為 Tuple2<String, String> 型別,第一個欄位為 key 值, * 第二個欄位標記 START 和 END 事件, */public static class StartEndDuration
extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {
private ValueState<Long> startTime;
@Override
public void open(Configuration conf) {
// obtain state handle
startTime = getRuntimeContext()
.getState(new ValueStateDescriptor<Long>("startTime", Long.class));
}
/** Called for each processed event. */
@Override
public void processElement(
Tuple2<String, String> in,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
switch (in.f1) {
case "START":
// set the start time if we receive a start event.
startTime.update(ctx.timestamp());
// register a timer in four hours from the start event.
ctx.timerService()
.registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
break;
case "END":
// emit the duration between start and end event
Long sTime = startTime.value();
if (sTime != null) {
out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
// clear the state
startTime.clear();
}
default:
// do nothing
}
}
/** Called when a timer fires. */
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) {
// Timeout interval exceeded. Cleaning up the state.
startTime.clear();
}}
這個例子充分展現了 KeyedProcessFunction 強大的表達力,也因此是一個實作相當復雜的介面,
DataStream API
DataStream API 為許多通用的流處理操作提供了處理原語,這些操作包括視窗、逐條記錄的轉換操作,在處理事件時進行外部資料庫查詢等,DataStream API 支持 Java 和 Scala 語言,預先定義了例如map()、reduce()、aggregate() 等函式,你可以通過擴展實作預定義介面或使用 Java、Scala 的 lambda 運算式實作自定義的函式,
下面的代碼示例展示了如何捕獲會話時間范圍內所有的點擊流事件,并對每一次會話的點擊量進行計數,
// 網站點擊 Click 的資料流DataStream<Click> clicks =
DataStream<Tuple2<String, Long>> result = clicks
// 將網站點擊映射為 (userId, 1) 以便計數
.map(
// 實作 MapFunction 介面定義函式
new MapFunction<Click, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Click click) {
return Tuple2.of(click.userId, 1L);
}
})
// 以 userId (field 0) 作為 key
.keyBy(0)
// 定義 30 分鐘超時的會話視窗
.window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
// 對每個會話視窗的點擊進行計數,使用 lambda 運算式定義 reduce 函式
.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
SQL & Table API
Flink 支持兩種關系型的 API,Table API 和 SQL,這兩個 API 都是批處理和流處理統一的 API,這意味著在無邊界的實時資料流和有邊界的歷史記錄資料流上,關系型 API 會以相同的語意執行查詢,并產生相同的結果,Table API 和 SQL 借助了 Apache Calcite 來進行查詢的決議,校驗以及優化,它們可以與 DataStream 和 DataSet API 無縫集成,并支持用戶自定義的標量函式,聚合函式以及表值函式,
Flink 的關系型 API 旨在簡化資料分析、資料流水線和 ETL 應用的定義,
下面的代碼示例展示了如何使用 SQL 陳述句查詢捕獲會話時間范圍內所有的點擊流事件,并對每一次會話的點擊量進行計數,此示例與上述 DataStream API 中的示例有著相同的邏輯,
SELECT userId, COUNT(*)FROM clicksGROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId
庫
Flink 具有數個適用于常見資料處理應用場景的擴展庫,這些庫通常嵌入在 API 中,且并不完全獨立于其它 API,它們也因此可以受益于 API 的所有特性,并與其他庫集成,
- 復雜事件處理(CEP):模式檢測是事件流處理中的一個非常常見的用例,Flink 的 CEP 庫提供了 API,使用戶能夠以例如正則運算式或狀態機的方式指定事件模式,CEP 庫與 Flink 的 DataStream API 集成,以便在 DataStream 上評估模式,CEP 庫的應用包括網路入侵檢測,業務流程監控和欺詐檢測,
- DataSet API:DataSet API 是 Flink 用于批處理應用程式的核心 API,DataSet API 所提供的基礎算子包括map、reduce、(outer) join、co-group、iterate等,所有算子都有相應的演算法和資料結構支持,對記憶體中的序列化資料進行操作,如果資料大小超過預留記憶體,則過量資料將存盤到磁盤,Flink 的 DataSet API 的資料處理演算法借鑒了傳統資料庫演算法的實作,例如混合散列連接(hybrid hash-join)和外部歸并排序(external merge-sort),
- Gelly: Gelly 是一個可擴展的圖形處理和分析庫,Gelly 是在 DataSet API 之上實作的,并與 DataSet API 集成,因此,它能夠受益于其可擴展且健壯的運算子,Gelly 提供了內置演算法,如 label propagation、triangle enumeration 和 page rank 演算法,也提供了一個簡化自定義圖演算法實作的 Graph API,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/458480.html
標籤:Java
上一篇:使用RocketMQ消費訊息
