主頁 > 資料庫 > 大資料Hadoop之——Flink中的Window API+時間語意+Watermark

大資料Hadoop之——Flink中的Window API+時間語意+Watermark

2022-05-11 08:27:35 資料庫

目錄
  • 一、window 概念
  • 二、 時間視窗(Time Window)
    • 1)滾動視窗(Tumbling Windows)
    • 2)滑動視窗(Sliding Windows)
    • 3)會話視窗(Session Windows)
  • 三、window API
  • 四、視窗分配器(window assigner)
    • 1)增量聚合函式(incremental aggregation functions)
    • 2)全視窗函式(full window functions)
    • 3)其它可選window API
  • 五、Flink 中的時間語意
  • 六、設定 Event Time
  • 七、水位線(Watermark)
    • 1)為什么需要水位線(Watermark)
    • 2)如何利用Watermark處理亂序資料問題?
    • 3)watermark 的特點
    • 4)watermark 的傳遞
    • 5)watermark 策略與應用
    • 1)Watermark 策略簡介
    • 2)使用 Watermark 策略應用
    • 3)使用場景
    • 4)TimestampAssigner
      • 1、AssignerWithPeriodicWatermarks
      • 2、AssignerWithPunctuatedWatermarks
    • 5)WatermarkStrategy(重點)
      • 1、固定亂序長度策略(forBoundedOutOfOrderness)
      • 2、單調遞增策略(forMonotonousTimestamps)
      • 3、不生成策略(noWatermarks)

一、window 概念

視窗(window)是處理無限流的核心,視窗將流分割成有限大小的“桶”,我們可以在桶上應用計算,本檔案重點介紹如何在Flink中執行視窗操作,以及程式員如何從其提供的功能中獲得最大的好處,

一個有視窗的Flink程式的一般結構如下所示,第一個片段指的是鍵控流,而第二個片段指的是非鍵控流,可以看到,唯一的區別是keyBy(…)呼叫鍵流而window(…)呼叫非鍵流的windowwall(…),這也將作為頁面其余部分的路標,

Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Non-Keyed Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

一般真實的流都是無界的,怎樣處理無界的資料?

在自然環境中,資料的產生原本就是流式的,無論是來自 Web 服務器的事件資料,證券交易所的交易資料,還是來自工廠車間機器上的傳感器資料,其資料都是流式的,但是當你 分析資料時,可以圍繞 有界流(bounded)或 無界流(unbounded)兩種模型來組織處理資料,當然,選擇不同的模型,程式的執行和處理方式也都會不同,

上面圖片來源:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/learn-flink/overview/

  • 可以把無限的資料流進行切分,得到有限的資料集進行處理 —— 也
    就是得到有界流
  • 視窗(window)就是將無限流切割為有限流的一種方式,它會將流
    資料分發到有限大小的桶(bucket)中進行分析

二、 時間視窗(Time Window)

官方檔案

1)滾動視窗(Tumbling Windows)

翻轉視窗賦值器將每個元素賦值給一個指定視窗大小的視窗,滾動的視窗有固定的尺寸,而且不重疊,例如,如果您指定一個大小為5分鐘的滾動視窗,則當前視窗將被評估,并每5分鐘啟動一個新視窗,如下圖所示:

【特點】

  • 將資料依據固定的視窗長度對資料進行切分
  • 時間對齊,視窗長度固定,沒有重疊

【示例代碼】

TumblingEventTimeWindows:滾動事件時間視窗
TumblingProcessingTimeWindows:滾動處理時間視窗

val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

2)滑動視窗(Sliding Windows)

滑動視窗賦值器將元素賦值給固定長度的視窗,類似于滾動視窗賦值器,視窗的大小由視窗大小引數配置,另外一個視窗滑動引數控制滑動視窗啟動的頻率,因此,如果滑動視窗小于視窗大小,則滑動視窗可以重疊,在這種情況下,元素被分配給多個視窗,

