主頁 >  其他 > Flink之watermark 處理延遲資料 詳解

Flink之watermark 處理延遲資料 詳解

2021-12-13 07:51:56 其他

watermark介紹

在Flink中,Watermark 是 Apache Flink 為了處理 EventTime 視窗計算提出的一種機制, 本質上是一種時間戳, 用來處理實時資料中的亂序問題的,通常是水位線和視窗結合使用來實作,
從設備生成實時流事件,到Flink的source,再到多個oparator處理資料,程序中會受到網路延遲、背壓等多種因素影響造成資料亂序,在進行視窗處理時,不可能無限期的等待延遲資料到達,當到達特定watermark時,認為在watermark之前的資料已經全部達到(即使后面還有延遲的資料), 可以觸發視窗計算,這個機制就是 Watermark(水位線),

在這里插入圖片描述
在這里插入圖片描述
如上圖:
● w(11): 表示11之前的資料到已經到達,11之前的資料可以進行計算了,
● w(20): 表示20之前的資料到已經到達,20之前的資料可以進行計算了,

watermark的使用

生成時機

watermark可以在接收到DataSource的資料后,立刻生成Watermark,也可以在DataSource后,使用map或者filter操作后再生成watermark,
水位線生產的最佳位置是在盡可能靠近資料源的地方,因為水位線生成時會做出一些有關元素順序相對時間戳的假設,由于資料源讀取程序是并行的,一切引起Flink跨行資料流磁區進行重新分發的操作(比如:改變并行度,keyby等)都會導致元素時間戳亂序,但是如果是某些初始化的filter、map等不會引起元素重新分發的操作,所以是可以考慮在生成水位線之前使用,

watermark的計算

watermark = 進入 Flink 視窗的最大的事件時間(maxEventTime) — 指定的延遲時間(t)

生成方式

第一種:With Periodic Watermarks

這個是周期性觸發Waterrmark的生成和發送,
周期性分配水位線在程式中會比較常用,是我們會指示系統以固定的時間間隔發出的水位線,
在設定時間為事件時間時,會默認設定這個時間間隔為200ms, 如果需要調整可以自行設定,

設定任務時間型別和

 val env = StreamExecutionEnvironment.getExecutionEnvironment
 //設定時間使用事件時間
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 //設定并行度為1
 env.setParallelism(1)
 //設定自動周期性的產生watermark,默認值為200毫秒
 env.getConfig.setAutoWatermarkInterval(1000)

設定水位線watermark的值

     
    //通過本地socket埠獲取資料
    val dataStream = env.socketTextStream("127.0.0.1",10010)
     //對資料的資料進行轉換為tuple2的格式
     val tupStream = dataStream.map(line => {
        val arr = line.split(" ")
        (arr(0),arr(1).toLong)
      })
          
   //設定水位線
    val waterDataStream = tupStream.assignTimestampsAndWatermarks(
      //設定時間最低延遲
      WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
        //設定時間戳
        .withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[String,Long]] {
          //當前最大的值
          var currentMaxNum = 0l
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          override def extractTimestamp(t: Tuple2[String,Long], recordTimesstamp: Long): Long = {
            val eTime = t._2
            currentMaxNum = Math.max(eTime,currentMaxNum)
            //當前最大的值減去 允許亂序的資料,即為現在的水位線值,
            //注意:這些代碼只是為了本地觀察方便,正常開發中是不需這樣寫的,
            val waterMark = currentMaxNum - 2000;
            println("資料:"+t.toString()+"  ,"+sdf.format(eTime)+" ,  當前watermark: "+sdf.format(waterMark))
            eTime
          }
        })
    )

    //對資料進行計算和輸出
    waterDataStream.keyBy(_._1).timeWindow(Time.seconds(3)).reduce((e1, e2)=>{
            (e1._1,e1._2+e2._2)
          }).print()

輸入和輸出:

--------------------輸入
s3 1639100010955
s2 1639100009955
s1 1639100008955
s0 1639100007955
s4 1639100011955
s5 1639100012955
s6 1639100013955
s7 1639100016955
          
          
--------------------輸出
          
資料:(s3,1639100010955)  ,2021-12-10 09:33:30 ,  當前watermark: 2021-12-10 09:33:28
資料:(s2,1639100009955)  ,2021-12-10 09:33:29 ,  當前watermark: 2021-12-10 09:33:28
資料:(s1,1639100008955)  ,2021-12-10 09:33:28 ,  當前watermark: 2021-12-10 09:33:28
資料:(s0,1639100007955)  ,2021-12-10 09:33:27 ,  當前watermark: 2021-12-10 09:33:28
資料:(s4,1639100011955)  ,2021-12-10 09:33:31 ,  當前watermark: 2021-12-10 09:33:29
資料:(s5,1639100012955)  ,2021-12-10 09:33:32 ,  當前watermark: 2021-12-10 09:33:30
(s2,1639100009955)
(s0,1639100007955)
(s1,1639100008955)
資料:(s6,1639100013955)  ,2021-12-10 09:33:33 ,  當前watermark: 2021-12-10 09:33:31
資料:(s7,1639100016955)  ,2021-12-10 09:33:36 ,  當前watermark: 2021-12-10 09:33:34
(s3,1639100010955)
(s5,1639100012955)
(s4,1639100011955)

說明:

  • 在使用timeWindow的時候,會根據設定的視窗大小 3,將一分鐘內的視窗劃分為:
    0-2,3-5,6-8,9-11,12-14,15-17,18-20,21-23,24-26,27-29,30-32,33-35…
  • watermark的值是當前輸入資料中最大時間戳-去亂序時間, 在watermark前的資料才會被認定是正常的,可供window進行計算的資料,
  • 上面程式中輸入s3-s4時,watermark為的秒數是28和29,是在 timewindow劃分的時間視窗 27-29 中,所以沒有觸發計算,直到輸入s5,此時watermark秒數是30,在另一個視窗 30-32的視窗中,才會觸發 27-29視窗的計算,所以才輸出 s2,s0,s1的值,
  • 同理到s7的時候,又是另一個視窗33-35,所以觸發上一個視窗的計算,

第二種: With Punctuated Watermarks

定點水位線(標記水位線)不是太常用,主要為輸入流中包含一些用于指示系統進度的特殊元組和標記,方便根據輸入元素生成水位線的場景使用的,
由于資料流中每一個遞增的EventTime都會產生一個Watermark,
在實際的生產中Punctuated方式在TPS很高的場景下會產生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成,

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // onEvent 中已經實作
    }
}

延遲資料的處理方式

針對延遲太久的資料有3中處理方案:

  1. 丟棄(默認)
  2. allowedLateness: 指定允許資料延遲的時間
  3. sideOutputLateData: 收集遲到的資料
  • 對于遲到太久的資料默認是丟棄的, 不會觸發window,因為輸入的資料所在的視窗已經執行過了,Flink對這些遲到資料執行的方案就是丟棄,

  • 如果遲到不久,輸入的資料所在的視窗還未執行,是不會丟棄的, 這個要看視窗大小最大允許的資料亂序時間

附上 Flink官方檔案地址:

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/event-time/generating_watermarks/

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/379501.html

標籤:其他

上一篇:hive 表和欄位注釋中文亂碼(親測有效)

下一篇:初識kafka,環境部署與(Springboot+SpringCloud+Eurka)應用(Linux)

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more