文章目錄
- 一、需求分析及實作思路
- 1.1 需求分析:當日新增付費用戶首單分析
- 1.2 整體實時計算框架流程
- 1.3 具體業務流程圖
- 二、實時采集mysql資料
- 2.1 canal實時采集mysql資料
- 2.1.1 什么是canal
- 2.1.2 canal使用場景
- ①原始場景:阿里otter中間件的一部分
- ②常見場景1:更新快取服務器
- ③常見場景2
- ④常見場景3
- 2.1.3 canal的作業原理
- mysql的主從復制原理
- canal作業原理
- 2.1.4 mysql的binlog
- ①什么是binlog
- ②開啟binlog
- ③配置說明
- ④檢測配置是否成功
- 2.1.5 在mysql準備業務資料
- 2.1.6 下載安裝canal
- 2.1.7 配置canal
- 2.1.8 canalHA配置和啟動canal
- 2.1.9 測驗kafka是否收到實時資料
- 2.1.10 接收到的資料格式分析
- 2.1.11 驗證canal高可用是否正常作業
- 2.2 maxwell實時采集mysql資料
- 2.2.1 什么是maxwell
- 2.2.2 maxwell和canal的對比
- 2.2.3 使用maxwell前的準備作業
- 2.2.4 安裝和配置maxwell
- 2.2.5 啟動maxwell
- 2.2.6 maxwell發送到kafka的資料格式
- 2.3 Canal和Maxwell發送到kafka的資料對比
- 1.插入資料
- 2.洗掉資料
- 3.更新資料
- 總結資料特點
- ①日志結構
- ②資料型別
- ③待遇按時資料欄位的定義
- 三、實時資料分層
- 四、ODS層處理 ——一個表一個topic
- 4.1 定義抽象類——B a s e A p p(一個流消費一個Topic)
- 4.2 在Mykafkautil_1中添加需要的方法
- 4.3 處理canal采集的資料
- 4.4 處理maxwell采集的資料
- 測驗
一、需求分析及實作思路
1.1 需求分析:當日新增付費用戶首單分析
? 按省份,用戶性別,用戶年齡段,統計當日新增付費用戶收單平均消費及人數總比
? 無論是省份名稱、用戶性別、用戶年齡、訂單表中都沒有這些欄位,需要訂單(事實表)和維度表(省份,用戶)進行關聯,形成寬表后將資料寫入ES,通過kibana進行分析以及圖形展示
-- 一張事實表,兩張維度表
1.2 整體實時計算框架流程
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Qd3LFqHe-1605546794467)(https://i.loli.net/2020/11/14/mZcMAfad8kXuR17.png)]
1.3 具體業務流程圖
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-RydWFk6F-1605546794470)(https://i.loli.net/2020/11/14/baVLm7TtosFOR1S.png)]
二、實時采集mysql資料
2.1 canal實時采集mysql資料
2.1.1 什么是canal
阿里巴巴B2B公司,因為業務的特性,賣家主要集中在國內,買家主要集中在國外,所以衍生出了杭州和美國異地機房的需求,從2010年開始,阿里系公司開始逐步的嘗試基于資料庫的日志決議,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,
Canal是用java開發的基于資料庫增量日志決議,提供增量資料訂閱&消費的中間件,目前,canal主要支持了MySQL的binlog決議,決議完成后才利用canal client 用來處理獲得的相關資料,(資料庫同步需要阿里的otter中間件,基于canal),
2.1.2 canal使用場景
①原始場景:阿里otter中間件的一部分
otter是阿里用于進行異地資料庫之間的同步框架,canal是其中一部分,
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-yOuRrJwA-1605546794471)(https://i.loli.net/2020/11/14/I53i2UKbnaxp4WQ.png)]
②常見場景1:更新快取服務器
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-twHv75bp-1605546794473)(https://i.loli.net/2020/11/14/x9qvdB13TiXNwy5.png)]
③常見場景2
抓取業務資料新增變化表,用于制作拉鏈表
如果表中沒有更新時間, 制作拉鏈表就需要使用canal實時監控資料的變化
④常見場景3
抓取業務表的新增變化資料,用于制作實時統計
我們實時數倉就是這種應用場景!
2.1.3 canal的作業原理
mysql的主從復制原理
? ①. MySQL master 將資料變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events,可以通過 show binlog events 進行查看)
? ②. MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
? ③. MySQL slave 重放 relay log 中事件,將資料變更反映它自己的資料
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-3uc5qupu-1605546794474)(https://i.loli.net/2020/11/15/BI8F3pRV4hTqo6r.png)]
canal作業原理
? ①. canal 模擬 MySQL slave 的互動協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議
? ②. MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
? ③. canal 決議 binary log 物件(原始為 byte 流)
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-3VVAmTtp-1605546794475)(https://i.loli.net/2020/11/15/D9jx5HdmhXAeUPt.png)]
2.1.4 mysql的binlog
①什么是binlog
MySQL的二進制日志可以說是MySQL最重要的日志了,它記錄了所有的DDL和DML(除了資料查詢陳述句)陳述句,以事件形式記錄,還包含陳述句所執行的消耗的時間,MySQL的二進制日志是事務安全型的,
--一般來說開啟二進制日志大概會有1%的性能損耗,二進制有兩個最重要的使用場景:
其一:MySQL Replication在Master端開啟binlog,Mster把它的二進制日志傳遞給slaves來達到master-slave資料一致的目的,
其二:自然就是資料恢復了,通過使用mysqlbinlog工具來使恢復資料,
--二進制日志包括/兩類檔案:
A: 二進制日志索引檔案(檔案名后綴為.index)用于記錄所有的二進制的檔案,
B:二進制日志檔案(檔案名后綴為.00000*)記錄資料庫所有的DDL和DML(除了資料查詢陳述句)陳述句事件,
②開啟binlog
默認情況下, mysql是沒有開啟binlog的, 需要手動開啟.
開啟步驟:
- 找到mysql的組態檔:my.cnf. 大部分的mysql版本默認在: /etc/my.cnf.
如果沒有找到, 則可以通過下面的命令查找:
sudo find / -name my.cnf
? [外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-4y7OPcKB-1605546794477)(https://i.loli.net/2020/11/16/nsYzC8FIqmR1g2X.jpg)]
#而在環境變數的檔案加入這條命令,防止亂碼
export LANG="en_US.UTF-8"
- 修改my.cnf. 在my.cnf檔案中增加如下內容:
server-id= 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall
③配置說明
server-id:
? mysql主從復制的時候, 主從之間每個實體必須有獨一無二的id
log-bin:
? 這個表示binlog日志的前綴是mysql-bin ,以后生成的日志檔案就是 mysql-bin.123456 的檔案后面的數字按順序生成, 每次mysql重啟或者到達單個檔案大小的閾值時,新生一個檔案,按順序編號,
Binlog_format:
? mysql binlog的格式,有三種值,分別是statement, row, mixed
? 三者區別:
statement:
陳述句級,binlog會記錄每次一執行寫操作的陳述句,
相對row模式節省空間,但是可能產生不一致性,比如
update tt set create_date=now()
如果用binlog日志進行恢復,由于執行時間不同可能產生的資料就不同,
'優點':節省空間
'缺點':有可能造成資料不一致,
row:
行級, binlog會記錄每次操作后每行記錄的變化,
'優點':保持資料的絕對一致性,因為不管sql是什么,參考了什么函式,他只記錄執行后的效果,
'缺點':占用較大空間
mixed:
statement的升級版,
一定程度上解決了,因為一些情況而造成的statement模式不一致問題
在某些情況下譬如:
當函式中包含 UUID() 時, 包含 AUTO_INCREMENT 欄位的表被更新時;執行 INSERT DELAYED 陳述句時;用 UDF 時;會按照 ROW的方式進行處理
'優點':節省空間,同時兼顧了一定的一致性,
'缺點':還有些極個別情況依舊會造成不一致,另外statement和mixed對
于需要對binlog的監控的情況都不方便,
? 由于canal不是資料庫,是不能執行sql陳述句的,所以,只能設定為row格式
binlog-do-db:
設定把哪個database的變化寫入到binlog,
如果不配置, 則所有database的變化都會寫入到binlog.
如果要設定多個資料庫需要, 需要寫多次這個引數的配置
binlog-do-db = a
binlog-do-db = b
④檢測配置是否成功
? A: 重啟mysql服務器.
Sudo systemctl restart mysqld
? B: 啟動msyql客戶端, 執行sql陳述句:
show variables like ‘%log_bin%’
? [外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Cadl2c60-1605546794478)(https://i.loli.net/2020/11/16/hao4ZVTW9e1LgPS.jpg)]
? C: 也可以去對應的目錄下查看是否生成log_bin檔案
? [外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-CQ6sd0tw-1605546794479)(https://i.loli.net/2020/11/16/hao4ZVTW9e1LgPS.jpg)]
2.1.5 在mysql準備業務資料
使用離線數倉中, 業務資料的生產工具生產資料即可.
注意: 更改application.properties的配置, 能夠連上mysql
2.1.6 下載安裝canal
? ①. 在mysql創建canal用戶
? canal需要監控mysql資料, 在企業中一般拿不到root用戶, 需新創建只讀取權限的用戶.
Mysql> set global validate_password_policy=0;
mysql> set global validate_password_length=4;
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO ‘canal’@’%’ IDENTIFIED BY ‘canal’;
mysql> FLUSH PRIVILEGES;
? ②. 下載canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
? ③. 解壓安裝canal
mkdir /opt/module/canal
tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/module/canal
2.1.7 配置canal
? canal有兩種配置: server級別和instance級別
- server級別的配置是對整個canal進行配置, 是一些全域性的配置. 一個sever中可以配置多個instance
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-lJQnW4m2-1605546794479)(https://i.loli.net/2020/11/16/q9rOlNZPEQYadps.jpg)]
- instance級別的配置, 是最小的訂閱mysql的佇列.
比如example實體
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-hgelC84t-1605546794480)(https://i.loli.net/2020/11/16/q9rOlNZPEQYadps.jpg)]
**①canal server配置**
vim /opt/module/canal/conf/
? 重點關注以下配置:
canal.ip = hadoop162 # canal服務器系結ip地址
canal.port = 11111 # canal埠號, 將來客戶端通過這個埠號可以讀到資料
canal.zkServers = hadoop162:2181,hadoop163:2181,hadoop164:2181 # zk地址, 用來管理canal的高可用
\# tcp, kafka, RocketMQ
\# tcp:客戶端通過tcp方式從Canal服務端拉取增量資料
\# kafka:Canal服務端將增量資料同步到kafka中,客戶端從kafka消費資料,此時客戶端感知不到Canal的存在,只需要跟kafka互動,
\# RocketMQ:同kafka,增量資料同步到RocketMQ中,
canal.serverMode = kafka
canal.destinations = atguigu # 配置實體, 如果有多個實體, 用逗號隔開. 我們創建一個atguigu實體
canal.mq.servers = hadoop162:9092,hadoop163:9092,hadoop164:9092
? ②canal instance配置
? 把目錄名example改為atguigu(其實就是和剛才的配置保存一致, 用來表示atguigu實體)
mv example atguigu
? 打開實體組態檔:
vim /opt/module/canal/conf/atguigu/instance.properties
? 在其中配置要監控的mysql和監控到的資料發送到kafka
# canal實體(slave)的id, 不能和mysql的id重復. 可以自動生成, 無需手工配置
\# canal.instance.mysql.slaveId=0
\# 要監控的mysql地址
canal.instance.master.address = hadoop162:3306
\# 連接mysql的用戶名
canal.instance.dbUsername=canal
\# 連接mysql的密碼
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
\# 該實體監控的 庫.表 默認所有庫下所有表
canal.instance.filter.regex=gmall\\..* # 監控gmall資料庫下所有包
\# kafka topic配置
canal.mq.topic=gmall_db
\# 注釋掉此配置, 此配置是只發送到一個固定磁區中
\# canal.mq.partition=0
\# 散列模式的磁區數, 要和kafka的topic的磁區數保持一致
canal.mq.partitionsNum=2
\# 如何計算每條資料進入的磁區
canal.mq.partitionHash= .*\\..*:$pk$ # 指定所有的表用主鍵hash得到磁區索引
2.1.8 canalHA配置和啟動canal
? ① canal只是支持HA, 不支持高負載, 沒有負載均衡的概念.
分發canal到hadoop103和hadoop104
? 注意: 修改canal.ip = hadoop102, 為hadoop103和hadoop104
? ② 在hadoop102,hadoop103,hadoop104分別啟動canal
/opt/module/canal/bin/startup.sh
? 注意: 需要先啟動zookeeper和kafka
2.1.9 測驗kafka是否收到實時資料
? 起一個終端消費者, 消費gmall_db
bin/kafka-console-consumer.sh –bootstrap-server hadoop102:9092 –topic gmall_db
? 生產資料:
java -jar gmall2020-mock-db-2020-05-18.jar
? 觀察消費者是否消費到資料, 如果沒有消費到資料, 則需要重新檢測canal配置
2.1.10 接收到的資料格式分析
? 發送到kafka的資料格式
{
“data”:[
{
“id”:”350”,
“consignee”:”蔣雄”,
“consignee_tel”:”13325313235”,
“final_total_amount”:”389.0”,
“order_status”:”1005”,
“user_id”:”62”,
“delivery_address”:”第17大街第7號樓9單元324門”,
“order_comment”:”描述353475”,
“out_trade_no”:”822287931878949”,
“trade_body”:”十月稻田 沁州黃小米 (黃小米 五谷雜糧 山西特產 真空裝 大米伴侶 粥米搭檔) 2.5kg等2件商品”,
“create_time”:”2020-08-26 15:02:40”,
“operate_time”:”2020-08-26 15:02:41”,
“expire_time”:”2020-08-26 15:17:40”,
“tracking_no”:null,
“parent_order_id”:null,
“img_url”:”http://img.gmall.com/933223.jpg”,
“province_id”:”3”,
“benefit_reduce_amount”:”108.0”,
“original_total_amount”:”488.0”,
“feight_fee”:”9.0”
}
],
“database”:”gmall”,
“es”:1598425361000,
“id”:73,
“isDdl”:false,
“mysqlType”:{
“id”:”bigint(20)”,
“consignee”:”varchar(100)”,
“consignee_tel”:”varchar(20)”,
“final_total_amount”:”decimal(16,2)”,
“order_status”:”varchar(20)”,
“user_id”:”bigint(20)”,
“delivery_address”:”varchar(1000)”,
“order_comment”:”varchar(200)”,
“out_trade_no”:”varchar(50)”,
“trade_body”:”varchar(200)”,
“create_time”:”datetime”,
“operate_time”:”datetime”,
“expire_time”:”datetime”,
“tracking_no”:”varchar(100)”,
“parent_order_id”:”bigint(20)”,
“img_url”:”varchar(200)”,
“province_id”:”int(20)”,
“benefit_reduce_amount”:”decimal(16,2)”,
“original_total_amount”:”decimal(16,2)”,
“feight_fee”:”decimal(16,2)”
},
“old”:[
{
“order_status”:”1002”
}
],
“pkNames”:[
“id”
],
“sql”:””,
“sqlType”:{
“id”:-5,
“consignee”:12,
“consignee_tel”:12,
“final_total_amount”:3,
“order_status”:12,
“user_id”:-5,
“delivery_address”:12,
“order_comment”:12,
“out_trade_no”:12,
“trade_body”:12,
“create_time”:93,
“operate_time”:93,
“expire_time”:93,
“tracking_no”:12,
“parent_order_id”:-5,
“img_url”:12,
“province_id”:4,
“benefit_reduce_amount”:3,
“original_total_amount”:3,
“feight_fee”:3
},
“table”:”order_info”,
“ts”:1598425365252,
“type”:”UPDATE”
}
2.1.11 驗證canal高可用是否正常作業
? 當前啟動canal的時候, 只有一臺設備會啟動 atguigu實體
[zk: localhost:2181(CONNECTED) 21] get /otter/canal/destinations/atguigu/running
{“active”:true,”address”:”hadoop102:11111”}
? 停止hadoop102的canal, 然后觀察:
[zk: localhost:2181(CONNECTED) 1] get /otter/canal/destinations/atguigu/running
{“active”:true,”address”:”hadoop104:11111”}
2.2 maxwell實時采集mysql資料
2.2.1 什么是maxwell
maxwell 是由美國zendesk開源,用java撰寫的Mysql實時抓取軟體, 其抓取的原理也是基于binlog,
2.2.2 maxwell和canal的對比
-
? Maxwell 沒有 Canal那種server+client模式,只有一個server把資料發送到訊息佇列或redis,
-
Maxwell 有一個亮點功能,就是Canal只能抓取最新資料,對已存在的歷史資料沒有辦法處理,而Maxwell有一個bootstrap功能,可以直接引匯出完整的歷史資料用于初始化,非常好用,
-
Maxwell不能直接支持HA,但是它支持斷點還原,即錯誤解決后重啟繼續上次點兒讀取資料,
-
Maxwell只支持json格式,而Canal如果用Server+client模式的話,可以自定義格式,
-
Maxwell比Canal更加輕量級,
2.2.3 使用maxwell前的準備作業
- 在mysql中創建一個資料庫, 用于存盤maxwell的元資料
CREATE DATABASE ‘maxwell’CHARACTER SET ‘utf8’ COLLATE ‘utf8_general_ci’;
- 創建可以操作資料庫maxwell的用戶:maxwell
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY 'aaaaaa';
- 給用戶maxwell分配操作其他資料庫的權限
GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@’%’;
FLUSH PRIVILEGES;
2.2.4 安裝和配置maxwell
? ①.下載maxwell
wget https://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz
? ②.解壓
tar -zxvf maxwell-1.27.1.tar.gz -C /opt/module
? ③.配置maxwell
cd /opt/module/maxwell-1.27.1
vim config.properties #注意要將之前檔案洗掉,新建一個
? 如下配置
# tl;dr config
log_level=info
producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka_topic=maxwell_gmall_db
producer_partition_by=primary_key # 按照主鍵的hash進行磁區, 如果不設定是按照資料庫磁區
# mysql login info
host=hadoop102
user=maxwell
password=aaaaaa
client_id=maxwell_1 # 初始化維度表資料的時候使用
2.2.5 啟動maxwell
? ①啟動maxwel
/opt/module/maxwell-1.27.1 ? bin/maxwell --config config.properties --daemon
? ②確定kafka是否收到資料
起一個終端消費者
bin/kafka-console-consumer.sh –bootstrap-server hadoop102:9092 –topic maxwell_gmall_db
? ③在mysql中生成資料, 確認kafka是否收到資料.
2.2.6 maxwell發送到kafka的資料格式
{
“database”:”gmall”,
“table”:”comment_info”,
“type”:”insert”,
“ts”:1598434438,
“xid”:27085,
“commit”:true,
“data”:{
“id”:1298554271214907454,
“user_id”:988,
“sku_id”:5,
“spu_id”:5,
“order_id”:548,
“appraise”:”1201”,
“comment_txt”:”評論內容:78483837649887576216843442715245587379516398154672”,
“create_time”:”2020-08-26 17:33:58”,
“operate_time”:null
}
}
2.3 Canal和Maxwell發送到kafka的資料對比
? 為了方便做對比, 在gmall資料庫下創建一個表:test_user_info
create table test_user_info(id int primary key, name varchar(255), tel char(11));
1.插入資料
insert into test_user_info values(1, ‘lisi’, ‘13838389438’);

2.洗掉資料
delete from test_user_info where id=1;

3.更新資料
update test_user_info set name=’zs’ where id=1;

總結資料特點
①日志結構
? canal 每一條SQL會產生一條日志,如果該條Sql影響了多行資料,則已經會通過集合的方式歸集在這條日志中,(即使是一條資料也會是陣列結構)
? maxwell 以影響的資料為單位產生日志,即每影響一條資料就會產生一條日志,如果想知道這些日志是否是通過某一條sql產生的可以通過xid進行判斷,相同的xid的日志來自同一sql
②資料型別
? 當原始資料是數字型別時,maxwell會尊重原始資料的型別不增加雙引,變為字串,Canal一律轉換為字串,
③待遇按時資料欄位的定義
? canal資料中會帶入表結構,Maxwell更簡潔,
三、實時資料分層
目前在企業中, 對實時資料分層的做法還不是太普遍, 實時資料分層有好處也有壞處
好處
1. 資料可以復用,
2. 簡化計算.
3. 為OLAP查詢分擔壓力
壞處
增加了中間資料層, 會增加實時資料的延遲.
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-CAmvrESE-1605546794481)(https://i.loli.net/2020/11/16/kxEU9nuc6wfLrA4.png)]
四、ODS層處理 ——一個表一個topic
思路
--用canal和maxwell將資料發送到kafka之后,一個topic【kafka_gmall_db】里面存了所有的資料
這時候我們需要將資料分流:
1.使用spark-streaming對資料進行分流——怎么分流呢?就是將不同表的資料消費到不同的topic里面 【ods層】
總體思路:
①我們先將SparkStreaming消費kafka的資料的一些公共代碼抽象到一個抽象類BaseApp中,方便后面使用
具體的資料用抽象欄位表示出來
具體的邏輯使用抽象函式,定義一個抽象函式run方法
將 ssc: StreamingContext,
offsetRanges: ListBuffer[OffsetRange],
sourceStream: DStream[String]
當做引數傳遞過去
②定義一個BaseDBCanalApp類,繼承抽象類BaseApp,實作資料的分流
分流(處理你可能用到的表,現在用什么,處理什么,分流的時候只用這些表)
--先查看canal的資料格式
這里我們只需要三個
data是一個集合,里面有可能有多個物件,我們使用flatMap
4.1 定義抽象類——B a s e A p p(一個流消費一個Topic)
將使用SparkStreaming消費kafka的資料的一些公共代碼抽象到一個抽象類BaseApp,方便后面使用
在包com.atguigu.gmall.realtime下創建抽象類:BaseApp
思路
1.我們先將SparkStreaming消費kafka的資料的一些公共代碼抽象到一個抽象類BaseApp中,方便后面使用
具體的資料用抽象欄位表示出來
具體的邏輯使用抽象函式,定義一個抽象函式run方法
將 ssc: StreamingContext,
offsetRanges: ListBuffer[OffsetRange],
sourceStream: DStream[String]
當做引數傳遞過去
package com.atguigu.realtime
import com.atguigu.realtime.util.{MyKafkaUtil_1, OffsetManager}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s.CustomSerializer
import org.json4s.JsonAST.{JDouble, JInt, JLong, JString}
import scala.collection.mutable.ListBuffer
abstract class BaseApp {
//消費者組和主題
val master:String
val appName:String
val groupId :String
val topic:String
val bachTime:Int
val toLong: CustomSerializer[Long] = new CustomSerializer[Long](ser = format => ({
case JString(s) => s.toLong
case JInt(s) => s.toLong
},{
case s:Long => JLong(s)
}))
val toDouble = new CustomSerializer[Double](ser = format => ({
case JString(s) => s.toDouble
case JDouble(s) => s.toDouble
},{
case s:Long => JDouble(s)
}))
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster(master).setAppName(appName)
val ssc: StreamingContext = new StreamingContext(conf, Seconds(bachTime))
val offsets: Map[TopicPartition, Long] = OffsetManager.readOffsets(groupId, topic)
val offsetRanges = ListBuffer.empty[OffsetRange]
val sourceStream: DStream[String] = MyKafkaUtil_1
.getKafkaStream(ssc, groupId, topic, offsets)
.transform(rdd => {
val newOffsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges]
.offsetRanges
offsetRanges.clear()
offsetRanges ++= newOffsetRanges
rdd
})
.map(_.value())
//具體的業務邏輯
run(ssc, offsetRanges, sourceStream)
ssc.start()
ssc.awaitTermination()
}
def run(ssc: StreamingContext, offsetRanges: ListBuffer[OffsetRange], sourceStream: DStream[String])
}
4.2 在Mykafkautil_1中添加需要的方法
把向kafka發資料的方法封裝在Mykafkautil_1工具類中
//此方法出來的是kafka中讀取出來的流
val kafkaProducerParams: Map[String, Object] = Map(
"bootstrap.servers" -> "hadoop162:9092,hadoop163:9092,hadoop164:9092",
"key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
"enable.idempotent" -> (true: java.lang.Boolean)
)
def getProducer = {
import scala.collection.JavaConverters._
new KafkaProducer[String, String](kafkaProducerParams.asJava)
}
4.3 處理canal采集的資料
資料格式
{
“data”:[
{
“id”:”350”,
“consignee”:”蔣雄”,
“consignee_tel”:”13325313235”,
“final_total_amount”:”389.0”,
“order_status”:”1005”,
“user_id”:”62”,
“delivery_address”:”第17大街第7號樓9單元324門”,
“order_comment”:”描述353475”,
“out_trade_no”:”822287931878949”,
“trade_body”:”十月稻田 沁州黃小米 (黃小米 五谷雜糧 山西特產 真空裝 大米伴侶 粥米搭檔) 2.5kg等2件商品”,
“create_time”:”2020-08-26 15:02:40”,
“operate_time”:”2020-08-26 15:02:41”,
“expire_time”:”2020-08-26 15:17:40”,
“tracking_no”:null,
“parent_order_id”:null,
“img_url”:”http://img.gmall.com/933223.jpg”,
“province_id”:”3”,
“benefit_reduce_amount”:”108.0”,
“original_total_amount”:”488.0”,
“feight_fee”:”9.0”
}
],
“table”:”order_info”,
“type”:”UPDATE”
}
思路
②定義一個BaseDBCanalApp類,繼承抽象類BaseApp,實作資料的分流
分流(處理你可能用到的表,現在用什么,處理什么,分流的時候只用這些表)
--先查看canal的資料格式
--決議資料
這里我們只需要三個(data,)
data是一個集合,里面有可能有多個物件,
1)--將canal消費過來的資料封裝成集合.flatMap(str=>{
我們使用flatMap決議出三個引數
implicit val f = org.json4s.DefaultFormats
val j: JValue = JsonMethods.parse(str)
val data: JValue = j \ "data" //一個集合或者陣列
val tableName: String = (j \ "table").extract[String]
val operate: String = (j \ "type").extract[String]
data是一個集合,我們使用如下操作拿到集合的所有物件,回傳值為
data.children.map(child =>(tableName,operate.toLowerCase(),Serialization.write(child)))
'回傳值型別DStream(String,String,String),是一個元組'
--})
2)--過濾出需要的表的資料.filter{
怎么過濾呢?根據表名過濾,只要表名是給出表名集合里的資料就過濾出來
當filter和rdd傳入的是元組的時候使用偏函式方便
case (tableName,operate,data)=>
只要滿足要求的表,和非洗掉的資料和內容不能是0
tableNames.contains(tableName) && operate != "delete" &&data.length>0
--}
3)--得到了我們需要的資料之后,就寫入到ods層.foreachRdd(rdd=>{
【每張表一個topic】
每個磁區寫一個
--rdd.foreachPartition((it:Iterator[(String,String,String)]) =>{
先獲取一個kafka的生產者
將資料遍歷寫入
--it.foreach{只處理order_info的insert的資料
case (tableName,operate,data)=>{
val topic = s"ods_$tableName"
if(tableName !="order_info"){
producer.send(new ProducerRecord[String, String](topic, data))
}else if (operate == "insert"){
producer.send(new ProducerRecord[String, String](topic, data))
}
}
--}
關閉生產者
--}
--})
4)---將偏移量保存
OffsetManager.saveOffsets(offsetRanges,groupId,topic)
代碼
package com.atguigu.realtime.ods
import com.atguigu.realtime.BaseApp
import com.atguigu.realtime.util.{MyKafkaUtil_1, OffsetManager}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.OffsetRange
import org.json4s.JValue
import org.json4s.jackson.{JsonMethods, Serialization}
import scala.collection.mutable.ListBuffer
object BaseDBCanalApp extends BaseApp{
override val master: String = "local[2]"
override val appName: String = "BaseDBCannalApp"
override val groupId: String = "BaseDBCanalApp"
override val topic: String = "gmall_db"
override val bachTime: Int = 3
val tableNames = List(
"order_info", //
"order_detail",
"user_info",
"base_province",
"base_category3",
"sku_info",
"spu_info",
"base_trademark")
override def run(ssc: StreamingContext,
offsetRanges: ListBuffer[OffsetRange],
sourceStream: DStream[String]): Unit ={
//分流
sourceStream.flatMap(str =>{
implicit val f = org.json4s.DefaultFormats
val j: JValue = JsonMethods.parse(str)
val data: JValue = j \ "data" //一個集合或者陣列
val tableName: String = (j \ "table").extract[String]
val operate: String = (j \ "type").extract[String]
//拿到date集合的每個物件
//date.children.map(child =>(tableName,operate,JsonMethods.compact(JsonMethods.render(child))))
date.children.map(child =>(tableName,operate.toLowerCase(),Serialization.write(child)))
})
//過濾出資料
.filter{
case (tableName,operate,data)=>
//主要滿足要求的表,和非洗掉的資料和內容不能是0
tableNames.contains(tableName) && operate != "delete" &&data.length>0
}
.foreachRDD(rdd =>{
//寫入到ODS層(kafka)
rdd.foreachPartition((it:Iterator[(String,String,String)]) =>{
//先獲取一個kafka的生產者
val producer: KafkaProducer[String, String] = MyKafkaUtil_1.getProducer
//寫入
it.foreach{
case (tableName,operate,data)=>{
val topic = s"ods_$tableName"
if(tableName !="order_info"){
producer.send(new ProducerRecord[String, String](topic, data))
}else if (operate == "insert"){
producer.send(new ProducerRecord[String, String](topic, data))
}
}
}
//關閉生產者
producer.close()
})
OffsetManager.saveOffsets(offsetRanges,groupId,topic)
})
}
}
4.4 處理maxwell采集的資料
package com.atguigu.realtime.ods
import com.atguigu.realtime.BaseApp
import com.atguigu.realtime.util.{MyKafkaUtil_1, OffsetManager}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.OffsetRange
import org.json4s.JValue
import org.json4s.jackson.{JsonMethods, Serialization}
import scala.collection.mutable.ListBuffer
object BaseDBMaxwellApp extends BaseApp{
override val master: String = "local[2]"
override val appName: String = "BaseDBMaxwellApp"
override val groupId: String = "BaseDBMaxwellApp"
override val topic: String = "maxwell_gmall_db"
override val bachTime: Int = 3
val tableNames = List(
"order_info", //
"order_detail",
"user_info",
"base_province",
"base_category3",
"sku_info",
"spu_info",
"base_trademark")
override def run(ssc: StreamingContext,
offsetRanges: ListBuffer[OffsetRange],
sourceStream: DStream[String]): Unit ={
//分流
sourceStream
.map(str =>{
implicit val f = org.json4s.DefaultFormats
val j: JValue = JsonMethods.parse(str)
val date: JValue = j \ "data" //一個集合或者陣列
val tableName: String = (j \ "table").extract[String]
val operate: String = (j \ "type").extract[String]
//拿到date集合的每個物件
//date.children.map(child =>(tableName,operate,JsonMethods.compact(JsonMethods.render(child))))
(tableName,operate.toLowerCase(),Serialization.write(date))
})
//過濾出資料
.filter{
case (tableName, operate, data)=>
//主要滿足要求的表,和非洗掉的資料和內容不能是0
tableNames.contains(tableName) && operate!="delete" &&data.length>0
}
.foreachRDD(rdd =>{
//寫入到ODS層(kafka)
rdd.foreachPartition((it:Iterator[(String,String,String)]) =>{
//先獲取一個kafka的生產者
val producer: KafkaProducer[String, String] = MyKafkaUtil_1.getProducer
//寫入
it.foreach{
case (tableName,operate,data)=>{
val topic = s"ods_$tableName"
if(tableName !="order_info"){
producer.send(new ProducerRecord[String, String](topic, data))
}else if (operate == "insert"){
producer.send(new ProducerRecord[String, String](topic, data))
}
}
}
//關閉生產者
producer.close()
})
OffsetManager.saveOffsets(offsetRanges,groupId,topic)
})
}
}
測驗
啟動zk,kafka,
啟動canal
bin/startup.sh
啟動redis
redis-server
redis-server /etc/redis.conf
maxwell開啟:
/opt/module/maxwell-1.27.1 ? bin/maxwell --config config.properties --daemon
--偏函式
--模式匹配
--抽象類
--繼承,多型,封裝資料和行為,資料放到抽象里面
--java轉scala
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/224121.html
標籤:其他
下一篇:frpc使用