例如,您可以將大小為10分鐘的視窗滑動5分鐘,這樣,每隔5分鐘就會出現一個視窗,其中包含在最后10分鐘內到達的事件,如下圖所示:

【特點】

  • 滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗
    長度和滑動間隔組成
  • 視窗長度固定,可以有重疊

【示例代碼】

SlidingEventTimeWindows:滑動事件時間視窗
SlidingProcessingTimeWindows:滑動處理時間視窗

val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

3)會話視窗(Session Windows)

會話視窗分配器根據活動的會話對元素進行分組,與滑動視窗不同,會話視窗沒有重疊,也沒有固定的開始和結束時間,相反,當會話視窗在一段時間內沒有接收到元素時,即當一個不活動間隙發生時,會話視窗將關閉,會話視窗分配器可以配置一個靜態會話間隙,也可以配置一個會話間隙提取器函式,該函式定義了不活動的時間長度,當這段時間到期時,當前會話關閉,隨后的元素被分配到一個新的會話視窗,

【特點】

  • 由一系列事件組合一個指定時間長度的 timeout 間隙組成,也就是
    一段時間沒有接收到新資料就會生成新的視窗
  • 時間無對齊
  • 視窗長度不固定,也不會重疊

【示例代碼】

EventTimeSessionWindows:會話事件時間視窗
SlidingProcessingTimeWindows:會話處理時間視窗

val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)


// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

三、window API

視窗分配器 —— window() 方法

  • 我們可以用 .window() 來定義一個視窗,然后基于這個 window 去做一些聚
    合或者其它處理操作,注意 window () 方法必須在 keyBy 之后才能用
  • Flink 提供了更加簡單的三種型別時間視窗用于定義時
    間視窗,也提供了countWindowAll來定義計數視窗

TumblingEventTimeWindows:滾動事件時間視窗
TumblingProcessingTimeWindows:滾動處理時間視窗
SlidingEventTimeWindows:滑動事件時間視窗
SlidingProcessingTimeWindows:滑動處理時間視窗
EventTimeSessionWindows:會話事件時間視窗
SlidingProcessingTimeWindows:會話處理時間視窗

四、視窗分配器(window assigner)

window function 定義了要對視窗中收集的資料做的計算操作,可以分為兩類,

1)增量聚合函式(incremental aggregation functions)

  • 每條資料到來就進行計算,保持一個簡單的狀態
  • ReduceFunction
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
  • AggregateFunction
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)

2)全視窗函式(full window functions)

  • 先把視窗所有資料收集起來,等到計算的時候會遍歷所有資料
  • ProcessWindowFunction

一個ProcessWindowFunction可以這樣定義和使用:

val input: DataStream[(String, Long)] = ...

input
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction())

/* ... */

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

3)其它可選window API

  • .trigger() —— 觸發器,定義 window 什么時候關閉,觸發計算并輸出結果
  • .evictor() —— 移除器,定義移除某些資料的邏輯
  • .allowedLateness() —— 允許處理遲到的資料
  • .sideOutputLateData() —— 將遲到的資料放入側輸出流
  • .getSideOutput() —— 獲取側輸出流

官方檔案
Flink 明確支持以下三種時間語意:

  • 事件時間(event time): 事件產生的時間,記錄的是設備生產(或者存盤)事件的時間

  • 攝取時間(ingestion time): 資料進入Flink的時間,Flink 讀取事件時記錄的時間

  • 處理時間(processing time):執行操作算子的本地系統時間,與機器相關

上面圖片來源:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/concepts/time/

六、設定 Event Time

我們可以直接在代碼中,對執行環境呼叫 setStreamTimeCharacteristic
方法,設定流的時間特性,具體的時間,還需要從資料中提取時間戳(timestamp)

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
var env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

七、水位線(Watermark)

官方檔案

1)為什么需要水位線(Watermark)

當 Flink 以 Event Time 模式處理資料流時,它會根據資料里的時間戳來
處理基于時間的算子,由于網路、分布式等原因,會導致亂序資料的產生,亂序資料會讓視窗計算不準確,Watermark正是處理亂序資料而來的,

