主頁 >  其他 > SparkStreaming實時數倉——首單分析(上)

SparkStreaming實時數倉——首單分析(上)

2020-11-18 15:22:30 其他

文章目錄

  • 一、需求分析及實作思路
    • 1.1 需求分析:當日新增付費用戶首單分析
    • 1.2 整體實時計算框架流程
    • 1.3 具體業務流程圖
  • 二、實時采集mysql資料
    • 2.1 canal實時采集mysql資料
      • 2.1.1 什么是canal
      • 2.1.2 canal使用場景
        • ①原始場景:阿里otter中間件的一部分
        • ②常見場景1:更新快取服務器
        • ③常見場景2
        • ④常見場景3
      • 2.1.3 canal的作業原理
        • mysql的主從復制原理
        • canal作業原理
      • 2.1.4 mysql的binlog
        • ①什么是binlog
        • ②開啟binlog
        • ③配置說明
        • ④檢測配置是否成功
      • 2.1.5 在mysql準備業務資料
      • 2.1.6 下載安裝canal
      • 2.1.7 配置canal
      • 2.1.8 canalHA配置和啟動canal
      • 2.1.9 測驗kafka是否收到實時資料
      • 2.1.10 接收到的資料格式分析
      • 2.1.11 驗證canal高可用是否正常作業
    • 2.2 maxwell實時采集mysql資料
      • 2.2.1 什么是maxwell
      • 2.2.2 maxwell和canal的對比
      • 2.2.3 使用maxwell前的準備作業
      • 2.2.4 安裝和配置maxwell
      • 2.2.5 啟動maxwell
      • 2.2.6 maxwell發送到kafka的資料格式
    • 2.3 Canal和Maxwell發送到kafka的資料對比
      • 1.插入資料
      • 2.洗掉資料
      • 3.更新資料
      • 總結資料特點
        • ①日志結構
        • ②資料型別
        • ③待遇按時資料欄位的定義
  • 三、實時資料分層
  • 四、ODS層處理 ——一個表一個topic
    • 4.1 定義抽象類——B a s e A p p(一個流消費一個Topic)
    • 4.2 在Mykafkautil_1中添加需要的方法
    • 4.3 處理canal采集的資料
    • 4.4 處理maxwell采集的資料
    • 測驗

一、需求分析及實作思路

1.1 需求分析:當日新增付費用戶首單分析

? 按省份,用戶性別,用戶年齡段,統計當日新增付費用戶收單平均消費及人數總比

? 無論是省份名稱、用戶性別、用戶年齡、訂單表中都沒有這些欄位,需要訂單(事實表)和維度表(省份,用戶)進行關聯,形成寬表后將資料寫入ES,通過kibana進行分析以及圖形展示

-- 一張事實表,兩張維度表

1.2 整體實時計算框架流程

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Qd3LFqHe-1605546794467)(https://i.loli.net/2020/11/14/mZcMAfad8kXuR17.png)]

1.3 具體業務流程圖

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-RydWFk6F-1605546794470)(https://i.loli.net/2020/11/14/baVLm7TtosFOR1S.png)]

二、實時采集mysql資料

2.1 canal實時采集mysql資料

2.1.1 什么是canal

	阿里巴巴B2B公司,因為業務的特性,賣家主要集中在國內,買家主要集中在國外,所以衍生出了杭州和美國異地機房的需求,從2010年開始,阿里系公司開始逐步的嘗試基于資料庫的日志決議,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,

	Canal是用java開發的基于資料庫增量日志決議,提供增量資料訂閱&消費的中間件,目前,canal主要支持了MySQL的binlog決議,決議完成后才利用canal client 用來處理獲得的相關資料,(資料庫同步需要阿里的otter中間件,基于canal),

2.1.2 canal使用場景

①原始場景:阿里otter中間件的一部分

otter是阿里用于進行異地資料庫之間的同步框架,canal是其中一部分,

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-yOuRrJwA-1605546794471)(https://i.loli.net/2020/11/14/I53i2UKbnaxp4WQ.png)]

②常見場景1:更新快取服務器

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-twHv75bp-1605546794473)(https://i.loli.net/2020/11/14/x9qvdB13TiXNwy5.png)]

③常見場景2

	抓取業務資料新增變化表,用于制作拉鏈表
如果表中沒有更新時間, 制作拉鏈表就需要使用canal實時監控資料的變化

④常見場景3

	抓取業務表的新增變化資料,用于制作實時統計
我們實時數倉就是這種應用場景!

2.1.3 canal的作業原理

mysql的主從復制原理

? ①. MySQL master 將資料變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events,可以通過 show binlog events 進行查看)

