數倉搭建-DWD層
1)對用戶行為資料決議,
2)對業務資料采用維度模型重新建模,
文章目錄
- 日志決議思路
- get_json_object函式使用
- 啟動日志表
- 頁面日志表
- 動作日志表
- 曝光日志表
- 錯誤日志表
- DWD層用戶行為資料加載腳本
日志決議思路
1)日志結構回顧
(1)頁面埋點日志

(2)啟動日志

2)日志決議思路

get_json_object函式使用
1)資料
[{"name":"大郎","sex":"男","age":"25"},{"name":"西門慶","sex":"男","age":"47"}]
2)取出第一個json物件
hive (gmall)>
select get_json_object('[{"name":"大郎","sex":"男","age":"25"},{"name":"西門慶","sex":"男","age":"47"}]','$[0]');
結果是:{“name”:“大郎”,“sex”:“男”,“age”:“25”}
3)取出第一個json的age欄位的值
hive (gmall)>
SELECT get_json_object('[{"name":"大郎","sex":"男","age":"25"},{"name":"西門慶","sex":"男","age":"47"}]',"$[0].age");
結果是:25
啟動日志表
啟動日志決議思路:啟動日志表中每行資料對應一個啟動記錄,一個啟動記錄應該包含日志中的公共資訊和啟動資訊,先將所有包含start欄位的日志過濾出來,然后使用get_json_object函式決議每個欄位,

1)建表陳述句
DROP TABLE IF EXISTS dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
`area_code` STRING COMMENT '地區編碼',
`brand` STRING COMMENT '手機品牌',
`channel` STRING COMMENT '渠道',
`is_new` STRING COMMENT '是否首次啟動',
`model` STRING COMMENT '手機型號',
`mid_id` STRING COMMENT '設備id',
`os` STRING COMMENT '作業系統',
`user_id` STRING COMMENT '會員id',
`version_code` STRING COMMENT 'app版本號',
`entry` STRING COMMENT 'icon手機圖示 notice 通知 install 安裝后啟動',
`loading_time` BIGINT COMMENT '啟動加載時間',
`open_ad_id` STRING COMMENT '廣告頁ID ',
`open_ad_ms` BIGINT COMMENT '廣告總共播放時間',
`open_ad_skip_ms` BIGINT COMMENT '用戶跳過廣告時點',
`ts` BIGINT COMMENT '時間'
) COMMENT '啟動日志表'
PARTITIONED BY (`dt` STRING) -- 按照時間創建磁區
STORED AS PARQUET -- 采用parquet列式存盤
LOCATION '/warehouse/gmall/dwd/dwd_start_log' -- 指定在HDFS上存盤位置
TBLPROPERTIES('parquet.compression'='lzo'); -- 采用LZO壓縮
2)資料匯入

hive (gmall)>
insert overwrite table dwd_start_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.ts')
from ods_log
where dt='2020-06-14'
and get_json_object(line,'$.start') is not null;
3)查看資料
hive (gmall)>
select * from dwd_start_log where dt='2020-06-14' limit 2;
頁面日志表
頁面日志決議思路:頁面日志表中每行資料對應一個頁面訪問記錄,一個頁面訪問記錄應該包含日志中的公共資訊和頁面資訊,先將所有包含page欄位的日志過濾出來,然后使用get_json_object函式決議每個欄位,

1)建表陳述句
DROP TABLE IF EXISTS dwd_page_log;
CREATE EXTERNAL TABLE dwd_page_log(
`area_code` STRING COMMENT '地區編碼',
`brand` STRING COMMENT '手機品牌',
`channel` STRING COMMENT '渠道',
`is_new` STRING COMMENT '是否首次啟動',
`model` STRING COMMENT '手機型號',
`mid_id` STRING COMMENT '設備id',
`os` STRING COMMENT '作業系統',
`user_id` STRING COMMENT '會員id',
`version_code` STRING COMMENT 'app版本號',
`during_time` BIGINT COMMENT '持續時間毫秒',
`page_item` STRING COMMENT '目標id ',
`page_item_type` STRING COMMENT '目標型別',
`last_page_id` STRING COMMENT '上頁型別',
`page_id` STRING COMMENT '頁面ID ',
`source_type` STRING COMMENT '來源型別',
`ts` bigint
) COMMENT '頁面日志表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_page_log'
TBLPROPERTIES('parquet.compression'='lzo');
2)資料匯入
hive (gmall)>
insert overwrite table dwd_page_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(line,'$.ts')
from ods_log
where dt='2020-06-14'
and get_json_object(line,'$.page') is not null;
3)查看資料
hive (gmall)>
select * from dwd_page_log where dt='2020-06-14' limit 2;
動作日志表
動作日志決議思路:動作日志表中每行資料對應用戶的一個動作記錄,一個動作記錄應當包含公共資訊、頁面資訊以及動作資訊,先將包含action欄位的日志過濾出來,然后通過UDTF函式,將action陣列“炸開”(類似于explode函式的效果),然后使用get_json_object函式決議每個欄位,