2)如何利用Watermark處理亂序資料問題?

遇到一個時間戳達到了視窗關閉時間,不應該立刻觸發視窗計算,而是等
待一段時間,等遲到的資料來了再關閉視窗,

  • Watermark 是一種衡量 Event Time 進展的機制,可以設定延遲觸發
  • Watermark 是用于處理亂序事件的,而正確的處理亂序事件,通常
    Watermark 機制結合 window 來實作
  • 資料流中的 Watermark 用于表示 timestamp 小于 Watermark 的資料,
    都已經到達了,因此,window 的執行也是由 Watermark 觸發的;
  • watermark 用來讓程式自己平衡延遲和結果正確性,

3)watermark 的特點

  • watermark 是一條特殊的資料記錄
  • watermark 必須單調遞增,以確保任務的事件時間時鐘在向前推進,而不
    是在后退
  • watermark 與資料的時間戳相關

4)watermark 的傳遞

5)watermark 策略與應用

1)Watermark 策略簡介

時間戳的分配與 watermark 的生成是齊頭并進的,其可以告訴 Flink 應用程式事件時間的進度,其可以通過指定 WatermarkGenerator 來配置 watermark 的生成方式,

使用 Flink API 時需要設定一個同時包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy,WatermarkStrategy 工具類中也提供了許多常用的 watermark 策略,并且用戶也可以在某些必要場景下構建自己的 watermark 策略,WatermarkStrategy 介面如下:

