閱識風云是華為云資訊大咖,擅長將復雜資訊多元化呈現,其出品的一張圖(云圖說)、深入淺出的博文(云小課)或短視頻(云視廳)總有一款能讓您快速上手華為云,更多精彩內容請單擊此處,
摘要:Spark Streaming是一種構建在Spark上的實時計算框架,擴展了Spark處理大規模流式資料的能力,本文介紹如何使用MRS集群運行Spark Streaming作業消費Kafka資料,
本文分享自華為云社區《【云小課】EI第48課 MRS資料分析-通過Spark Streaming作業消費Kafka資料》,作者: 閱識風云 ,
Spark是分布式批處理框架,提供分析挖掘與迭代式記憶體計算能力,支持多種語言(Scala/Java/Python)的應用開發,
Spark Streaming是一種構建在Spark上的實時計算框架,擴展了Spark處理大規模流式資料的能力,本文介紹如何使用MRS集群運行Spark Streaming作業消費Kafka資料,
在本案例中,假定某個業務Kafka每1秒就會收到1個單詞記錄,基于業務需要,開發的Spark應用程式實作實時累加計算每個單詞的記錄總數的功能,
本案例基本操作流程如下所示:
- 創建MRS集群,
- 準備應用程式,
- 上傳Jar包及源資料,
- 運行作業并查看結果,
場景描述
Spark提供分析挖掘與迭代式記憶體計算能力, 適用以下場景:
- 資料處理(Data Processing):可以用來快速處理資料,兼具容錯性和可擴展性,
- 迭代計算(Iterative Computation):支持迭代計算,有效應對多步的資料處理邏輯,
- 資料挖掘(Data Mining):在海量資料基礎上進行復雜的挖掘分析,可支持各種資料挖掘和機器學習演算法,
- 流式處理(Streaming Processing):支持秒級延遲的流式處理,可支持多種外部資料源,
- 查詢分析(Query Analysis):支持標準SQL查詢分析,同時提供DSL(DataFrame), 并支持多種外部輸入,
當前Spark支持兩種資料處理方式:Direct Streaming和Receiver方式,
Direct Streaming方式主要通過采用Direct API對資料進行處理,以Kafka Direct介面為例,與啟動一個Receiver來連續不斷地從Kafka中接收資料并寫入到WAL中相比,Direct API簡單地給出每個batch區間需要讀取的偏移量位置,然后,每個batch的Job被運行,而對應偏移量的資料在Kafka中已準備好,這些偏移量資訊也被可靠地存盤在checkpoint檔案中,應用失敗重啟時可以直接讀取偏移量資訊,
Direct Kafka介面資料傳輸
需要注意的是,Spark Streaming可以在失敗后重新從Kafka中讀取并處理資料段,然而,由于語意僅被處理一次,重新處理的結果和沒有失敗處理的結果是一致的,
因此,Direct API消除了需要使用WAL和Receivers的情況,且確保每個Kafka記錄僅被接收一次,這種接收更加高效,使得Spark Streaming和Kafka可以很好地整合在一起,總體來說,這些特性使得流處理管道擁有高容錯性、高效性及易用性,因此推薦使用Direct Streaming方式處理資料,
在一個Spark Streaming應用開始時(也就是Driver開始時),相關的StreamingContext(所有流功能的基礎)使用SparkContext啟動Receiver成為長駐運行任務,這些Receiver接收并保存流資料到Spark記憶體中以供處理,用戶傳送資料的生命周期如圖1-2所示:
資料傳輸生命周期