1)建表陳述句
DROP TABLE IF EXISTS dwd_action_log;
CREATE EXTERNAL TABLE dwd_action_log(
`area_code` STRING COMMENT '地區編碼',
`brand` STRING COMMENT '手機品牌',
`channel` STRING COMMENT '渠道',
`is_new` STRING COMMENT '是否首次啟動',
`model` STRING COMMENT '手機型號',
`mid_id` STRING COMMENT '設備id',
`os` STRING COMMENT '作業系統',
`user_id` STRING COMMENT '會員id',
`version_code` STRING COMMENT 'app版本號',
`during_time` BIGINT COMMENT '持續時間毫秒',
`page_item` STRING COMMENT '目標id ',
`page_item_type` STRING COMMENT '目標型別',
`last_page_id` STRING COMMENT '上頁型別',
`page_id` STRING COMMENT '頁面id ',
`source_type` STRING COMMENT '來源型別',
`action_id` STRING COMMENT '動作id',
`item` STRING COMMENT '目標id ',
`item_type` STRING COMMENT '目標型別',
`ts` BIGINT COMMENT '時間'
) COMMENT '動作日志表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_action_log'
TBLPROPERTIES('parquet.compression'='lzo');
2)創建UDTF函式——設計思路


3)創建UDTF函式——撰寫代碼
(1)創建一個maven工程:gmall-UDTF
(2)創建包名:com.huan
(3)引入如下依賴
<dependencies>
<!--添加hive依賴-->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>
(4)編碼
package com.huan;
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.json.JSONArray;
public class ExplodeJSONArray extends GenericUDTF {
private PrimitiveObjectInspector inputOI;
@Override
public void close() throws HiveException {
}
@Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
// 1 引數合法性檢查
if(argOIs.length!=1){
throw new UDFArgumentException("explode_json_array函式只能接收1個引數");
}
ObjectInspector argOI = argOIs[0];
// 2 第一個引數必須為string
//判斷引數是否為基礎資料型別
if(argOI.getCategory()!=ObjectInspector.Category.PRIMITIVE){
throw new UDFArgumentException("explode_json_array函式只能接識訓本資料型別的引數");
}
//將引數物件檢查器強轉為基礎型別物件檢查器
PrimitiveObjectInspector primitiveOI = (PrimitiveObjectInspector) argOI;
inputOI=primitiveOI;
//判斷引數是否為String型別
if(primitiveOI.getPrimitiveCategory()!=PrimitiveObjectInspector.PrimitiveCategory.STRING){
throw new UDFArgumentException("explode_json_array函式只能接收STRING型別的引數");
}
// 3 定義回傳值名稱和型別
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("item");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
fieldOIs);
}
@Override
public void process(Object[] args) throws HiveException {
// 1 獲取傳入的資料
Object arg = args[0];
String jsonArrayStr = PrimitiveObjectInspectorUtils.getString(arg, inputOI);
// 2 將string轉換為json陣列
JSONArray jsonArray = new JSONArray(jsonArrayStr);
// 3 回圈一次,取出陣列中的一個json,并寫出
for (int i = 0; i < jsonArray.length(); i++) {
String json = jsonArray.getString(i);
String[] result = {json};
forward(result);
}
}
}
4)創建函式
(1)打包
(2)將gmall-UDTF-1.0-SNAPSHOT.jar上傳到hadoop102的/opt/module,然后再將該jar包上傳到HDFS的/user/hive/jars路徑下
[root@hadoop102 module]$ hadoop fs -mkdir -p /user/hive/jars
[root@hadoop102 module]$ hadoop fs -put gmall-UDTF-1.0-SNAPSHOT.jar /user/hive/jars
(3)創建永久函式與開發好的java class關聯
create function explode_json_array as 'com.huan.ExplodeJSONArray' using jar 'hdfs://hadoop102:8020/user/hive/jars/gmall-UDTF-1.0-SNAPSHOT.jar';
(4)注意:如果修改了自定義函式重新生成jar包怎么處理?只需要替換HDFS路徑上的舊jar包,然后重啟Hive客戶端即可,
5)資料匯入
insert overwrite table dwd_action_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(action,'$.action_id'),
get_json_object(action,'$.item'),
get_json_object(action,'$.item_type'),
get_json_object(action,'$.ts')
from ods_log lateral view explode_json_array(get_json_object(line,'$.actions')) tmp as action
where dt='2020-06-14'
and get_json_object(line,'$.actions') is not null;
6)查看資料
select * from dwd_action_log where dt='2020-06-14' limit 2;
曝光日志表
曝光日志決議思路:曝光日志表中每行資料對應一個曝光記錄,一個曝光記錄應當包含公共資訊、頁面資訊以及曝光資訊,先將包含display欄位的日志過濾出來,然后通過UDTF函式,將display陣列“炸開”(類似于explode函式的效果),然后使用get_json_object函式決議每個欄位,

