主頁 > 資料庫 > Apache Flink 如何正確處理實時計算場景中的亂序資料

Apache Flink 如何正確處理實時計算場景中的亂序資料

2020-12-07 07:22:05 資料庫

一、流式計算的未來

在谷歌發表了 GFS、BigTable、Google MapReduce 三篇論文后,大資料技術真正有了第一次飛躍,Hadoop 生態系統逐漸發展起來,

Hadoop 在處理大批量資料時表現非常好,主要有以下特點:

1、計算開始之前,資料必須提前準備好,然后才可以開始計算;

2、當大量資料計算完成之后,會輸出最后計算結果,完成計算;

3、時效性比較低,不適用于實時計算;

而隨著實時推薦、風控等業務的發展,資料處理時延要求越來越高,實時性要求也越來越高,Flink 開始在社區嶄露頭角,

Apache Flink 作為一款真正的流處理框架,具有較低的延遲性,能夠保證訊息傳輸不丟失不重復,具有非常高的吞吐,支持原生的流處理,

本文主要介紹 Flink 的時間概念、視窗計算以及 Flink 是如何處理視窗中的亂序資料,

在 Flink 中主要有三種時間概念:

(1)事件產生的時間,叫做 Event Time;

(2)資料接入到 Flink 的時間,叫做 Ingestion Time;

(3)資料在 Flink 系統里被操作時機器的系統時間,叫做 Processing Time

處理時間是一種比較簡單的時間概念,不需要流和系統之間進行協調,可以提供最佳的性能和最低的延遲,但是在分布式環境中,多臺機器的處理時間無法做到嚴格一致,無法提供確定性的保障,

而事件時間是事件產生的時間,在進入到 Flink 系統的時候,已經在 record 中進行記錄,可以通過用提取事件時間戳的方式,保證在處理程序中,反映事件發生的先后關系,

file

file

我們知道流式資料集是沒有邊界的,資料會源源不斷的發送到我們的系統中,

流式計算最終的目的是去統計資料產生匯總結果的,而在無界資料集上,如果做一個全域的視窗統計,是不現實的,

只有去劃定一定大小的視窗范圍去做計算,才能最侄訓總到下游的系統中,用來分析和展示,
file

在 Flink 進行視窗計算的時候,需要去知道兩個核心的資訊:

  • 每個 Element 的 EventTime 時間戳?(在資料記錄中指定即可)
  • 接入的資料,何時可以觸發統計計算 ? (視窗 11:00 ~ 11:10 的資料全部被接收完)

有序事件

假設在完美的條件下,資料都是嚴格有序,那么此時,流式計算引擎是可以正確計算出每個視窗的資料的

file

無序事件

但是現實中,資料可能會因為各種各樣的原因(系統延遲,網路延遲等)不是嚴格有序到達系統,甚至有的資料還會遲到很久,此時 Flink 需要有一種機制,允許資料可以在一定范圍內亂序,

這種機制就是水印,

file

如上面,有一個引數: MaxOutOfOrderness = 4,為最大亂序時間,意思是可以允許資料在多少范圍內亂序,可以是 4 分鐘,4 個小時 等,

水印的生成策略是,當前視窗最大事件時間戳減去 MaxOutOfOrderness 的值,

如上圖,事件 7 會產生一個 w(3) 的水印,事件 11 會產生要給 w(7) 的水印,但是事件 9 ,是小于事件 11 的,此時不會觸發水印的更新,事件 15 會產生一個 w(11) 的水印, 也就是說,水印反映了事件的整體流轉的趨勢,只會上升,不會下降,

水印表示了所有小于水印值的事件都已經到達了視窗,

每當有新的最大時間戳出現時,就會產生新的 watermark

遲到事件

對于事件時間小于水印時間的事件,稱為遲到事件,遲到事件是不會被納入視窗統計的,

如下圖,21 的事件進入系統之后,會產生 w(17) 的水印,而后來的 16 事件,由于小于當前水印時間 w(17),是不會被統計的了,

file

何時觸發計算

我們用一個圖來展示何時會觸發視窗的計算

如下圖,表示一個 11:50 到 12:00 的視窗,此時有一條資料, cat,11:55,事件時間是 11:55,在視窗中,最大延遲時間是 5 分鐘,所以當前水印時間是 11:50

file

此時又來了一條資料,dog,11:59,事件時間是 11:59,進入到了視窗中,

由于這個事件時間比上次的事件時間大,所以水印被更新成 11:54,此時由于水印時間仍然小于視窗結束時間,所以仍然沒有觸發計算,

file

又來了一條資料, cow,12:06,此時水印時間被更新到了 12:01 ,已經大于了視窗結束時間,此時觸發了視窗計算(假設計算邏輯就是統計視窗內不同元素的個數),

file

假設又來了一條事件,是 dog,11:58,由于它已經小于了水印時間,并且在上次觸發視窗計算之后,視窗已經被銷毀,所以,這條事件是不會被觸發計算的了,

此時,可以這個事件放到 sideoutput 佇列中,額外邏輯處理,

file

所以在 1.11 版本中,重構了水印生成介面,新版本中,主要通過 WatermarkStrategy 類,來使用不同的策略生成水印,

新的介面提供了很多靜態的方法和帶有預設實作的方法,如果想自己定義生成策略,可以實作這個方法:

file

生成一個 WatermarkGenerator

file

這個類也很簡單明了

  • onEvent:如果我們想依賴每個元素生成一個水印發射到下游,可以實作這個方法;
  • OnPeriodicEmit:如果資料量比較大的時候,我們每條資料都生成一個水印的話,會影響性能,所以這里還有一個周期性生成水印的方法,

為了方便開發,Flink 還提供了一些內置的水印生成方法供我們使用

  • 固定延遲生成水印
    我們想生成一個延遲 3 s 的固定水印,可以這樣做
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
  • 單調遞增生成水印
    相當于上述的延遲策略去掉了延遲時間,以 event 中的時間戳充當了水印,可以這樣使用:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

五、一個簡單的小例子,來統計視窗中字母出現的次數

public class StreamTest1 {


    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public static class MyLog {
        private String msg;
        private Integer cnt;
        private long timestamp;
    }

    public static class MySourceFunction implements SourceFunction<MyLog> {

        private boolean running = true;

        @Override
        public void run(SourceContext<MyLog> ctx) throws Exception {
            while (true) {
                Thread.sleep(1000);
                ctx.collect(new MyLog(RandomUtil.randomString(1),1,System.currentTimeMillis()));
            }
        }
        @Override
        public void cancel() {
            this.running = false;
        }
    }
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 資料源使用自定義資料源,每1s發送一條隨機訊息
        env.addSource(new MySourceFunction())
                // 指定水印生成策略是,最大事件時間減去 5s,指定事件時間欄位為 timestamp
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.
                                <MyLog>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event,timestamp)->event.timestamp))
                // 按 訊息分組
                .keyBy((event)->event.msg)
                // 定義一個10s的時間視窗
                .timeWindow(Time.seconds(10))
                // 統計訊息出現的次數
                .sum("cnt")
                // 列印輸出
                .print();

        env.execute("log_window_cnt");
    }
}

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/230966.html

標籤:大數據

上一篇:請教一個行轉列的SQL

下一篇:Redis資料結構

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more