- 接收資料(藍色箭頭)
Receiver將資料流分成一系列小塊,存盤到Executor記憶體中,另外,在啟用預寫日志(Write-ahead Log,簡稱WAL)以后,資料同時還寫入到容錯檔案系統的預寫日志中, - 通知Driver(綠色箭頭)
接收塊中的元資料(Metadata)被發送到Driver的StreamingContext,這個元資料包括:
定位其在Executor記憶體中資料位置的塊Reference ID,
若啟用了WAL,還包括塊資料在日志中的偏移資訊, - 處理資料(紅色箭頭)
對每個批次的資料,StreamingContext使用Block資訊產生RDD及其Job,StreamingContext通過運行任務處理Executor記憶體中的Block來執行Job, - 周期性地設定檢查點(橙色箭頭)
- 為了容錯的需要,StreamingContext會周期性地設定檢查點,并保存到外部檔案系統中,
華為云MapReduce服務提供了Spark服務多種場景下的樣例工程,本案例對應示例場景的開發思路:
- 接收Kafka中資料,生成相應DStream,
- 對單詞記錄進行分類統計,
- 計算結果,并進行列印,
步驟1:創建MRS集群
1、創建并購買一個包含有Spark2x、Kafka組件的MRS集群,詳情請參見MRS用戶指南的“購買自定義集群”,
說明:本文以購買的MRS 3.1.0版本的集群為例,集群未開啟Kerberos認證,
2、集群購買成功后,在MRS集群的任一節點內,安裝集群客戶端,具體操作可參考MRS快速入門的“安裝并使用集群客戶端”,
例如客戶端安裝目錄為“/opt/client”,
步驟2:準備應用程式
1、通過開源鏡像站獲取樣例工程,
下載樣例工程的Maven工程原始碼和組態檔,并在本地配置好相關開發工具,可參考MRS開發指南(普通版_3.x)的“通過開源鏡像站獲取樣例工程”,
根據集群版本選擇對應的分支,下載并獲取MRS相關樣例工程,
例如本章節場景對應示例為“SparkStreamingKafka010JavaExample”樣例,
2、本地使用IDEA工具匯入樣例工程,等待Maven工程下載相關依賴包,具體操作可參考考MRS開發指南(普通版_3.x)的Spark開發指南(普通模式)的“配置并匯入樣例工程”,
在本示例工程中,通過使用Streaming呼叫Kafka介面來獲取單詞記錄,然后把單詞記錄分類統計,得到每個單詞記錄數,關鍵代碼片段如下:
public class StreamingExampleProducer { public static void main(String[] args) throws IOException { if (args.length < 2) { printUsage(); } String brokerList = args[0]; String topic = args[1]; String filePath = "/home/data/"; //源資料獲取路徑 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int m = 0; m < Integer.MAX_VALUE / 2; m++) { File dir = new File(filePath); File[] files = dir.listFiles(); if (files != null) { for (File file : files) { if (file.isDirectory()) { System.out.println(file.getName() + "This is a directory!"); } else { BufferedReader reader = null; reader = new BufferedReader(new FileReader(filePath + file.getName())); String tempString = null; while ((tempString = reader.readLine()) != null) { // Blank line judgment if (!tempString.isEmpty()) { producer.send(new ProducerRecord<String, String>(topic, tempString)); } } // make sure the streams are closed finally. reader.close(); } } } try { Thread.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } } private static void printUsage() { System.out.println("Usage: {brokerList} {topic}"); } }
3、本地配置好Maven及SDK相關引數后,樣例工程會自動加載相關依賴包,加載完畢后,執行package打包,獲取打包后的Jar檔案,
例如打包后的Jar檔案為“SparkStreamingKafka010JavaExample-1.0.jar”,
步驟3:上傳Jar包及源資料
1、準備向Kafka發送的源資料,例如如下的“input_data.txt”檔案,將該檔案上傳到客戶端節點的“/home/data”目錄下,
ZhangSan
LiSi
WangwWU
Tom
Jemmmy
LinDa
2、將編譯后的Jar包上傳到客戶端節點,例如上傳到“/opt”目錄,
說明:如果本地網路無法直接連接客戶端節點上傳檔案,可先將jar檔案或者源資料上傳至OBS檔案系統中,然后通過MRS管理控制臺集群內的“檔案管理”頁面匯入HDFS中,再通過HDFS客戶端使用hdfs dfs -get命令下載到客戶端節點本地,
步驟4:運行作業并查看結果
1、使用root用戶登錄安裝了集群客戶端的節點,
cd /opt/client
source bigdata_env
2、創建用于接收資料的Kafka Topic,
kafka-topics.sh --create --zookeeper quorumpeer實體IP地址:ZooKeeper客戶端連接埠/kafka --replication-factor 2 --partitions 3 --topic topic名稱
quorumpeer實體IP地址可登錄集群的FusionInsight Manager界面,在“集群 > 服務 > ZooKeeper > 實體”界面中查詢,多個地址可用“,”分隔,ZooKeeper客戶端連接埠可通過ZooKeeper服務配置引數“clientPort”查詢,默認為2181,
例如執行以下命令:
kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 2 --partitions 2 --topic sparkkafka
回傳結果如下:
Created topic sparkkafka.
3、Topic創建成功后,運行程式向Kafka發送資料,
java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer Broker實體IP地址:Kafka連接埠 topic名稱
Kafka Broker實體IP地址可登錄集群的FusionInsight Manager界面,在“集群 > 服務 > Kafka > 實體”界面中查詢,多個地址可用“,”分隔,Broker埠號可通過Kafka服務配置引數“port”查詢,默認為9092,
例如執行以下命令:
java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer 192.168.0.131:9092 sparkkafka
回傳結果如下:
... transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer 2022-06-08 15:43:42 INFO AppInfoParser:117 - Kafka version: xxx 2022-06-08 15:43:42 INFO AppInfoParser:118 - Kafka commitId: xxx 2022-06-08 15:43:42 INFO AppInfoParser:119 - Kafka startTimeMs: xxx 2022-06-08 15:43:42 INFO Metadata:259 - [Producer clientId=DemoProducer] Cluster ID: d54RYHthSUishVb6nTHP0A
4、重新打開一個客戶端連接視窗,執行以下命令,讀取Kafka Topic中的資料,
cd /opt/client/Spark2x/spark source bigdata_env bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>
<checkPointDir>指應用程式結果備份到HDFS的路徑,自行指定即可,例如“/tmp”,
<brokers>指獲取元資料的Kafka地址,格式為“Broker實體IP地址:Kafka連接埠”,
<topic>指讀取Kafka上的topic名稱,
<batchTime>指Streaming分批的處理間隔,例如設定為“5”,
例如執行以下命令:
cd /opt/client/Spark2x/spark source bigdata_env bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar /tmp 192.168.0.131:9092 sparkkafka 5
程式運行后,可查看到Kafka中資料的統計結果:
.... ------------------------------------------- Time: 1654674380000 ms ------------------------------------------- (ZhangSan,6) (Tom,6) (LinDa,6) (WangwWU,6) (LiSi,6) (Jemmmy,6) ------------------------------------------- Time: 1654674385000 ms ------------------------------------------- (ZhangSan,717) (Tom,717) (LinDa,717) (WangwWU,717) (LiSi,717) (Jemmmy,717) ------------------------------------------- Time: 1654674390000 ms ------------------------------------------- (ZhangSan,2326) (Tom,2326) (LinDa,2326) (WangwWU,2326) (LiSi,2326) (Jemmmy,2326) ...
5、登錄FusionInsight Manager界面,單擊“集群 > 服務 > Spark2x”,
6、在服務概覽頁面點擊Spark WebUI后的鏈接地址,可進入History Server頁面,
單擊待查看的App ID,您可以查看Spark Streaming作業的狀態,
----結束
好了,本期云小課就介紹到這里,快去體驗MapReduce(MRS)更多功能吧!猛戳這里
點擊關注,第一時間了解華為云新鮮技術~
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/544908.html
標籤:其他
上一篇:SQL中的排序order by 、SQL中的分頁limit、SQL的多表查詢、
下一篇:MySQL調優
