主頁 > 資料庫 > 云小課|MRS資料分析-通過Spark Streaming作業消費Kafka資料

云小課|MRS資料分析-通過Spark Streaming作業消費Kafka資料

2023-02-24 07:52:49 資料庫

閱識風云是華為云資訊大咖,擅長將復雜資訊多元化呈現,其出品的一張圖(云圖說)、深入淺出的博文(云小課)或短視頻(云視廳)總有一款能讓您快速上手華為云,更多精彩內容請單擊此處,

摘要:Spark Streaming是一種構建在Spark上的實時計算框架,擴展了Spark處理大規模流式資料的能力,本文介紹如何使用MRS集群運行Spark Streaming作業消費Kafka資料,

本文分享自華為云社區《【云小課】EI第48課 MRS資料分析-通過Spark Streaming作業消費Kafka資料》,作者: 閱識風云 ,

Spark是分布式批處理框架,提供分析挖掘與迭代式記憶體計算能力,支持多種語言(Scala/Java/Python)的應用開發,

Spark Streaming是一種構建在Spark上的實時計算框架,擴展了Spark處理大規模流式資料的能力,本文介紹如何使用MRS集群運行Spark Streaming作業消費Kafka資料,

在本案例中,假定某個業務Kafka每1秒就會收到1個單詞記錄,基于業務需要,開發的Spark應用程式實作實時累加計算每個單詞的記錄總數的功能,

本案例基本操作流程如下所示:

  1. 創建MRS集群,
  2. 準備應用程式,
  3. 上傳Jar包及源資料,
  4. 運行作業并查看結果,

場景描述

Spark提供分析挖掘與迭代式記憶體計算能力, 適用以下場景:

  • 資料處理(Data Processing):可以用來快速處理資料,兼具容錯性和可擴展性,
  • 迭代計算(Iterative Computation):支持迭代計算,有效應對多步的資料處理邏輯,
  • 資料挖掘(Data Mining):在海量資料基礎上進行復雜的挖掘分析,可支持各種資料挖掘和機器學習演算法,
  • 流式處理(Streaming Processing):支持秒級延遲的流式處理,可支持多種外部資料源,
  • 查詢分析(Query Analysis):支持標準SQL查詢分析,同時提供DSL(DataFrame), 并支持多種外部輸入,

當前Spark支持兩種資料處理方式:Direct Streaming和Receiver方式,

Direct Streaming方式主要通過采用Direct API對資料進行處理,以Kafka Direct介面為例,與啟動一個Receiver來連續不斷地從Kafka中接收資料并寫入到WAL中相比,Direct API簡單地給出每個batch區間需要讀取的偏移量位置,然后,每個batch的Job被運行,而對應偏移量的資料在Kafka中已準備好,這些偏移量資訊也被可靠地存盤在checkpoint檔案中,應用失敗重啟時可以直接讀取偏移量資訊,

Direct Kafka介面資料傳輸

需要注意的是,Spark Streaming可以在失敗后重新從Kafka中讀取并處理資料段,然而,由于語意僅被處理一次,重新處理的結果和沒有失敗處理的結果是一致的,

因此,Direct API消除了需要使用WAL和Receivers的情況,且確保每個Kafka記錄僅被接收一次,這種接收更加高效,使得Spark Streaming和Kafka可以很好地整合在一起,總體來說,這些特性使得流處理管道擁有高容錯性、高效性及易用性,因此推薦使用Direct Streaming方式處理資料,

在一個Spark Streaming應用開始時(也就是Driver開始時),相關的StreamingContext(所有流功能的基礎)使用SparkContext啟動Receiver成為長駐運行任務,這些Receiver接收并保存流資料到Spark記憶體中以供處理,用戶傳送資料的生命周期如圖1-2所示:

資料傳輸生命周期

  1. 接收資料(藍色箭頭)
    Receiver將資料流分成一系列小塊,存盤到Executor記憶體中,另外,在啟用預寫日志(Write-ahead Log,簡稱WAL)以后,資料同時還寫入到容錯檔案系統的預寫日志中,
  2. 通知Driver(綠色箭頭)
    接收塊中的元資料(Metadata)被發送到Driver的StreamingContext,這個元資料包括:
    定位其在Executor記憶體中資料位置的塊Reference ID,
    若啟用了WAL,還包括塊資料在日志中的偏移資訊,
  3. 處理資料(紅色箭頭)
    對每個批次的資料,StreamingContext使用Block資訊產生RDD及其Job,StreamingContext通過運行任務處理Executor記憶體中的Block來執行Job,
  4. 周期性地設定檢查點(橙色箭頭)
  5. 為了容錯的需要,StreamingContext會周期性地設定檢查點,并保存到外部檔案系統中,

