主頁 > 資料庫 > 「Flink」使用Java lambda運算式實作Flink WordCount

「Flink」使用Java lambda運算式實作Flink WordCount

2020-09-14 07:53:25 資料庫

本篇我們將使用Java語言來實作Flink的單詞統計,

代碼開發

環境準備

匯入Flink 1.9 pom依賴

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.7</version>
        </dependency>
    </dependencies>

構建Flink流處理環境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

自定義source

每秒生成一行文本

DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            private boolean isCanal = false;
            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // 每秒發送一行文本
                while (!isCanal) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanal = true;
            }
        });

單詞計算

// 3. 單詞統計
        // 3.1 將文本行切分成一個個的單詞
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分單詞
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        //3.2 將單詞轉換為一個個的元組
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 按照單詞進行分組
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 對每組單詞數量進行累加
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

參考代碼

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. 構建Flink流式初始化環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 自定義source - 每秒發送一行文本
        DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            private boolean isCanal = false;
            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // 每秒發送一行文本
                while (!isCanal) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanal = true;
            }
        });

        // 3. 單詞統計
        // 3.1 將文本行切分成一個個的單詞
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分單詞
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        //3.2 將單詞轉換為一個個的元組
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 按照單詞進行分組
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 對每組單詞數量進行累加
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

        env.execute("app");
    }
}

Flink對Java Lambda運算式支持情況

Flink支持Java API所有運算子使用Lambda運算式,但是,但Lambda運算式使用Java泛型時,就需要宣告型別資訊,

我們來看下上述的這段代碼:

SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分單詞
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

之所以這里將所有的型別資訊,因為Flink無法正確自動推斷出來Collector中帶的泛型,我們來看一下FlatMapFuntion的源代碼

@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

/**
* The core method of the FlatMapFunction. Takes an element from the input data set and transforms
* it into zero, one, or more elements.
*
* @param value The input value.
* @param out The collector for returning result values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
void flatMap(T value, Collector<O> out) throws Exception;
}

我們發現 flatMap的第二個引數是Collector<O>,是一個帶引數的泛型,Java編譯器編譯該代碼時會進行引數型別擦除,所以Java編譯器會變成成:

void flatMap(T value, Collector out)

這種情況,Flink將無法自動推斷型別資訊,如果我們沒有顯示地提供型別資訊,將會出現以下錯誤:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
    In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
    An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
    Otherwise the type has to be specified explicitly using type information.

這種情況下,必須要顯示指定型別資訊,否則輸出將回傳值視為Object型別,這將導致Flink無法正確序列化,

所以,我們需要顯示地指定Lambda運算式的引數型別資訊,并通過returns方法顯示指定輸出的型別資訊

我們再看一段代碼:

SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

為什么map后面也需要指定型別呢?

因為此處map回傳的是Tuple2型別,Tuple2是帶有泛型引數,在編譯的時候同樣會被查出泛型引數資訊,導致Flink無法正確推斷,

更多關于對Java Lambda運算式的支持請參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/32512.html

標籤:大數據

上一篇:「資料挖掘入門系列」Python快速入門

下一篇:「Flink」Flink的狀態管理與容錯

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more