watermark介紹
在Flink中,Watermark 是 Apache Flink 為了處理 EventTime 視窗計算提出的一種機制, 本質上是一種時間戳, 用來處理實時資料中的亂序問題的,通常是水位線和視窗結合使用來實作,
從設備生成實時流事件,到Flink的source,再到多個oparator處理資料,程序中會受到網路延遲、背壓等多種因素影響造成資料亂序,在進行視窗處理時,不可能無限期的等待延遲資料到達,當到達特定watermark時,認為在watermark之前的資料已經全部達到(即使后面還有延遲的資料), 可以觸發視窗計算,這個機制就是 Watermark(水位線),


如上圖:
● w(11): 表示11之前的資料到已經到達,11之前的資料可以進行計算了,
● w(20): 表示20之前的資料到已經到達,20之前的資料可以進行計算了,
watermark的使用
生成時機
watermark可以在接收到DataSource的資料后,立刻生成Watermark,也可以在DataSource后,使用map或者filter操作后再生成watermark,
水位線生產的最佳位置是在盡可能靠近資料源的地方,因為水位線生成時會做出一些有關元素順序相對時間戳的假設,由于資料源讀取程序是并行的,一切引起Flink跨行資料流磁區進行重新分發的操作(比如:改變并行度,keyby等)都會導致元素時間戳亂序,但是如果是某些初始化的filter、map等不會引起元素重新分發的操作,所以是可以考慮在生成水位線之前使用,
watermark的計算
watermark = 進入 Flink 視窗的最大的事件時間(maxEventTime) — 指定的延遲時間(t)
生成方式
第一種:With Periodic Watermarks
這個是周期性觸發Waterrmark的生成和發送,
周期性分配水位線在程式中會比較常用,是我們會指示系統以固定的時間間隔發出的水位線,
在設定時間為事件時間時,會默認設定這個時間間隔為200ms, 如果需要調整可以自行設定,
設定任務時間型別和
val env = StreamExecutionEnvironment.getExecutionEnvironment
//設定時間使用事件時間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//設定并行度為1
env.setParallelism(1)
//設定自動周期性的產生watermark,默認值為200毫秒
env.getConfig.setAutoWatermarkInterval(1000)
設定水位線watermark的值
//通過本地socket埠獲取資料
val dataStream = env.socketTextStream("127.0.0.1",10010)
//對資料的資料進行轉換為tuple2的格式
val tupStream = dataStream.map(line => {
val arr = line.split(" ")
(arr(0),arr(1).toLong)
})
//設定水位線
val waterDataStream = tupStream.assignTimestampsAndWatermarks(
//設定時間最低延遲
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
//設定時間戳
.withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[String,Long]] {
//當前最大的值
var currentMaxNum = 0l
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
override def extractTimestamp(t: Tuple2[String,Long], recordTimesstamp: Long): Long = {
val eTime = t._2
currentMaxNum = Math.max(eTime,currentMaxNum)
//當前最大的值減去 允許亂序的資料,即為現在的水位線值,
//注意:這些代碼只是為了本地觀察方便,正常開發中是不需這樣寫的,
val waterMark = currentMaxNum - 2000;
println("資料:"+t.toString()+" ,"+sdf.format(eTime)+" , 當前watermark: "+sdf.format(waterMark))
eTime
}
})
)
//對資料進行計算和輸出
waterDataStream.keyBy(_._1).timeWindow(Time.seconds(3)).reduce((e1, e2)=>{
(e1._1,e1._2+e2._2)
}).print()
輸入和輸出:
--------------------輸入
s3 1639100010955
s2 1639100009955
s1 1639100008955
s0 1639100007955
s4 1639100011955
s5 1639100012955
s6 1639100013955
s7 1639100016955
--------------------輸出
資料:(s3,1639100010955) ,2021-12-10 09:33:30 , 當前watermark: 2021-12-10 09:33:28
資料:(s2,1639100009955) ,2021-12-10 09:33:29 , 當前watermark: 2021-12-10 09:33:28
資料:(s1,1639100008955) ,2021-12-10 09:33:28 , 當前watermark: 2021-12-10 09:33:28
資料:(s0,1639100007955) ,2021-12-10 09:33:27 , 當前watermark: 2021-12-10 09:33:28
資料:(s4,1639100011955) ,2021-12-10 09:33:31 , 當前watermark: 2021-12-10 09:33:29
資料:(s5,1639100012955) ,2021-12-10 09:33:32 , 當前watermark: 2021-12-10 09:33:30
(s2,1639100009955)
(s0,1639100007955)
(s1,1639100008955)
資料:(s6,1639100013955) ,2021-12-10 09:33:33 , 當前watermark: 2021-12-10 09:33:31
資料:(s7,1639100016955) ,2021-12-10 09:33:36 , 當前watermark: 2021-12-10 09:33:34
(s3,1639100010955)
(s5,1639100012955)
(s4,1639100011955)
說明:
- 在使用timeWindow的時候,會根據設定的視窗大小 3,將一分鐘內的視窗劃分為:
0-2,3-5,6-8,9-11,12-14,15-17,18-20,21-23,24-26,27-29,30-32,33-35… - watermark的值是當前輸入資料中最大時間戳-去亂序時間, 在watermark前的資料才會被認定是正常的,可供window進行計算的資料,
- 上面程式中輸入s3-s4時,watermark為的秒數是28和29,是在 timewindow劃分的時間視窗 27-29 中,所以沒有觸發計算,直到輸入s5,此時watermark秒數是30,在另一個視窗 30-32的視窗中,才會觸發 27-29視窗的計算,所以才輸出 s2,s0,s1的值,
- 同理到s7的時候,又是另一個視窗33-35,所以觸發上一個視窗的計算,
第二種: With Punctuated Watermarks
定點水位線(標記水位線)不是太常用,主要為輸入流中包含一些用于指示系統進度的特殊元組和標記,方便根據輸入元素生成水位線的場景使用的,
由于資料流中每一個遞增的EventTime都會產生一個Watermark,
在實際的生產中Punctuated方式在TPS很高的場景下會產生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成,
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
if (event.hasWatermarkMarker()) {
output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// onEvent 中已經實作
}
}
延遲資料的處理方式
針對延遲太久的資料有3中處理方案:
- 丟棄(默認)
- allowedLateness: 指定允許資料延遲的時間
- sideOutputLateData: 收集遲到的資料
-
對于遲到太久的資料默認是丟棄的, 不會觸發window,因為輸入的資料所在的視窗已經執行過了,Flink對這些遲到資料執行的方案就是丟棄,
-
如果遲到不久,輸入的資料所在的視窗還未執行,是不會丟棄的, 這個要看視窗大小和最大允許的資料亂序時間,
附上 Flink官方檔案地址:
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/event-time/generating_watermarks/
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/379501.html
標籤:其他