華為云MapReduce服務提供了Spark服務多種場景下的樣例工程,本案例對應示例場景的開發思路:

  1. 接收Kafka中資料,生成相應DStream,
  2. 對單詞記錄進行分類統計,
  3. 計算結果,并進行列印,

步驟1:創建MRS集群

1、創建并購買一個包含有Spark2x、Kafka組件的MRS集群,詳情請參見MRS用戶指南的“購買自定義集群”,

說明:本文以購買的MRS 3.1.0版本的集群為例,集群未開啟Kerberos認證,

2、集群購買成功后,在MRS集群的任一節點內,安裝集群客戶端,具體操作可參考MRS快速入門的“安裝并使用集群客戶端”,

例如客戶端安裝目錄為“/opt/client”,

步驟2:準備應用程式

1、通過開源鏡像站獲取樣例工程,

下載樣例工程的Maven工程原始碼和組態檔,并在本地配置好相關開發工具,可參考MRS開發指南(普通版_3.x)的“通過開源鏡像站獲取樣例工程”,

根據集群版本選擇對應的分支,下載并獲取MRS相關樣例工程,

例如本章節場景對應示例為“SparkStreamingKafka010JavaExample”樣例,

2、本地使用IDEA工具匯入樣例工程,等待Maven工程下載相關依賴包,具體操作可參考考MRS開發指南(普通版_3.x)的Spark開發指南(普通模式)的“配置并匯入樣例工程”,

在本示例工程中,通過使用Streaming呼叫Kafka介面來獲取單詞記錄,然后把單詞記錄分類統計,得到每個單詞記錄數,關鍵代碼片段如下:

public class StreamingExampleProducer { 
    public static void main(String[] args) throws IOException { 
        if (args.length < 2) { 
            printUsage(); 
        } 
        String brokerList = args[0]; 
        String topic = args[1]; 
        String filePath = "/home/data/";    //源資料獲取路徑 
        Properties props = new Properties(); 
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); 
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); 
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
        Producer<String, String> producer = new KafkaProducer<String, String>(props); 
 
        for (int m = 0; m < Integer.MAX_VALUE / 2; m++) { 
            File dir = new File(filePath); 
            File[] files = dir.listFiles(); 
            if (files != null) { 
                for (File file : files) { 
                    if (file.isDirectory()) { 
                        System.out.println(file.getName() + "This is a directory!"); 
                    } else { 
                        BufferedReader reader = null; 
                        reader = new BufferedReader(new FileReader(filePath + file.getName())); 
                        String tempString = null; 
                        while ((tempString = reader.readLine()) != null) { 
                            // Blank line judgment 
                            if (!tempString.isEmpty()) { 
                                producer.send(new ProducerRecord<String, String>(topic, tempString)); 
                            } 
                        } 
                        // make sure the streams are closed finally. 
                        reader.close(); 
                    } 
                } 
            } 
            try { 
                Thread.sleep(3); 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
            } 
        } 
    } 
 
    private static void printUsage() { 
        System.out.println("Usage: {brokerList} {topic}"); 
    } 
}

3、本地配置好Maven及SDK相關引數后,樣例工程會自動加載相關依賴包,加載完畢后,執行package打包,獲取打包后的Jar檔案,

例如打包后的Jar檔案為“SparkStreamingKafka010JavaExample-1.0.jar”,

步驟3:上傳Jar包及源資料

1、準備向Kafka發送的源資料,例如如下的“input_data.txt”檔案,將該檔案上傳到客戶端節點的“/home/data”目錄下,

ZhangSan 
LiSi 
WangwWU 
Tom 
Jemmmy 
LinDa

2、將編譯后的Jar包上傳到客戶端節點,例如上傳到“/opt”目錄,