? ②. MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)

? ③. MySQL slave 重放 relay log 中事件,將資料變更反映它自己的資料

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-3uc5qupu-1605546794474)(https://i.loli.net/2020/11/15/BI8F3pRV4hTqo6r.png)]

canal作業原理

? ①. canal 模擬 MySQL slave 的互動協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議

? ②. MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )

? ③. canal 決議 binary log 物件(原始為 byte 流)

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-3VVAmTtp-1605546794475)(https://i.loli.net/2020/11/15/D9jx5HdmhXAeUPt.png)]

2.1.4 mysql的binlog

①什么是binlog

	MySQL的二進制日志可以說是MySQL最重要的日志了,它記錄了所有的DDL和DML(除了資料查詢陳述句)陳述句,以事件形式記錄,還包含陳述句所執行的消耗的時間,MySQL的二進制日志是事務安全型的,

--一般來說開啟二進制日志大概會有1%的性能損耗,二進制有兩個最重要的使用場景: 
其一:MySQL Replication在Master端開啟binlog,Mster把它的二進制日志傳遞給slaves來達到master-slave資料一致的目的, 
其二:自然就是資料恢復了,通過使用mysqlbinlog工具來使恢復資料,
    
--二進制日志包括/兩類檔案:
A: 二進制日志索引檔案(檔案名后綴為.index)用于記錄所有的二進制的檔案,
B:二進制日志檔案(檔案名后綴為.00000*)記錄資料庫所有的DDL和DML(除了資料查詢陳述句)陳述句事件,

②開啟binlog

默認情況下, mysql是沒有開啟binlog的, 需要手動開啟.

開啟步驟:

  1. 找到mysql的組態檔:my.cnf. 大部分的mysql版本默認在: /etc/my.cnf.

如果沒有找到, 則可以通過下面的命令查找:

sudo find / -name my.cnf

? [外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-4y7OPcKB-1605546794477)(https://i.loli.net/2020/11/16/nsYzC8FIqmR1g2X.jpg)]

#而在環境變數的檔案加入這條命令,防止亂碼
export  LANG="en_US.UTF-8"
  1. 修改my.cnf. 在my.cnf檔案中增加如下內容:
server-id= 1

log-bin=mysql-bin

binlog_format=row

binlog-do-db=gmall

③配置說明

server-id:

? mysql主從復制的時候, 主從之間每個實體必須有獨一無二的id

log-bin:

? 這個表示binlog日志的前綴是mysql-bin ,以后生成的日志檔案就是 mysql-bin.123456 的檔案后面的數字按順序生成, 每次mysql重啟或者到達單個檔案大小的閾值時,新生一個檔案,按順序編號,

Binlog_format:

? mysql binlog的格式,有三種值,分別是statement, row, mixed

? 三者區別:

statement:

陳述句級,binlog會記錄每次一執行寫操作的陳述句,
    相對row模式節省空間,但是可能產生不一致性,比如
		update  tt set create_date=now() 
    如果用binlog日志進行恢復,由于執行時間不同可能產生的資料就不同,
'優點':節省空間

'缺點':有可能造成資料不一致,

row:

行級, binlog會記錄每次操作后每行記錄的變化,
   '優點':保持資料的絕對一致性,因為不管sql是什么,參考了什么函式,他只記錄執行后的效果,
   '缺點':占用較大空間

mixed:

statement的升級版,
			一定程度上解決了,因為一些情況而造成的statement模式不一致問題
    		在某些情況下譬如:
      		當函式中包含 UUID(), 包含 AUTO_INCREMENT 欄位的表被更新時;執行 INSERT DELAYED 陳述句時;用 UDF 時;會按照 ROW的方式進行處理

   '優點':節省空間,同時兼顧了一定的一致性,

   '缺點':還有些極個別情況依舊會造成不一致,另外statement和mixed對

					 于需要對binlog的監控的情況都不方便,

? 由于canal不是資料庫,是不能執行sql陳述句的,所以,只能設定為row格式

binlog-do-db:

設定把哪個database的變化寫入到binlog,

如果不配置, 則所有database的變化都會寫入到binlog.

如果要設定多個資料庫需要, 需要寫多次這個引數的配置

binlog-do-db = a

binlog-do-db = b

④檢測配置是否成功

? A: 重啟mysql服務器.

Sudo systemctl restart mysqld

? B: 啟動msyql客戶端, 執行sql陳述句:

show variables like ‘%log_bin%’

? [外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Cadl2c60-1605546794478)(https://i.loli.net/2020/11/16/hao4ZVTW9e1LgPS.jpg)]

? C: 也可以去對應的目錄下查看是否生成log_bin檔案

? [外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-CQ6sd0tw-1605546794479)(https://i.loli.net/2020/11/16/hao4ZVTW9e1LgPS.jpg)]

2.1.5 在mysql準備業務資料

使用離線數倉中, 業務資料的生產工具生產資料即可.

注意: 更改application.properties的配置, 能夠連上mysql

2.1.6 下載安裝canal

? ①. 在mysql創建canal用戶

? canal需要監控mysql資料, 在企業中一般拿不到root用戶, 需新創建只讀取權限的用戶.

Mysql> set global validate_password_policy=0;

mysql> set global validate_password_length=4;

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO ‘canal’@’%’ IDENTIFIED BY ‘canal’;

mysql> FLUSH PRIVILEGES;

? ②. 下載canal

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

? ③. 解壓安裝canal

mkdir /opt/module/canal

tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/module/canal

2.1.7 配置canal

? canal有兩種配置: server級別和instance級別

  • server級別的配置是對整個canal進行配置, 是一些全域性的配置. 一個sever中可以配置多個instance

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-lJQnW4m2-1605546794479)(https://i.loli.net/2020/11/16/q9rOlNZPEQYadps.jpg)]

  • instance級別的配置, 是最小的訂閱mysql的佇列.

比如example實體

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-hgelC84t-1605546794480)(https://i.loli.net/2020/11/16/q9rOlNZPEQYadps.jpg)]

**①canal server配置**
vim /opt/module/canal/conf/

? 重點關注以下配置:

canal.ip = hadoop162  # canal服務器系結ip地址

canal.port = 11111  # canal埠號, 將來客戶端通過這個埠號可以讀到資料

canal.zkServers = hadoop162:2181,hadoop163:2181,hadoop164:2181 # zk地址, 用來管理canal的高可用

\# tcp, kafka, RocketMQ

\# tcp:客戶端通過tcp方式從Canal服務端拉取增量資料

\# kafka:Canal服務端將增量資料同步到kafka中,客戶端從kafka消費資料,此時客戶端感知不到Canal的存在,只需要跟kafka互動,

\# RocketMQ:同kafka,增量資料同步到RocketMQ中,

canal.serverMode = kafka 

canal.destinations = atguigu   # 配置實體, 如果有多個實體, 用逗號隔開. 我們創建一個atguigu實體

 

canal.mq.servers = hadoop162:9092,hadoop163:9092,hadoop164:9092

 

? ②canal instance配置

? 把目錄名example改為atguigu(其實就是和剛才的配置保存一致, 用來表示atguigu實體)

mv example atguigu

? 打開實體組態檔:

vim /opt/module/canal/conf/atguigu/instance.properties

? 在其中配置要監控的mysql和監控到的資料發送到kafka

# canal實體(slave)的id, 不能和mysql的id重復. 可以自動生成, 無需手工配置

\# canal.instance.mysql.slaveId=0

\# 要監控的mysql地址

canal.instance.master.address = hadoop162:3306

\# 連接mysql的用戶名

canal.instance.dbUsername=canal

\# 連接mysql的密碼

canal.instance.dbPassword=canal

canal.instance.connectionCharset = UTF-8

\# 該實體監控的 庫.表  默認所有庫下所有表   

canal.instance.filter.regex=gmall\\..*   # 監控gmall資料庫下所有包

 

\# kafka topic配置

canal.mq.topic=gmall_db

\# 注釋掉此配置, 此配置是只發送到一個固定磁區中

\# canal.mq.partition=0

 

\# 散列模式的磁區數, 要和kafka的topic的磁區數保持一致

canal.mq.partitionsNum=2

\# 如何計算每條資料進入的磁區

canal.mq.partitionHash= .*\\..*:$pk$   # 指定所有的表用主鍵hash得到磁區索引

2.1.8 canalHA配置和啟動canal

? ① canal只是支持HA, 不支持高負載, 沒有負載均衡的概念.

	分發canal到hadoop103和hadoop104

? 注意: 修改canal.ip = hadoop102, 為hadoop103和hadoop104

? ② 在hadoop102,hadoop103,hadoop104分別啟動canal

/opt/module/canal/bin/startup.sh

? 注意: 需要先啟動zookeeper和kafka

2.1.9 測驗kafka是否收到實時資料

? 起一個終端消費者, 消費gmall_db

bin/kafka-console-consumer.sh –bootstrap-server hadoop102:9092 –topic gmall_db

? 生產資料:

java -jar gmall2020-mock-db-2020-05-18.jar

? 觀察消費者是否消費到資料, 如果沒有消費到資料, 則需要重新檢測canal配置

2.1.10 接收到的資料格式分析

? 發送到kafka的資料格式

{
    “data”:[
        {
            “id”:350,
            “consignee”:”蔣雄”,
            “consignee_tel”:13325313235,
            “final_total_amount”:389.0,
            “order_status”:1005,
            “user_id”:62,
            “delivery_address”:”第17大街第7號樓9單元324門”,
            “order_comment”:”描述353475,
            “out_trade_no”:822287931878949,
            “trade_body”:”十月稻田 沁州黃小米 (黃小米 五谷雜糧 山西特產 真空裝 大米伴侶 粥米搭檔) 2.5kg等2件商品”,
            “create_time”:2020-08-26 15:02:40,
            “operate_time”:2020-08-26 15:02:41,
            “expire_time”:2020-08-26 15:17:40,
            “tracking_no”:null,
            “parent_order_id”:null,
            “img_url”:”http://img.gmall.com/933223.jpg”,
            “province_id”:3,
            “benefit_reduce_amount”:108.0,
            “original_total_amount”:488.0,
            “feight_fee”:9.0}
    ],
    “database”:”gmall”,
    “es”:1598425361000,
    “id”:73,
    “isDdl”:false,
    “mysqlType”:{
        “id”:”bigint(20),
        “consignee”:”varchar(100),
        “consignee_tel”:”varchar(20),
        “final_total_amount”:”decimal(16,2),
        “order_status”:”varchar(20),
        “user_id”:”bigint(20),
        “delivery_address”:”varchar(1000),
        “order_comment”:”varchar(200),
        “out_trade_no”:”varchar(50),
        “trade_body”:”varchar(200),
        “create_time”:”datetime”,
        “operate_time”:”datetime”,
        “expire_time”:”datetime”,
        “tracking_no”:”varchar(100),
        “parent_order_id”:”bigint(20),
        “img_url”:”varchar(200),
        “province_id”:”int(20),
        “benefit_reduce_amount”:”decimal(16,2),
        “original_total_amount”:”decimal(16,2),
        “feight_fee”:”decimal(16,2)},
    “old”:[
        {
            “order_status”:1002}
    ],
    “pkNames”:[
        “id”
    ],
    “sql”:””,
    “sqlType”:{
        “id”:-5,
        “consignee”:12,
        “consignee_tel”:12,
        “final_total_amount”:3,
        “order_status”:12,
        “user_id”:-5,
        “delivery_address”:12,
        “order_comment”:12,
        “out_trade_no”:12,
        “trade_body”:12,
        “create_time”:93,
        “operate_time”:93,
        “expire_time”:93,
        “tracking_no”:12,
        “parent_order_id”:-5,
        “img_url”:12,
        “province_id”:4,
        “benefit_reduce_amount”:3,
        “original_total_amount”:3,
        “feight_fee”:3
    },
    “table”:”order_info”,
    “ts”:1598425365252,
    “type”:UPDATE}

2.1.11 驗證canal高可用是否正常作業

? 當前啟動canal的時候, 只有一臺設備會啟動 atguigu實體

[zk: localhost:2181(CONNECTED) 21] get /otter/canal/destinations/atguigu/running

{“active”:true,”address”:”hadoop102:11111”}

? 停止hadoop102的canal, 然后觀察:

[zk: localhost:2181(CONNECTED) 1]  get /otter/canal/destinations/atguigu/running

{“active”:true,”address”:”hadoop104:11111”}

2.2 maxwell實時采集mysql資料

2.2.1 什么是maxwell

maxwell 是由美國zendesk開源,用java撰寫的Mysql實時抓取軟體, 其抓取的原理也是基于binlog,

2.2.2 maxwell和canal的對比

  1. ? Maxwell 沒有 Canal那種server+client模式,只有一個server把資料發送到訊息佇列或redis,

  2. Maxwell 有一個亮點功能,就是Canal只能抓取最新資料,對已存在的歷史資料沒有辦法處理,而Maxwell有一個bootstrap功能,可以直接引匯出完整的歷史資料用于初始化,非常好用,

  3. Maxwell不能直接支持HA,但是它支持斷點還原,即錯誤解決后重啟繼續上次點兒讀取資料,

  4. Maxwell只支持json格式,而Canal如果用Server+client模式的話,可以自定義格式,

  5. Maxwell比Canal更加輕量級,

2.2.3 使用maxwell前的準備作業

  1. 在mysql中創建一個資料庫, 用于存盤maxwell的元資料
CREATE DATABASE ‘maxwell’CHARACTER SET ‘utf8’ COLLATE ‘utf8_general_ci’;
  1. 創建可以操作資料庫maxwell的用戶:maxwell
GRANT ALL  ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY 'aaaaaa';
  1. 給用戶maxwell分配操作其他資料庫的權限
GRANT  SELECT ,REPLICATION SLAVE , REPLICATION CLIENT  ON *.* TO maxwell@’%;
FLUSH PRIVILEGES;

2.2.4 安裝和配置maxwell

? ①.下載maxwell

wget https://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz

? ②.解壓

tar -zxvf maxwell-1.27.1.tar.gz -C /opt/module

? ③.配置maxwell

cd /opt/module/maxwell-1.27.1
vim config.properties   #注意要將之前檔案洗掉,新建一個

? 如下配置

# tl;dr config
log_level=info

producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka_topic=maxwell_gmall_db 
producer_partition_by=primary_key  # 按照主鍵的hash進行磁區, 如果不設定是按照資料庫磁區

# mysql login info
host=hadoop102
user=maxwell
password=aaaaaa
client_id=maxwell_1   # 初始化維度表資料的時候使用

2.2.5 啟動maxwell

? ①啟動maxwel

/opt/module/maxwell-1.27.1 ? bin/maxwell --config config.properties --daemon

? ②確定kafka是否收到資料

起一個終端消費者

bin/kafka-console-consumer.sh –bootstrap-server hadoop102:9092 –topic maxwell_gmall_db

? ③在mysql中生成資料, 確認kafka是否收到資料.

2.2.6 maxwell發送到kafka的資料格式

{
    “database”:”gmall”,
    “table”:”comment_info”,
    “type”:”insert”,
    “ts”:1598434438,
    “xid”:27085,
    “commit”:true,
    “data”:{
        “id”:1298554271214907454,
        “user_id”:988,
        “sku_id”:5,
        “spu_id”:5,
        “order_id”:548,
        “appraise”:1201,
        “comment_txt”:”評論內容:78483837649887576216843442715245587379516398154672,
        “create_time”:2020-08-26 17:33:58,
        “operate_time”:null
    }
}

2.3 Canal和Maxwell發送到kafka的資料對比

? 為了方便做對比, 在gmall資料庫下創建一個表:test_user_info

create table test_user_info(id int primary key, name varchar(255), tel char(11));

1.插入資料

insert into test_user_info values(1, ‘lisi’,13838389438);

image-20201117011448939

2.洗掉資料

delete from test_user_info where id=1;

image-20201117011508988

3.更新資料

update test_user_info set name=’zs’ where id=1;

image-20201117011539136

總結資料特點

①日志結構

? canal 每一條SQL會產生一條日志,如果該條Sql影響了多行資料,則已經會通過集合的方式歸集在這條日志中,(即使是一條資料也會是陣列結構)

? maxwell 以影響的資料為單位產生日志,即每影響一條資料就會產生一條日志,如果想知道這些日志是否是通過某一條sql產生的可以通過xid進行判斷,相同的xid的日志來自同一sql

②資料型別

? 當原始資料是數字型別時,maxwell會尊重原始資料的型別不增加雙引,變為字串,Canal一律轉換為字串,

③待遇按時資料欄位的定義

? canal資料中會帶入表結構,Maxwell更簡潔,

三、實時資料分層

目前在企業中, 對實時資料分層的做法還不是太普遍, 實時資料分層有好處也有壞處

好處

1. 資料可以復用,

2. 簡化計算.

3. 為OLAP查詢分擔壓力

壞處

增加了中間資料層, 會增加實時資料的延遲.

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-CAmvrESE-1605546794481)(https://i.loli.net/2020/11/16/kxEU9nuc6wfLrA4.png)]

四、ODS層處理 ——一個表一個topic

思路

--用canal和maxwell將資料發送到kafka之后,一個topic【kafka_gmall_db】里面存了所有的資料

這時候我們需要將資料分流:
	1.使用spark-streaming對資料進行分流——怎么分流呢?就是將不同表的資料消費到不同的topic里面     【ods層】
	
總體思路:

	①我們先將SparkStreaming消費kafka的資料的一些公共代碼抽象到一個抽象類BaseApp中,方便后面使用
		具體的資料用抽象欄位表示出來
		具體的邏輯使用抽象函式,定義一個抽象函式run方法
						將	ssc: StreamingContext,
                   							offsetRanges: ListBuffer[OffsetRange],
                   							sourceStream: DStream[String]
                   							當做引數傳遞過去
                   							
	②定義一個BaseDBCanalApp類,繼承抽象類BaseApp,實作資料的分流
		分流(處理你可能用到的表,現在用什么,處理什么,分流的時候只用這些表)
			--先查看canal的資料格式
			這里我們只需要三個
				data是一個集合,里面有可能有多個物件,我們使用flatMap
				
			

4.1 定義抽象類——B a s e A p p(一個流消費一個Topic)

	將使用SparkStreaming消費kafka的資料的一些公共代碼抽象到一個抽象類BaseApp,方便后面使用
在包com.atguigu.gmall.realtime下創建抽象類:BaseApp

思路

1.我們先將SparkStreaming消費kafka的資料的一些公共代碼抽象到一個抽象類BaseApp中,方便后面使用
		具體的資料用抽象欄位表示出來
		具體的邏輯使用抽象函式,定義一個抽象函式run方法
						將	ssc: StreamingContext,
                   							offsetRanges: ListBuffer[OffsetRange],
                   							sourceStream: DStream[String]
                   							當做引數傳遞過去
package com.atguigu.realtime

import com.atguigu.realtime.util.{MyKafkaUtil_1, OffsetManager}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s.CustomSerializer
import org.json4s.JsonAST.{JDouble, JInt, JLong, JString}

import scala.collection.mutable.ListBuffer

abstract class BaseApp {


  //消費者組和主題
  val master:String
  val appName:String
  val groupId :String
  val topic:String
  val bachTime:Int

  val toLong: CustomSerializer[Long] = new CustomSerializer[Long](ser = format => ({
    case JString(s) => s.toLong
    case JInt(s) => s.toLong
  },{
    case s:Long => JLong(s)
  }))
  val toDouble = new CustomSerializer[Double](ser = format => ({
    case JString(s) => s.toDouble
    case JDouble(s) => s.toDouble
  },{
    case s:Long => JDouble(s)
  }))

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster(master).setAppName(appName)
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(bachTime))

    val offsets: Map[TopicPartition, Long] = OffsetManager.readOffsets(groupId, topic)
    val offsetRanges = ListBuffer.empty[OffsetRange]
    val sourceStream: DStream[String] = MyKafkaUtil_1
      .getKafkaStream(ssc, groupId, topic, offsets)
      .transform(rdd => {
        val newOffsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges]
          .offsetRanges
        offsetRanges.clear()
        offsetRanges ++= newOffsetRanges
           rdd
      })
      .map(_.value())

    //具體的業務邏輯
    run(ssc, offsetRanges, sourceStream)
    ssc.start()
    ssc.awaitTermination()
  }

  def run(ssc: StreamingContext, offsetRanges: ListBuffer[OffsetRange], sourceStream: DStream[String])

}

4.2 在Mykafkautil_1中添加需要的方法

把向kafka發資料的方法封裝在Mykafkautil_1工具類中

  //此方法出來的是kafka中讀取出來的流
  val kafkaProducerParams: Map[String, Object] = Map(
    "bootstrap.servers" -> "hadoop162:9092,hadoop163:9092,hadoop164:9092",
    "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
    "enable.idempotent" -> (true: java.lang.Boolean)
  )


  def getProducer = {
    import scala.collection.JavaConverters._
    new KafkaProducer[String, String](kafkaProducerParams.asJava)
  }  

4.3 處理canal采集的資料

資料格式

{
    “data”:[
        {
            “id”:350,
            “consignee”:”蔣雄”,
            “consignee_tel”:13325313235,
            “final_total_amount”:389.0,
            “order_status”:1005,
            “user_id”:62,
            “delivery_address”:”第17大街第7號樓9單元324門”,
            “order_comment”:”描述353475,
            “out_trade_no”:822287931878949,
            “trade_body”:”十月稻田 沁州黃小米 (黃小米 五谷雜糧 山西特產 真空裝 大米伴侶 粥米搭檔) 2.5kg等2件商品”,
            “create_time”:2020-08-26 15:02:40,
            “operate_time”:2020-08-26 15:02:41,
            “expire_time”:2020-08-26 15:17:40,
            “tracking_no”:null,
            “parent_order_id”:null,
            “img_url”:”http://img.gmall.com/933223.jpg”,
            “province_id”:3,
            “benefit_reduce_amount”:108.0,
            “original_total_amount”:488.0,
            “feight_fee”:9.0}
    ],
	“table”:”order_info”,

 	“type”:UPDATE}

思路

	②定義一個BaseDBCanalApp類,繼承抽象類BaseApp,實作資料的分流
		分流(處理你可能用到的表,現在用什么,處理什么,分流的時候只用這些表)
			--先查看canal的資料格式
			--決議資料
			這里我們只需要三個(data,)
				data是一個集合,里面有可能有多個物件,
	1--將canal消費過來的資料封裝成集合.flatMap(str=>{
				我們使用flatMap決議出三個引數
				    implicit val f = org.json4s.DefaultFormats
      				val  j: JValue = JsonMethods.parse(str)
      				val data: JValue = j \ "data" //一個集合或者陣列
      				val tableName: String = (j \ "table").extract[String]
      				val operate: String = (j \ "type").extract[String]
      				
      			data是一個集合,我們使用如下操作拿到集合的所有物件,回傳值為
   data.children.map(child =>(tableName,operate.toLowerCase(),Serialization.write(child))) 
   '回傳值型別DStream(String,String,String),是一個元組'
   --})
   
   2--過濾出需要的表的資料.filter{
				怎么過濾呢?根據表名過濾,只要表名是給出表名集合里的資料就過濾出來
				當filter和rdd傳入的是元組的時候使用偏函式方便
				case (tableName,operate,data)=>
          			只要滿足要求的表,和非洗掉的資料和內容不能是0
          		     tableNames.contains(tableName) && operate != "delete"  &&data.length>0
   --}
    
   3--得到了我們需要的資料之后,就寫入到ods層.foreachRdd(rdd=>{
          		 	【每張表一個topic】
          		 	每個磁區寫一個
          		 	--rdd.foreachPartition((it:Iterator[(String,String,String)]) =>{
                     先獲取一個kafka的生產者
                     將資料遍歷寫入
                     --it.foreach{只處理order_info的insert的資料
            					case (tableName,operate,data)=>{
              					val topic = s"ods_$tableName"
              					if(tableName !="order_info"){
                					producer.send(new ProducerRecord[String, String](topic, data))
              					}else if (operate == "insert"){
                					producer.send(new ProducerRecord[String, String](topic, data))
              					}
            					}
          					--}
                     關閉生產者
                     --}
    --})
    
    4---將偏移量保存
    OffsetManager.saveOffsets(offsetRanges,groupId,topic)

代碼

package com.atguigu.realtime.ods

import com.atguigu.realtime.BaseApp
import com.atguigu.realtime.util.{MyKafkaUtil_1, OffsetManager}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.OffsetRange
import org.json4s.JValue
import org.json4s.jackson.{JsonMethods, Serialization}

import scala.collection.mutable.ListBuffer

object BaseDBCanalApp extends BaseApp{
  override val master: String = "local[2]"
  override val appName: String = "BaseDBCannalApp"
  override val groupId: String = "BaseDBCanalApp"
  override val topic: String = "gmall_db"
  override val bachTime: Int = 3
  val tableNames = List(
    "order_info",  //
    "order_detail",
    "user_info",
    "base_province",
    "base_category3",
    "sku_info",
    "spu_info",
    "base_trademark")

  override def run(ssc: StreamingContext,
                   offsetRanges: ListBuffer[OffsetRange],
                   sourceStream: DStream[String]): Unit ={
    //分流
  sourceStream.flatMap(str =>{

      implicit val f = org.json4s.DefaultFormats
      val  j: JValue = JsonMethods.parse(str)
      val data: JValue = j \ "data" //一個集合或者陣列
      val tableName: String = (j \ "table").extract[String]
      val operate: String = (j \ "type").extract[String]

      //拿到date集合的每個物件
      //date.children.map(child =>(tableName,operate,JsonMethods.compact(JsonMethods.render(child))))
    date.children.map(child =>(tableName,operate.toLowerCase(),Serialization.write(child)))
    })
    //過濾出資料
    .filter{
        case (tableName,operate,data)=>
          //主要滿足要求的表,和非洗掉的資料和內容不能是0
          tableNames.contains(tableName) && operate != "delete"  &&data.length>0
      }
      .foreachRDD(rdd =>{
        //寫入到ODS層(kafka)
        rdd.foreachPartition((it:Iterator[(String,String,String)]) =>{
          //先獲取一個kafka的生產者
          val producer: KafkaProducer[String, String] = MyKafkaUtil_1.getProducer

          //寫入
          it.foreach{
            case (tableName,operate,data)=>{
              val topic = s"ods_$tableName"
              if(tableName !="order_info"){
                producer.send(new ProducerRecord[String, String](topic, data))
              }else if (operate == "insert"){
                producer.send(new ProducerRecord[String, String](topic, data))
              }
            }
          }
          //關閉生產者
          producer.close()
        })
        OffsetManager.saveOffsets(offsetRanges,groupId,topic)
      })
  }
}

4.4 處理maxwell采集的資料

package com.atguigu.realtime.ods

import com.atguigu.realtime.BaseApp
import com.atguigu.realtime.util.{MyKafkaUtil_1, OffsetManager}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.OffsetRange
import org.json4s.JValue
import org.json4s.jackson.{JsonMethods, Serialization}

import scala.collection.mutable.ListBuffer

object BaseDBMaxwellApp extends BaseApp{
  override val master: String = "local[2]"
  override val appName: String = "BaseDBMaxwellApp"
  override val groupId: String = "BaseDBMaxwellApp"
  override val topic: String = "maxwell_gmall_db"
  override val bachTime: Int = 3
  val tableNames = List(
    "order_info",  //
    "order_detail",
    "user_info",
    "base_province",
    "base_category3",
    "sku_info",
    "spu_info",
    "base_trademark")

  override def run(ssc: StreamingContext,
                   offsetRanges: ListBuffer[OffsetRange],
                   sourceStream: DStream[String]): Unit ={
    //分流
  sourceStream
    .map(str =>{

      implicit val f = org.json4s.DefaultFormats
      val  j: JValue = JsonMethods.parse(str)

      val date: JValue = j \ "data" //一個集合或者陣列

      val tableName: String = (j \ "table").extract[String]
      val operate: String = (j \ "type").extract[String]

      //拿到date集合的每個物件
      //date.children.map(child =>(tableName,operate,JsonMethods.compact(JsonMethods.render(child))))
    (tableName,operate.toLowerCase(),Serialization.write(date))
    })
    //過濾出資料
    .filter{
        case (tableName, operate, data)=>
          //主要滿足要求的表,和非洗掉的資料和內容不能是0
          tableNames.contains(tableName) && operate!="delete"  &&data.length>0
      }
      .foreachRDD(rdd =>{
        //寫入到ODS層(kafka)
        rdd.foreachPartition((it:Iterator[(String,String,String)]) =>{
          //先獲取一個kafka的生產者
          val producer: KafkaProducer[String, String] = MyKafkaUtil_1.getProducer
          //寫入
          it.foreach{
            case (tableName,operate,data)=>{
              val topic = s"ods_$tableName"
              if(tableName !="order_info"){
                producer.send(new ProducerRecord[String, String](topic, data))
              }else if (operate == "insert"){
                producer.send(new ProducerRecord[String, String](topic, data))
              }
            }
          }
          //關閉生產者
          producer.close()
        })
        OffsetManager.saveOffsets(offsetRanges,groupId,topic)
      })
  }
}

測驗

啟動zk,kafka,
啟動canal
bin/startup.sh
啟動redis
redis-server
redis-server /etc/redis.conf
maxwell開啟:
/opt/module/maxwell-1.27.1 ? bin/maxwell --config config.properties --daemon
--偏函式
--模式匹配

--抽象類
--繼承,多型,封裝資料和行為,資料放到抽象里面
--java轉scala

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/224121.html

標籤:其他

上一篇:2020年10月-上海-彩貝殼-面試題匯總

下一篇:frpc使用

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more