1)建表陳述句
DROP TABLE IF EXISTS dwd_display_log;
CREATE EXTERNAL TABLE dwd_display_log(
`area_code` STRING COMMENT '地區編碼',
`brand` STRING COMMENT '手機品牌',
`channel` STRING COMMENT '渠道',
`is_new` STRING COMMENT '是否首次啟動',
`model` STRING COMMENT '手機型號',
`mid_id` STRING COMMENT '設備id',
`os` STRING COMMENT '作業系統',
`user_id` STRING COMMENT '會員id',
`version_code` STRING COMMENT 'app版本號',
`during_time` BIGINT COMMENT 'app版本號',
`page_item` STRING COMMENT '目標id ',
`page_item_type` STRING COMMENT '目標型別',
`last_page_id` STRING COMMENT '上頁型別',
`page_id` STRING COMMENT '頁面ID ',
`source_type` STRING COMMENT '來源型別',
`ts` BIGINT COMMENT 'app版本號',
`display_type` STRING COMMENT '曝光型別',
`item` STRING COMMENT '曝光物件id ',
`item_type` STRING COMMENT 'app版本號',
`order` BIGINT COMMENT '曝光順序',
`pos_id` BIGINT COMMENT '曝光位置'
) COMMENT '曝光日志表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_display_log'
TBLPROPERTIES('parquet.compression'='lzo');
2)資料匯入
insert overwrite table dwd_display_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(line,'$.ts'),
get_json_object(display,'$.display_type'),
get_json_object(display,'$.item'),
get_json_object(display,'$.item_type'),
get_json_object(display,'$.order'),
get_json_object(display,'$.pos_id')
from ods_log lateral view explode_json_array(get_json_object(line,'$.displays')) tmp as display
where dt='2020-06-14'
and get_json_object(line,'$.displays') is not null;
3)查看資料
select * from dwd_display_log where dt='2020-06-14' limit 2;
錯誤日志表
錯誤日志決議思路:錯誤日志表中每行資料對應一個錯誤記錄,為方便定位錯誤,一個錯誤記錄應當包含與之對應的公共資訊、頁面資訊、曝光資訊、動作資訊、啟動資訊以及錯誤資訊,先將包含err欄位的日志過濾出來,然后使用get_json_object函式決議所有欄位,

1)建表陳述句
DROP TABLE IF EXISTS dwd_error_log;
CREATE EXTERNAL TABLE dwd_error_log(
`area_code` STRING COMMENT '地區編碼',
`brand` STRING COMMENT '手機品牌',
`channel` STRING COMMENT '渠道',
`is_new` STRING COMMENT '是否首次啟動',
`model` STRING COMMENT '手機型號',
`mid_id` STRING COMMENT '設備id',
`os` STRING COMMENT '作業系統',
`user_id` STRING COMMENT '會員id',
`version_code` STRING COMMENT 'app版本號',
`page_item` STRING COMMENT '目標id ',
`page_item_type` STRING COMMENT '目標型別',
`last_page_id` STRING COMMENT '上頁型別',
`page_id` STRING COMMENT '頁面ID ',
`source_type` STRING COMMENT '來源型別',
`entry` STRING COMMENT ' icon手機圖示 notice 通知 install 安裝后啟動',
`loading_time` STRING COMMENT '啟動加載時間',
`open_ad_id` STRING COMMENT '廣告頁ID ',
`open_ad_ms` STRING COMMENT '廣告總共播放時間',
`open_ad_skip_ms` STRING COMMENT '用戶跳過廣告時點',
`actions` STRING COMMENT '動作',
`displays` STRING COMMENT '曝光',
`ts` STRING COMMENT '時間',
`error_code` STRING COMMENT '錯誤碼',
`msg` STRING COMMENT '錯誤資訊'
) COMMENT '錯誤日志表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_error_log'
TBLPROPERTIES('parquet.compression'='lzo');
說明:此處為對動作陣列和曝光陣列做處理,如需分析錯誤與單個動作或曝光的關聯,可先使用explode_json_array函式將陣列“炸開”,再使用get_json_object函式獲取具體欄位,
4)資料匯入
insert overwrite table dwd_error_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.actions'),
get_json_object(line,'$.displays'),
get_json_object(line,'$.ts'),
get_json_object(line,'$.err.error_code'),
get_json_object(line,'$.err.msg')
from ods_log
where dt='2020-06-14'
and get_json_object(line,'$.err') is not null;
5)查看資料
hive (gmall)>
select * from dwd_error_log where dt='2020-06-14' limit 2;
DWD層用戶行為資料加載腳本
1)撰寫腳本
(1)在hadoop102的/root/bin目錄下創建腳本
[root@hadoop102 bin]$ vim ods_to_dwd_log.sh
在腳本中撰寫如下內容
DWD層用戶行為資料加載腳本
(2)增加腳本執行權限
[root@hadoop102 bin]$ chmod 777 ods_to_dwd_log.sh
2)腳本使用
(1)執行腳本
[root@hadoop102 module]$ ods_to_dwd_log.sh all 2020-06-14
(2)查詢匯入結果
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/345692.html
標籤:其他
上一篇:10月被裁,面了北京幾個大廠,總結了面試出場率極高Java面試題
下一篇:Flink任務問題分析與性能調優