public interface WatermarkStrategy<T> 
    extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{

    /**
     * 根據策略實體化一個可分配時間戳的 {@link TimestampAssigner},
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * 根據策略實體化一個 watermark 生成器,
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

通常情況下,你不用實作此介面,而是可以使用 WatermarkStrategy 工具類中通用的 watermark 策略,或者可以使用這個工具類將自定義的 TimestampAssignerWatermarkGenerator 進行系結,

【例如】你想要要使用有界無序(bounded-out-of-orderness)watermark 生成器和一個 lambda 運算式作為時間戳分配器,那么可以按照如下方式實作:

WatermarkStrategy
  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
  .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
    override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = element._1
  })

【溫馨提示】其中 TimestampAssigner 的設定與否是可選的,大多數情況下,可以不用去特別指定,

2)使用 Watermark 策略應用

WatermarkStrategy 可以在 Flink 應用程式中的兩處使用:

  • 第一種是直接在資料源上使用
  • 第二種是直接在非資料源的操作之后使用,

【溫馨提示】第一種方式相比會更好,因為資料源可以利用 watermark 生成邏輯中有關分片/磁區(shards/partitions/splits)的資訊,使用這種方式,資料源通常可以更精準地跟蹤 watermark,整體 watermark 生成將更精確,

【示例】僅當無法直接在資料源上設定策略時,才應該使用第二種方式(在任意轉換操作之后設定 WatermarkStrategy):

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter())

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>)

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)

【示例】處理空閑資料源

如果資料源中的某一個磁區/分片在一段時間內未發送事件資料,則意味著 WatermarkGenerator 也不會獲得任何新資料去生成 watermark,我們稱這類資料源為空閑輸入或空閑源,在這種情況下,當某些其他磁區仍然發送事件資料的時候就會出現問題,由于下游算子 watermark 的計算方式是取所有不同的上游并行資料源 watermark 的最小值,則其 watermark 將不會發生變化,

WatermarkStrategy
  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
  .withIdleness(Duration.ofMinutes(1))

3)使用場景

  • 對于排好序的資料,不需要延遲觸發,可以只指定時間戳就行了,
// 注意時間是毫秒,所以根據時間戳不同,可能需要乘以1000
dataStream.assignAscendingTimestamps(_.timestamp * 1000)
  • Flink 暴露了 TimestampAssigner 介面供我們實作,使我們可以自定義如
    何從事件資料中抽取時間戳和生成watermark,
// MyAssigner 可以有兩種型別,都繼承自 TimestampAssigner
dataStream.assignAscendingTimestamps(new MyAssigner())

4)TimestampAssigner

定義了抽取時間戳,以及生成 watermark 的方法,有兩種型別

1、AssignerWithPeriodicWatermarks

  • 周期性的生成 watermark:系統會周期性的將 watermark 插入到流中
  • 默認周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval()
    方法進行設定
  • 升序和前面亂序的處理 BoundedOutOfOrderness ,都是基于周期性
    watermark 的,

2、AssignerWithPunctuatedWatermarks

  • 沒有時間周期規律,可打斷的生成 watermark

可以棄用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 了

在 Flink 新的 WatermarkStrategyTimestampAssignerWatermarkGenerator 的抽象介面之前,Flink 使用的是 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks,你仍可以在 API 中看到它們,但建議使用新介面,因為其對時間戳和 watermark 等重點的抽象和分離很清晰,并且還統一了周期性和標記形式的 watermark 生成方式,

5)WatermarkStrategy(重點)

flink1.11版本后 建議用WatermarkStrategy(Watermark生成策略)生成Watermark,當創建DataStream物件后,使用如下方法指定策略:assignTimestampsAndWatermarks(WatermarkStrategy<T>)

通常情況下,你不用實作此介面,而是可以使用 WatermarkStrategy 工具類中通用的 watermark 策略,或者可以使用這個工具類將自定義的 TimestampAssigner 與 WatermarkGenerator 進行系結,

1、固定亂序長度策略(forBoundedOutOfOrderness)

通過呼叫WatermarkStrategy物件上的forBoundedOutOfOrderness方法來實作,接收一個Duration型別的引數作為最大亂序(out of order)長度,WatermarkStrategy物件上的withTimestampAssigner方法為從事件資料中提取時間戳提供了介面,

【示例】

  • ForBoundedOutOfOrderness.java
package com.com.streaming.watermarkstrategy;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.time.LocalDateTime;

//在assignTimestampsAndWatermarks中用WatermarkStrategy.forBoundedOutOfOrderness方法抽取Timestamp和生成周期性水位線示例
public class ForBoundedOutOfOrderness {

    public static void main(String[] args) throws  Exception{
        //創建流處理環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設定EventTime語意
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //設定周期生成Watermark間隔(10毫秒)
        env.getConfig().setAutoWatermarkInterval(10L);
        //并行度1
        env.setParallelism(1);
        //演示資料
        DataStreamSource<ClickEvent> mySource = env.fromElements(
                new ClickEvent(LocalDateTime.now(), "user1", 1L, 1),
                new ClickEvent(LocalDateTime.now(), "user1", 2L, 2),
                new ClickEvent(LocalDateTime.now(), "user1", 3L, 3),
                new ClickEvent(LocalDateTime.now(), "user1", 4L, 4),
                new ClickEvent(LocalDateTime.now(), "user1", 5L, 5),
                new ClickEvent(LocalDateTime.now(), "user1", 6L, 6),
                new ClickEvent(LocalDateTime.now(), "user1", 7L, 7),
                new ClickEvent(LocalDateTime.now(), "user1", 8L, 8)
        );
        //WatermarkStrategy.forBoundedOutOfOrderness周期性生成水位線
        //可更好處理延遲資料
        //BoundedOutOfOrdernessWatermarks<T>實作WatermarkGenerator<T>
        SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
                //指定Watermark生成策略,最大延遲長度5毫秒
                WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(5))
                        .withTimestampAssigner(
                                //SerializableTimestampAssigner介面中實作了extractTimestamp方法來指定如何從事件資料中抽取時間戳
                                new SerializableTimestampAssigner<ClickEvent>() {
                                    @Override
                                    public long extractTimestamp(ClickEvent event, long recordTimestamp) {
                                        return event.getDateTime(event.getEventTime());
                                    }
                                })
        );
        //結果列印
        streamTS.print();
        env.execute();
    }
}

  • ClickEvent.java
package com.com.streaming.watermarkstrategy;

import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class ClickEvent {
    private String user;
    private long l;
    private int i;
    private LocalDateTime eventTime;

    public ClickEvent(LocalDateTime eventTime, String user, long l, int i) {
        this.eventTime = eventTime;
        this.user = user;
        this.l = l;
        this.i = i;
    }

    public LocalDateTime getEventTime() {
        return eventTime;
    }

    public void setEventTime(LocalDateTime eventTime) {
        this.eventTime = eventTime;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public long getL() {
        return l;
    }

    public void setL(long l) {
        this.l = l;
    }

    public int getI() {
        return i;
    }

    public void setI(int i) {
        this.i = i;
    }

    public long getDateTime(LocalDateTime dt) {
        ZoneOffset zoneOffset8 = ZoneOffset.of("+8");
        return dt.toInstant(zoneOffset8).toEpochMilli();
    }
}

2、單調遞增策略(forMonotonousTimestamps)

通過呼叫WatermarkStrategy物件上的forMonotonousTimestamps方法來實作,無需任何引數,相當于將forBoundedOutOfOrderness策略的最大亂序長度outOfOrdernessMillis設定為0

  • ForMonotonousTimestamps.java
package com.com.streaming.watermarkstrategy;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.time.LocalDateTime;

public class ForMonotonousTimestamps {
    public static void main(String[] args) throws  Exception{
        //創建流處理環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設定EventTime語意
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //設定周期生成Watermark間隔(10毫秒)
        env.getConfig().setAutoWatermarkInterval(10L);
        //并行度1
        env.setParallelism(1);
        //演示資料
        DataStreamSource<ClickEvent> mySource = env.fromElements(
                new ClickEvent(LocalDateTime.now(), "user1", 1L, 1),
                new ClickEvent(LocalDateTime.now(), "user1", 2L, 2),
                new ClickEvent(LocalDateTime.now(), "user1", 3L, 3),
                new ClickEvent(LocalDateTime.now(), "user1", 4L, 4),
                new ClickEvent(LocalDateTime.now(), "user1", 5L, 5),
                new ClickEvent(LocalDateTime.now(), "user1", 6L, 6),
                new ClickEvent(LocalDateTime.now(), "user1", 7L, 7),
                new ClickEvent(LocalDateTime.now(), "user1", 8L, 8)
        );
        //WatermarkStrategy.forMonotonousTimestamps周期性生成水位線
        //相當于延遲outOfOrdernessMillis=0
        //繼承自BoundedOutOfOrdernessWatermarks<T>

        SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
                        .withTimestampAssigner((event, recordTimestamp) -> event.getDateTime(event.getEventTime()))
        );
        //結果列印
        streamTS.print();
        env.execute();
    }
}

3、不生成策略(noWatermarks)

WatermarkStrategy.noWatermarks()

  • 當一個算子從多個上游算子中獲取資料時,會取上游最小的Watermark作為自身的Watermark,并檢測是否滿足視窗觸發條件,當達不到觸發條件,視窗會在記憶體中快取大量視窗資料,導致記憶體不足等問題
  • flink提供了設定流狀態為空閑的withIdleness方法,在設定的超時時間內,當某個資料流一直沒有事件資料到達,就標記這個流為空閑,下游算子不需要等待這條資料流產生的Watermark,而取其他上游激活狀態的Watermark,來決定是否需要觸發視窗計算,

上面代碼設定超時時間5毫秒,超過這個時間,沒有生成Watermark,將流狀態設定空閑,當下次有新的Watermark生成并發送到下游時,重新設定為活躍,
WatermarkStrategy.withIdleness(Duration.ofMillis(5))

未完待續~

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/472327.html

標籤:其他

上一篇:一文讀懂資料庫發展史

下一篇:Atlas2.2.0編譯、安裝及使用(集成ElasticSearch,匯入Hive資料)

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more