說明:如果本地網路無法直接連接客戶端節點上傳檔案,可先將jar檔案或者源資料上傳至OBS檔案系統中,然后通過MRS管理控制臺集群內的“檔案管理”頁面匯入HDFS中,再通過HDFS客戶端使用hdfs dfs -get命令下載到客戶端節點本地,

步驟4:運行作業并查看結果

1、使用root用戶登錄安裝了集群客戶端的節點,

cd /opt/client
source bigdata_env

2、創建用于接收資料的Kafka Topic,

kafka-topics.sh --create --zookeeper quorumpeer實體IP地址:ZooKeeper客戶端連接埠/kafka --replication-factor 2 --partitions 3 --topic topic名稱

quorumpeer實體IP地址可登錄集群的FusionInsight Manager界面,在“集群 > 服務 > ZooKeeper > 實體”界面中查詢,多個地址可用“,”分隔,ZooKeeper客戶端連接埠可通過ZooKeeper服務配置引數“clientPort”查詢,默認為2181,

例如執行以下命令:

kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 2 --partitions 2 --topic sparkkafka

回傳結果如下:

Created topic sparkkafka.

3、Topic創建成功后,運行程式向Kafka發送資料,

java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer Broker實體IP地址:Kafka連接埠 topic名稱

Kafka Broker實體IP地址可登錄集群的FusionInsight Manager界面,在“集群 > 服務 > Kafka > 實體”界面中查詢,多個地址可用“,”分隔,Broker埠號可通過Kafka服務配置引數“port”查詢,默認為9092,

例如執行以下命令:

java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer 192.168.0.131:9092 sparkkafka
回傳結果如下:
... 
transactional.id = null 
value.serializer = class org.apache.kafka.common.serialization.StringSerializer 
 
2022-06-08 15:43:42 INFO  AppInfoParser:117 - Kafka version: xxx 
2022-06-08 15:43:42 INFO  AppInfoParser:118 - Kafka commitId: xxx 
2022-06-08 15:43:42 INFO  AppInfoParser:119 - Kafka startTimeMs: xxx 
2022-06-08 15:43:42 INFO  Metadata:259 - [Producer clientId=DemoProducer] Cluster ID: d54RYHthSUishVb6nTHP0A

4、重新打開一個客戶端連接視窗,執行以下命令,讀取Kafka Topic中的資料,

cd /opt/client/Spark2x/spark
source bigdata_env
bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>

<checkPointDir>指應用程式結果備份到HDFS的路徑,自行指定即可,例如“/tmp”,
<brokers>指獲取元資料的Kafka地址,格式為“Broker實體IP地址:Kafka連接埠”,
<topic>指讀取Kafka上的topic名稱,
<batchTime>指Streaming分批的處理間隔,例如設定為“5”,

例如執行以下命令:

cd /opt/client/Spark2x/spark
source bigdata_env
bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar /tmp 192.168.0.131:9092 sparkkafka 5

程式運行后,可查看到Kafka中資料的統計結果:

.... 
-------------------------------------------                                      
Time: 1654674380000 ms 
------------------------------------------- 
(ZhangSan,6) 
(Tom,6) 
(LinDa,6) 
(WangwWU,6) 
(LiSi,6) 
(Jemmmy,6) 
 
-------------------------------------------                                      
Time: 1654674385000 ms 
------------------------------------------- 
(ZhangSan,717) 
(Tom,717) 
(LinDa,717) 
(WangwWU,717) 
(LiSi,717) 
(Jemmmy,717) 
 
------------------------------------------- 
Time: 1654674390000 ms 
------------------------------------------- 
(ZhangSan,2326) 
(Tom,2326) 
(LinDa,2326) 
(WangwWU,2326) 
(LiSi,2326) 
(Jemmmy,2326) 
 ...

5、登錄FusionInsight Manager界面,單擊“集群 > 服務 > Spark2x”,

6、在服務概覽頁面點擊Spark WebUI后的鏈接地址,可進入History Server頁面,

單擊待查看的App ID,您可以查看Spark Streaming作業的狀態,

----結束

好了,本期云小課就介紹到這里,快去體驗MapReduce(MRS)更多功能吧!猛戳這里

 

點擊關注,第一時間了解華為云新鮮技術~

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/544908.html

標籤:其他

上一篇:SQL中的排序order by 、SQL中的分頁limit、SQL的多表查詢、

下一篇:MySQL調優

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more