指標統計
文章目錄
- 指標統計
- 第一節 指標體系
- 第二節 業務資料
- 1.1 物流系統資料庫表
- 攬件表(lg_collect_package)
- 客戶表(lg_customer)
- 物流系統碼表(lg_codes)
- 快遞單據表(lg_express_bill)
- 客戶地址表(lg_customer_address)
- 網點表(lg_dot)
- 公司表(lg_company)
- 公司網點關聯表(lg_company_dot_map)
- 運單表(lg_waybill)
- 線路表(lg_route)
- 運輸工具表(lg_transport_tool)
- 轉運記錄表(lg_transport_record)
- 包裹表(lg_pkg)
- 運單路線明細表(lg_waybill_line)
- 快遞員表(lg_courier)
- 區域表(lg_areas)
- 第三節 快遞單主題
- 3.1 lg_ods層建表
- 3.2 資料采集
- 3.3 ETL
- 1 指標明細
- 2 表關系示意圖
- 3 預處理
- 3.1 事實表
- 3.2 維度表
- 3.4 指標統計
- 計算的欄位
- 指標統計sql
- 匯出指標資料
- 第四節 運單主題
- 4.1 lg_ods層建表
- 4.2 資料采集
- - 事實表
- - 維度表
- 4.3 ETL
- 1 指標明細
- 2 表關系示意圖
- 3 預處理
- 3.1 事實表
- 3.2 維度表
- 4.4 指標統計
第一節 指標體系
- 快遞單主題
- 快遞單量的統計主要是從多個不同的維度計算快遞單量,從而監測公司快遞業務運營情況,

- 快遞單量的統計主要是從多個不同的維度計算快遞單量,從而監測公司快遞業務運營情況,
- 運單主題
- 運單統計根據區域、公司、網點、線路、運輸工具等維度進行統計,可以對各個維度運單數量進行排行,如對網點運單進行統計可以反映該網點的運營情況

- 運單統計根據區域、公司、網點、線路、運輸工具等維度進行統計,可以對各個維度運單數量進行排行,如對網點運單進行統計可以反映該網點的運營情況
第二節 業務資料
1.1 物流系統資料庫表
攬件表(lg_collect_package)

客戶表(lg_customer)

物流系統碼表(lg_codes)

快遞單據表(lg_express_bill)

客戶地址表(lg_customer_address)

網點表(lg_dot)

公司表(lg_company)

公司網點關聯表(lg_company_dot_map)

運單表(lg_waybill)

線路表(lg_route)

運輸工具表(lg_transport_tool)

轉運記錄表(lg_transport_record)

包裹表(lg_pkg)

運單路線明細表(lg_waybill_line)

快遞員表(lg_courier)

區域表(lg_areas)

第三節 快遞單主題
3.1 lg_ods層建表
建表陳述句
--客戶地址表
DROP TABLE IF EXISTS `lg_ods.lg_address`;
CREATE TABLE `lg_ods.lg_address` (
`id` string,
`name` string,
`tel` string,
`mobile` string,
`detail_addr` string,
`area_id` string,
`gis_addr` string,
`cdt` string,
`udt` string,
`remark` string
) COMMENT '客戶地址表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
--客戶表
DROP TABLE IF EXISTS `lg_ods.lg_customer`;
CREATE TABLE `lg_ods.lg_customer` (
`id` string,
`name` string,
`tel` string,
`mobile` string,
`email` string,
`type` string,
`is_own_reg` string,
`reg_dt` string,
`reg_channel_id` string,
`state` string,
`cdt` string,
`udt` string,
`last_login_dt` string,
`remark` string)
COMMENT '客戶表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
DROP TABLE IF EXISTS `lg_ods.lg_areas`;
CREATE TABLE `lg_ods.lg_areas` (
`id` string,
`name` string,
`pid` string,
`sname` string,
`level` string,
`citycode` string,
`yzcode` string,
`mername` string,
`lng` string,
`lat` string,
`pinyin` string
) COMMENT '區域表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
DROP TABLE IF EXISTS `lg_ods.lg_courier`;
CREATE TABLE `lg_ods.lg_courier` (
`id` string,
`job_num` string,
`name` string,
`birathday` string,
`tel` string,
`pda_num` string,
`car_id` string,
`postal_standard_id` string,
`work_time_id` string,
`dot_id` string,
`state` string,
`cdt` string,
`udt` string,
`remark` string
) COMMENT '快遞員表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
DROP TABLE IF EXISTS `lg_ods.lg_dot`;
CREATE TABLE `lg_ods.lg_dot` (
`id` string,
`dot_number` string,
`dot_name` string,
`dot_addr` string,
`dot_gis_addr` string,
`dot_tel` string,
`company_id` string,
`manage_area_id` string,
`manage_area_gis` string,
`state` string,
`cdt` string,
`udt` string,
`remark` string
) COMMENT '網點表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
DROP TABLE IF EXISTS `lg_ods.lg_company_dot_map`;
CREATE TABLE `lg_ods.lg_company_dot_map` (
`id` string,
`company_id` string,
`dot_id` string,
`cdt` string,
`udt` string,
`remark` string
) COMMENT '公司網點關聯表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
DROP TABLE IF EXISTS `lg_ods.lg_company`;
CREATE TABLE `lg_ods.lg_company` (
`id` string,
`company_name` string,
`city_id` string,
`company_number` string,
`company_addr` string,
`company_addr_gis` string,
`company_tel` string,
`is_sub_company` string,
`state` string,
`cdt` string,
`udt` string,
`remark` string
) COMMENT '公司表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
DROP TABLE IF EXISTS `lg_ods.lg_customer_address_map`;
CREATE TABLE `lg_ods.lg_customer_address_map` (
`id` string,
`consumer_id` string,
`address_id` string,
`cdt` string,
`udt` string,
`remark` string
) COMMENT '客戶地址關聯表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
DROP TABLE IF EXISTS `lg_ods.lg_codes`;
CREATE TABLE `lg_ods.lg_codes` (
`id` string,
`name` string,
`type` string,
`code` string,
`code_desc` string,
`code_type` string,
`state` string,
`cdt` string,
`udt` string
) COMMENT '字典表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
DROP TABLE IF EXISTS `lg_ods.lg_express_bill`;
CREATE TABLE `lg_ods.lg_express_bill` (
`id` string,
`express_number` string,
`cid` string,
`eid` string,
`order_channel_id` string,
`order_dt` string,
`order_terminal_type` string,
`order_terminal_os_type` string,
`reserve_dt` string,
`is_collect_package_timeout` string,
`timeout_dt` string,
`type` string,
`cdt` string,
`udt` string,
`remark` string
) COMMENT '快遞單據表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
DROP TABLE IF EXISTS `lg_ods.lg_pkg`;
CREATE TABLE `lg_ods.lg_pkg` (
`id` string,
`pw_bill` string,
`pw_dot_id` string,
`warehouse_id` string,
`cdt` string,
`udt` string,
`remark` string
) COMMENT '包裹表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
3.2 資料采集
業務資料保存在MySQL中,每日凌晨匯入上一天的表資料,

- 事實表
- 全量匯入
linux123 :cd /lg_logstic/data_collect/all_import/
vim import_express_bill.sh
- 全量匯入
#!/bin/bash
source /etc/profile
##如果第一個引數不為空,則作為作業日期使用
if [ -n "$1" ]
then
do_date=$1
else
##昨天日期,減一
do_date=`date -d "-1 day" +"%Y%m%d"`
fi
#定義sqoop命令位置,Hive命令位置,在hadoop2
sqoop=/opt/lagou/servers/sqoop-1.4.7/bin/sqoop
Hive=/opt/lagou/servers/hive-2.3.7/bin
#定義作業日期
#撰寫匯入資料通用方法 接收兩個引數:第一個:表名,第二個:查詢陳述句
import_data(){
$sqoop import \
--connect jdbc:mysql://linux123:3306/lg_logstic \
--username root \
--password 12345678 \
--target-dir /user/hive/warehouse/lg_ods.db/$1/dt=$do_date \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N'
}
# 全量匯入快遞單據表資料
import_lg_express_bill(){
import_data lg_express_bill "select
*
from lg_express_bill
where 1=1"
}
# 全量匯入包裹表方法
import_lg_pkg(){
import_data lg_pkg "select
*
from lg_pkg
where 1=1"
}
#呼叫全量匯入資料方法
import_lg_express_bill
import_lg_pkg
#注意sqoop匯入資料的方式,對于Hive磁區表來說需要執行添加磁區操作,資料才能被識別到
$Hive -e "alter table lg_ods.lg_express_bill addpartition(dt='$do_date');
alter table lg_ods.lg_pkg add partition(dt='$do_date');"
[root@linux123 all_import]# sh import_express_bill.sh 20210826
- 增量匯入
#!/bin/bash
source /etc/profile
##如果第一個引數不為空,則作為作業日期使用
if [ -n "$1" ]
then
do_date=$1
else
##昨天日期,減一
do_date=`date -d "-1 day" +"%Y%m%d"`
fi
#定義sqoop命令位置,Hive命令位置,在hadoop2
sqoop=/opt/lagou/servers/sqoop-1.4.7/bin/sqoop
Hive=/opt/lagou/servers/hive-2.3.7/bin
#定義作業日期
#撰寫匯入資料通用方法 接收兩個引數:第一個:表名,第二個:查詢陳述句
import_data(){
$sqoop import \
--connect jdbc:mysql://linux123:3306/lg_logstic \
--username root \
--password 12345678 \
--target-dir /user/hive/warehouse/lg_ods.db/$1/dt=$do_date \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N'
}
# 增量匯入快遞單據表資料
import_lg_express_bill(){
import_data lg_express_bill "select
*
from lg_express_bill
WHERE DATE_FORMAT(cdt, '%Y%m%d') = '${do_date}'
"
}
# 增量匯入包裹表方法
import_lg_pkg(){
import_data lg_pkg "select
*
from lg_pkg
WHERE DATE_FORMAT(cdt, '%Y%m%d') = '${do_date}'
"
}
#呼叫全量匯入資料方法
import_lg_express_bill
import_lg_pkg
#注意sqoop匯入資料的方式,對于Hive磁區表來說需要執行添加磁區操作,資料才能被識別到
$Hive -e "alter table lg_ods.lg_express_bill add
partition(dt='$do_date');
alter table lg_ods.lg_pkg add partition(dt='$do_date');"
- 維度表
- 全量匯入
#!/bin/bash
source /etc/profile
##如果第一個引數不為空,則作為作業日期使用
if [ -n "$1" ]
then
do_date=$1
else
##昨天日期,減一
do_date=`date -d "-1 day" +"%Y%m%d"`
fi
#定義sqoop命令位置,Hive命令位置,在hadoop2
sqoop=/opt/lagou/servers/sqoop-1.4.7/bin/sqoop
Hive=/opt/lagou/servers/hive-2.3.7/bin/hive
#定義作業日期
#撰寫匯入資料通用方法 接收兩個引數:第一個:表名,第二個:查詢陳述句
import_data(){
$sqoop import \
--connect jdbc:mysql://linux123:3306/lg_logstic \
--username root \
--password 12345678 \
--target-dir /user/hive/warehouse/lg_ods.db/$1/dt=$do_date \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N'
}
# 全量匯入客戶地址表
import_lg_address(){
import_data lg_address "select
*
from lg_address
where 1=1"
}
# 全量匯入倉庫方法
import_lg_entrepots(){
import_data lg_entrepots "select
*
from lg_entrepots
where 1=1"
}
# 全量匯入區域表
import_lg_areas(){
import_data lg_areas "select
*
from lg_areas
where 1=1"
}
# 全量匯入快遞員表
import_lg_courier(){
import_data lg_courier "select
*
from lg_courier
where 1=1"
}
# 全量匯入網點表
import_lg_dot(){
import_data lg_dot "select
*
from lg_dot
where 1=1"
}
# 全量匯入公司網點關聯表
import_lg_company_dot_map(){
import_data lg_company_dot_map "select
*
from lg_company_dot_map
where 1=1"
}
# 全量匯入公司網點關聯表
import_lg_company(){
import_data lg_company "select
*
from lg_company
where 1=1"
}
# 全量匯入客戶表
import_lg_customer(){
import_data lg_customer "select
*
from lg_customer
where 1=1"
}
# 全量匯入客戶地址關聯表
import_lg_customer_address_map(){
import_data lg_customer_address_map "select
*
from lg_customer_address_map
where 1=1"
}
# 全量匯入字典表
import_lg_codes(){
import_data lg_codes "select
*
from lg_codes
where 1=1"
}
#呼叫全量匯入資料方法
import_lg_address
import_lg_entrepots
import_lg_areas
import_lg_courier
import_lg_dot
import_lg_company_dot_map
import_lg_company
import_lg_customer
import_lg_customer_address_map
import_lg_codes
#注意sqoop匯入資料的方式,對于Hive磁區表來說需要執行添加磁區操作,資料才能被識別到
$Hive -e "alter table lg_ods.lg_address add partition(dt='$do_date');
alter table lg_ods.lg_areas add partition(dt='$do_date');
alter table lg_ods.lg_courier add partition(dt='$do_date');
alter table lg_ods.lg_dot add partition(dt='$do_date');
alter table lg_ods.lg_company_dot_map add partition(dt='$do_date');
alter table lg_ods.lg_company add partition(dt='$do_date');
alter table lg_ods.lg_customer add partition(dt='$do_date');
alter table lg_ods.lg_customer_address_map add partition(dt='$do_date');
alter table lg_ods.lg_codes add partition(dt='$do_date');"
3.3 ETL
1 指標明細

2 表關系示意圖

事實表

維度表


3 預處理
創建lg_dwd層表
3.1 事實表
事實表拉寬

創建lg_dwd層表
create database lg_dwd;
use lg_dwd;
DROP TABLE IF EXISTS `lg_dwd.expression_lg_express_pkg`;
CREATE TABLE `lg_dwd.expression_lg_express_pkg` (
id string,
express_number string,
cid string,
eid string,
order_channel_id string,
order_dt string,
order_terminal_type string,
order_terminal_os_type string,
reserve_dt string,
is_collect_package_timeout string,
timeout_dt string,
cdt string,
udt string,
remark string,
pw_bill string,
pw_dot_id string
)COMMENT '快遞單據表-包裹關聯表'
partitioned by (dt string) STORED AS PARQUET ;
ETL-SQL
insert overwrite table lg_dwd.expression_lg_express_pkg partition(dt='2021-08-26')
select
ebill.id ,
ebill.express_number ,
ebill.cid ,
ebill.eid ,
ebill.order_channel_id ,
ebill.order_dt,
ebill.order_terminal_type,
ebill.order_terminal_os_type ,
ebill.reserve_dt ,
ebill.is_collect_package_timeout ,
ebill.timeout_dt ,
ebill.cdt ,
ebill.udt ,
ebill.remark ,
pkg.pw_bill,
pkg.pw_dot_id
from
(select * from lg_ods.lg_express_bill where dt='20210826') ebill
left join (select pw_bill,pw_dot_id from lg_ods.lg_pkg where dt ='20210826') pkg
on ebill.express_number =pkg.pw_bill ;
3.2 維度表

客戶地址資訊拉寬表
創建表
drop table if exists lg_dim.express_customer_address;
create table lg_dim.express_customer_address(
id string,
cname string,
caddress string,
type string)COMMENT '快遞單主題-客戶地址資訊'
partitioned by (dt string) STORED AS PARQUET ;
insert overwrite table lg_dim.express_customer_address partition(dt='2021-08-26')
select
customer.id as id,
customer.name as cname,
address.detail_addr as caddress,
customer.type as type
from
(select name,id,type from lg_ods.lg_customer where dt ='20210826') customer
left join (select address_id,consumer_id from lg_ods.lg_customer_address_map
where dt='20210826' ) address_map
on customer.id=address_map.consumer_id
left join (select id,detail_addr from lg_ods.lg_address where dt ='20210826') address
on address_map.address_id=address.id ;
ETL-SQL

網點公司資訊拉寬表
drop table if exists lg_dim.express_company_dot_addr;
create table lg_dim.express_company_dot_addr(
id string,
dot_name string,
company_name string)COMMENT '快遞單主題-公司網點地址資訊'
partitioned by (dt string) STORED AS PARQUET ;
insert overwrite table lg_dim.express_company_dot_addr partition(dt='2021-08-26')
select
dot.id as id,
dot.dot_name as dot_name,
company.company_name as company_name
from (select id,dot_name from lg_ods.lg_dot where dt ='20210826') dot
left join (select dot_id,company_id from lg_ods.lg_company_dot_map where dt='20210826') companydot on dot.id=companydot.dot_id
left join (select company_name,id from lg_ods.lg_company where dt ='20210826') company on company.id=companydot.company_id ;
3.4 指標統計
計算的欄位

-- 創建lg_dws層資料庫
drop table if exists lg_dws.express_base_agg;
create table lg_dws.express_base_agg(
express_count bigint,
customer_type string,
dot_name string,
channel_id string,
terminal_type string)COMMENT '快遞單主題-初步匯總表'
partitioned by (dt string) STORED AS PARQUET ;
insert overwrite table lg_dws.express_base_agg partition(dt='2021-08-26')
select
count(express_number) as express_count,
t2.type as customer_type,
t3.dot_name as dot_name,
t1.order_channel_id as channel_id,
t1.order_terminal_type as terminal_type
from
(select * from lg_dwd.expression_lg_express_pkg where dt ='2021-08-26') t1
left join
(select * from lg_dim.express_customer_address where dt ='2021-08-26') t2
on t1.cid =t2.id
left join
(select * from lg_dim.express_company_dot_addr where dt ='2021-08-26') t3
on t1.pw_dot_id =t3.id
group by
t2.type,
t3.dot_name,
t1.order_channel_id,
t1.order_terminal_type;
指標統計sql
--最大快遞單數
--各類客戶最大快遞單數
select sum(express_count) as customer_type_max_count ,t.customer_type as customer_type from (select * from lg_dws.express_base_agg where dt='2021-08-26') t
group by t.customer_type order by customer_type_max_count desc limit 1;
--各渠道最大快遞單數
select sum(express_count) as channel_max_count ,t.channel_id as channel_id
from(select * from lg_dws.express_base_agg where dt='2021-08-26') t
group by t.channel_id order by channel_max_count desc limit 1;
--各終端最大快遞單數
select sum(express_count) as terminal_max_count ,t.terminal_type as
terminal_type from (select * from lg_dws.express_base_agg where dt='2021-08-26')
t group by t.terminal_type order by terminal_max_count desc limit 1;
--各網點最大快遞單數
select sum(express_count) as dot_max_count ,t.dot_name as dot_name from (select
* from lg_dws.express_base_agg where dt='2021-08-26') t group by t.dot_name
order by dot_max_count desc limit 1;
--最小快遞單數
--各類客戶最小快遞單數
select sum(express_count) as customer_type_min_count ,t.customer_type as
customer_type from (select * from lg_dws.express_base_agg where dt='2021-08-26')
t group by t.customer_type order by customer_type_min_count asc limit 1;
--各渠道最小快遞單數
select sum(express_count) as channel_min_count ,t.channel_id as channel_id from
(select * from lg_dws.express_base_agg where dt='2021-08-26') t group by
t.channel_id order by channel_min_count asc limit 1;
--各終端最小快遞單數
select sum(express_count) as terminal_min_count ,t.terminal_type as
terminal_type from (select * from lg_dws.express_base_agg where dt='2021-08-26')
t group by t.terminal_type order by terminal_min_count asc limit 1;
--各網點最小快遞單數
select sum(express_count) as dot_min_count ,t.dot_name as dot_name from (select
* from lg_dws.express_base_agg where dt='2021-08-26') t group by t.dot_name
order by dot_min_count asc limit 1;
--創建lg_ads層表
drop table if exists lg_ads.express_metrics;
create table lg_ads.express_metrics(
customer_type_max_count bigint,
customer_max_type string,
channel_max_count bigint,
channel_max_id string,
terminal_max_count bigint,
terminal_max_type string,
dot_max_count bigint,
dot_max_name string,
customer_type_min_count bigint,
customer_min_type string,
channel_min_count bigint,
channel_min_id string,
terminal_min_count bigint,
terminal_min_type string,
dot_min_count bigint,
dot_min_name string,
dt string
)COMMENT '快遞單主題-指標表' row format delimited fields terminated by ',' STORED AS textfile ;
--匯總sql
insert into table lg_ads.express_metrics
select
t1.customer_type_max_count,
t1.customer_type as customer_max_type,
t2.channel_max_count,
t2.channel_id as channel_max_id,
t3.terminal_max_count,
t3.terminal_type as terminal_max_type,
t4.dot_max_count,
t4.dot_name as dot_max_name,
t5.customer_type_min_count,
t5.customer_type as customer_min_type ,
t6.channel_min_count,
t6.channel_id as channel_min_id,
t7.terminal_min_count,
t7.terminal_type as terminal_min_type,
t8.dot_min_count,
t8.dot_name as dot_min_name,
'2021-08-26'
from
(select sum(express_count) as customer_type_max_count ,t.customer_type as
customer_type from (select * from lg_dws.express_base_agg where dt='2021-08-26')
t group by t.customer_type order by customer_type_max_count desc limit 1) t1
join
(select sum(express_count) as channel_max_count ,t.channel_id as channel_id
from (select * from lg_dws.express_base_agg where dt='2021-08-26') t group by
t.channel_id order by channel_max_count desc limit 1 ) t2 on 1=1
join
(select sum(express_count) as terminal_max_count ,t.terminal_type as
terminal_type from (select * from lg_dws.express_base_agg where dt='2021-08-26')
t group by t.terminal_type order by terminal_max_count desc limit 1) t3
join
(select sum(express_count) as dot_max_count ,t.dot_name as dot_name from
(select * from lg_dws.express_base_agg where dt='2021-08-26') t group by
t.dot_name order by dot_max_count desc limit 1) t4
join
(select sum(express_count) as customer_type_min_count ,t.customer_type as
customer_type from (select * from lg_dws.express_base_agg where dt='2021-08-26')
t group by t.customer_type order by customer_type_min_count asc limit 1) t5
join
(select sum(express_count) as channel_min_count ,t.channel_id as channel_id from
(select * from lg_dws.express_base_agg where dt='2021-08-26') t group by
t.channel_id order by channel_min_count asc limit 1) t6
join
(select sum(express_count) as terminal_min_count ,t.terminal_type as
terminal_type from (select * from lg_dws.express_base_agg where dt='2021-08-26')
t group by t.terminal_type order by terminal_min_count asc limit 1) t7
join
(select sum(express_count) as dot_min_count ,t.dot_name as dot_name from (select
* from lg_dws.express_base_agg where dt='2021-08-26') t group by t.dot_name
order by dot_min_count asc limit 1) t8;

如上報錯,需要設定非嚴格模式
set hive.mapred.mode=nonstrict;是非嚴格模式
如果設定為strict(嚴格模式),有三個影響
1.在order by 的時候必須使用limit限制輸出條數,因為hivesql跟傳統的sql一樣,都會做全域排序,但是hive的底層是MR,為保證順序性order by會將資料都交給一個reducer,這樣會造成資料傾斜不出資料,因此嚴格模式必須使用limit提升性能,同時跟傳統的sql唯一的區別是mysql可以不用limit
2.在磁區表里,必須使用磁區,多磁區的指定一個磁區也行
3.在使用join陳述句的時候,必須使用on,標配join…on…,left join…on…
匯出指標資料
創建Mysql指標表
CREATE TABLE lg_logstic.`express_agg_metrics` (
`customer_type_max_count` BIGINT ( 20 ) DEFAULT NULL,
`customer_max_type` VARCHAR ( 10 ) DEFAULT NULL,
`channel_max_count` BIGINT ( 20 ) DEFAULT NULL,
`channel_max_id` VARCHAR ( 10 ) DEFAULT NULL,
`terminal_max_count` BIGINT ( 20 ) DEFAULT NULL,
`terminal_max_type` VARCHAR ( 10 ) DEFAULT NULL,
`dot_max_count` BIGINT ( 20 ) DEFAULT NULL,
`dot_max_name` VARCHAR ( 25 ) DEFAULT NULL,
`customer_type_min_count` BIGINT ( 20 ) DEFAULT NULL,
`customer_min_type` VARCHAR ( 10 ) DEFAULT NULL,
`channel_min_count` BIGINT ( 20 ) DEFAULT NULL,
`channel_min_id` VARCHAR ( 10 ) DEFAULT NULL,
`terminal_min_count` BIGINT ( 20 ) DEFAULT NULL,
`terminal_min_type` VARCHAR ( 10 ) DEFAULT NULL,
`dot_min_count` BIGINT ( 20 ) DEFAULT NULL,
`dot_min_name` VARCHAR ( 25 ) DEFAULT NULL,
`dt` VARCHAR ( 25 ) NOT NULL,
PRIMARY KEY ( `dt` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
使用Sqoop 工具匯出
/lg_logstic/data_collect/all_import/export_express.sh
#!/bin/bash
source /etc/profile
#定義sqoop命令位置,Hive命令位置,在hadoop2
sqoop=/opt/lagou/servers/sqoop-1.4.7/bin/sqoop
Hive=/opt/lagou/servers/hive-2.3.7/bin/hive
#定義作業日期
#撰寫匯入資料通用方法 接收兩個引數:第一個:表名,第二個:查詢陳述句
export_data(){
$sqoop export \
--connect "jdbc:mysql://linux123:3306/lg_logstic?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 12345678 \
--table $1 \
--export-dir /user/hive/warehouse/lg_ads.db/$2/ \
--num-mappers 1 \
--input-fields-terminated-by ',' \
--update-mode allowinsert \
--update-key dt
}
# 匯出快遞單指標資料
export_lg_express_metrics(){
export_data express_agg_metrics express_metrics
}
#匯出資料方法
export_lg_express_metrics
cd /lg_logstic/data_collect/all_import
sh export_express.sh

運行中一直卡在截圖位置,重啟dfs,yarn,運行正常

第四節 運單主題
4.1 lg_ods層建表
建表陳述句
--運單表
DROP TABLE IF EXISTS `lg_ods.lg_waybill`;
CREATE TABLE `lg_ods.lg_waybill` (
`id` string,
`express_bill_number` string,
`waybill_number` string,
`cid` string,
`eid` string,
`order_channel_id` string,
`order_dt` string,
`order_terminal_type` string,
`order_terminal_os_type` string,
`reserve_dt` string,
`is_collect_package_timeout` string,
`pkg_id` string,
`pkg_number` string,
`timeout_dt` string,
`transform_type` string,
`delivery_customer_name` string,
`delivery_addr` string,
`delivery_mobile` string,
`delivery_tel` string,
`receive_customer_name` string,
`receive_addr` string,
`receive_mobile` string,
`receive_tel` string,
`cdt` string,
`udt` string,
`remark` string
) COMMENT '運單表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
--轉運記錄表:lg_transport_record
DROP TABLE IF EXISTS `lg_ods.lg_transport_record`;
CREATE TABLE `lg_ods.lg_transport_record` (
`id` string,
`pw_id` string,
`pw_waybill_id` string,
`pw_waybill_number` string,
`ow_id` string,
`ow_waybill_id` string,
`ow_waybill_number` string,
`sw_id` string,
`ew_id` string,
`transport_tool_id` string,
`pw_driver1_id` string,
`pw_driver2_id` string,
`pw_driver3_id` string,
`ow_driver1_id` string,
`ow_driver2_id` string,
`ow_driver3_id` string,
`route_id` string,
`distance` string,
`duration` string,
`state` string,
`start_vehicle_dt` string,
`predict_arrivals_dt` string,
`actual_arrivals_dt` string,
`cdt` string,
`udt` string,
`remark` string
)COMMENT '轉運記錄表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
--線路表:lg_route
DROP TABLE IF EXISTS `lg_ods.lg_route`;
CREATE TABLE `lg_ods.lg_route` (
`id` string,
`start_station` string,
`start_station_area_id` string,
`start_warehouse_id` string,
`end_station` string,
`end_station_area_id` string,
`end_warehouse_id` string,
`mileage_m` string,
`time_consumer_minute` string,
`state` string,
`cdt` string,
`udt` string,
`remark` string
) COMMENT '線路表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
--車輛表:lg_transport_tool
DROP TABLE IF EXISTS `lg_ods.lg_transport_tool`;
CREATE TABLE `lg_ods.lg_transport_tool` (
`id` string,
`brand` string,
`model` string,
`type` string,
`given_load` string,
`load_cn_unit` string,
`load_en_unit` string,
`buy_dt` string,
`license_plate` string,
`state` string,
`cdt` string,
`udt` string,
`remark` string
) COMMENT '車輛表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by ',';
4.2 資料采集
業務資料保存在MySQL中,每日凌晨匯入上一天的表資料,
- 事實表
- 全量匯入
import_lg_waybill.sh
#!/bin/bash
source /etc/profile
##如果第一個引數不為空,則作為作業日期使用
if [ -n "$1" ]
then
do_date=$1
else
##昨天日期,減一
do_date=`date -d "-1 day" +"%Y%m%d"`
fi
#定義sqoop命令位置,Hive命令位置,在hadoop2
sqoop=/opt/lagou/servers/sqoop-1.4.7/bin/sqoop
Hive=/opt/lagou/servers/hive-2.3.7/bin/hive
#定義作業日期
#撰寫匯入資料通用方法 接收兩個引數:第一個:表名,第二個:查詢陳述句
import_data(){
$sqoop import \
--connect jdbc:mysql://linux123:3306/lg_logstic \
--username root \
--password 12345678 \
--target-dir /user/hive/warehouse/lg_ods.db/$1/dt=$do_date \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N'
}
# 全量匯入運單表資料
import_lg_waybill(){
import_data lg_waybill "select
*
from lg_waybill
where 1=1"
}
# 全量匯入轉運記錄表
import_lg_transport_record(){
import_data lg_transport_record "select
*
from lg_transport_record
where 1=1"
}
#呼叫全量匯入資料方法
import_lg_waybill
import_lg_transport_record
#注意sqoop匯入資料的方式,對于Hive磁區表來說需要執行添加磁區操作,資料才能被識別到
$Hive -e "alter table lg_ods.lg_waybill add partition(dt='$do_date');
alter table lg_ods.lg_transport_record add partition(dt='$do_date');"
cd /lg_logstic/data_collect/all_import
sh import_lg_waybill.sh 2021-08-26

- 增量匯入
#!/bin/bash
source /etc/profile
##如果第一個引數不為空,則作為作業日期使用
if [ -n "$1" ]
then
do_date=$1
else
##昨天日期,減一
do_date=`date -d "-1 day" +"%Y%m%d"`
fi
#定義sqoop命令位置,Hive命令位置,在hadoop2
sqoop=/opt/lagou/servers/sqoop-1.4.7/bin/sqoop
Hive=/opt/lagou/servers/hive-2.3.7/bin/hive
#定義作業日期
#撰寫匯入資料通用方法 接收兩個引數:第一個:表名,第二個:查詢陳述句
import_data(){
$sqoop import \
--connect jdbc:mysql://linux123:3306/lg_logstic \
--username root \
--password 12345678 \
--target-dir /user/hive/warehouse/lg_ods.db/$1/dt=$do_date \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N'
}
# 增量匯入運單資料
import_lg_waybill(){
import_data lg_waybill "select
*
from lg_waybill
WHERE DATE_FORMAT(cdt, '%Y%m%d') = '${do_date}'
"
}
# 增量匯入轉運記錄
import_lg_transport_record(){
import_data lg_transport_record "select
*
from lg_transport_record
WHERE DATE_FORMAT(cdt, '%Y%m%d') = '${do_date}'
"
}
#呼叫全量匯入資料方法
import_lg_waybill
import_lg_transport_record
#注意sqoop匯入資料的方式,對于Hive磁區表來說需要執行添加磁區操作,資料才能被識別到
$Hive -e "alter table lg_ods.lg_express_bill add partition(dt='$do_date');
alter table lg_ods.lg_pkg add partition(dt='$do_date');"
- 維度表
- 全量匯入
#!/bin/bash
source /etc/profile
##如果第一個引數不為空,則作為作業日期使用
if [ -n "$1" ]
then
do_date=$1
else
##昨天日期,減一
do_date=`date -d "-1 day" +"%Y%m%d"`
fi
#定義sqoop命令位置,Hive命令位置,在hadoop2
sqoop=/opt/lagou/servers/sqoop-1.4.7/bin/sqoop
Hive=/opt/lagou/servers/hive-2.3.7/bin/hive
#定義作業日期
#撰寫匯入資料通用方法 接收兩個引數:第一個:表名,第二個:查詢陳述句
import_data(){
$sqoop import \
--connect jdbc:mysql://linux123:3306/lg_logstic \
--username root \
--password 12345678 \
--target-dir /user/hive/warehouse/lg_ods.db/$1/dt=$do_date \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N'
}
# 全量匯入線路表
import_lg_route(){
import_data lg_route "select
*
from lg_route
where 1=1"
}
# 全量匯入車輛表
import_lg_transport_tool(){
import_data lg_transport_tool "select
*
from lg_transport_tool
where 1=1"
}
#呼叫全量匯入資料方法
import_lg_route
import_lg_transport_tool
#注意sqoop匯入資料的方式,對于Hive磁區表來說需要執行添加磁區操作,資料才能被識別到
$Hive -e "alter table lg_ods.lg_route add partition(dt='$do_date');
alter table lg_ods.lg_transport_tool add partition(dt='$do_date');"
sh import_lg_route.sh 2021-08-26
4.3 ETL
1 指標明細

2 表關系示意圖
事實表

維度表


3 預處理
3.1 事實表
運單與轉運記錄寬表
drop table if exists lg_dwd.waybill_way_tran_record;
create table lg_dwd.waybill_way_tran_record(
express_bill_number string,
waybill_number string,
cid string,
eid string,
order_channel_id string,
order_dt string,
order_terminal_type string,
order_terminal_os_type string,
reserve_dt string,
is_collect_package_timeout string,
pkg_id string,
pkg_number string,
timeout_dt string,
transform_type string,
delivery_customer_name string,
delivery_addr string,
delivery_mobile string,
delivery_tel string,
receive_customer_name string,
receive_addr string,
receive_mobile string,
receive_tel string,
pw_id string,
pw_waybill_id string,
pw_waybill_number string,
ow_id string,
ow_waybill_id string,
ow_waybill_number string,
sw_id string,
ew_id string,
transport_tool_id string,
pw_driver1_id string,
pw_driver2_id string,
pw_driver3_id string,
ow_driver1_id string,
ow_driver2_id string,
ow_driver3_id string,
route_id string,
distance string,
duration string,
state string,
start_vehicle_dt string,
predict_arrivals_dt string,
actual_arrivals_dt string
)COMMENT '運單主題-運單與轉運記錄事實表'
partitioned by (dt string) STORED AS PARQUET ;
--插入資料
insert overwrite table lg_dwd.waybill_way_tran_record partition(dt='2021-08-26')
select
t1.express_bill_number ,
t1.waybill_number ,
t1.cid ,
t1.eid ,
t1.order_channel_id ,
t1.order_dt ,
t1.order_terminal_type ,
t1.order_terminal_os_type ,
t1.reserve_dt ,
t1.is_collect_package_timeout,
t1.pkg_id ,
t1.pkg_number ,
t1.timeout_dt ,
t1.transform_type ,
t1.delivery_customer_name ,
t1.delivery_addr ,
t1.delivery_mobile ,
t1.delivery_tel ,
t1.receive_customer_name ,
t1.receive_addr ,
t1.receive_mobile ,
t1.receive_tel ,
t2.pw_id ,
t2.pw_waybill_id ,
t2.pw_waybill_number ,
t2.ow_id ,
t2.ow_waybill_id ,
t2.ow_waybill_number ,
t2.sw_id ,
t2.ew_id ,
t2.transport_tool_id ,
t2.pw_driver1_id ,
t2.pw_driver2_id ,
t2.pw_driver3_id ,
t2.ow_driver1_id ,
t2.ow_driver2_id ,
t2.ow_driver3_id ,
t2.route_id ,
t2.distance ,
t2.duration ,
t2.state ,
t2.start_vehicle_dt ,
t2.predict_arrivals_dt ,
t2.actual_arrivals_dt
from (select * from lg_ods.lg_waybill where dt ='2021-08-26') t1
left join (select * from lg_ods.lg_transport_record where dt ='2021-08-26') t2
on t1.waybill_number =t2.pw_waybill_number ;

3.2 維度表
創建客戶字典寬表
drop table if exists dim.waybill_customer_codes;
create table dim.waybill_customer_codes (
id string,
name string,
tel string,
mobile string,
email string,
type string,
is_own_reg string,
reg_dt string,
reg_channel_id string,
state string,
last_login_dt string,
code_name string,
code_type string,
code string,
code_desc string,
code_cust_type string,
codes_state string
)COMMENT '運單主題-客戶字典寬表'
partitioned by (dt string) STORED AS PARQUET ;
insert overwrite table dim.waybill_customer_codes partition(dt='2021-08-26')
select
t1.id ,
t1.name ,
t1.tel ,
t1.mobile ,
t1.email ,
t1.type ,
t1.is_own_reg ,
t1.reg_dt ,
t1.reg_channel_id ,
t1.state ,
t1.last_login_dt ,
t2.name as code_name ,
t2.type as code_type ,
t2.code ,
t2.code_desc ,
t2.code_type as code_cust_type,
t2.state as codes_state
from (select * from lg_ods.lg_customer where dt ='20210826') t1
left join (select * from lg_ods.lg_codes where dt ='20210826' and type ='1') t2
on t1.type =t2.type ;

創建網點區域寬表
drop table if exists dim.waybill_courier_dot_area;
create table dim.waybill_courier_dot_area (
id string,
job_num string,
name string,
birathday string,
tel string,
pda_num string,
car_id string,
postal_standard_id string,
work_time_id string,
dot_id string,
state string,
dot_number string,
dot_name string,
dot_addr string,
dot_gis_addr string,
dot_tel string,
company_id string,
manage_area_id string,
manage_area_gis string,
dot_state string,
area_name string,
pid string,
sname string,
level string,
citycode string,
yzcode string,
mername string,
lng string,
lat string,
pinyin string) COMMENT '運單主題-快遞員網點區域寬表'
partitioned by (dt string) STORED AS PARQUET ;
insert overwrite table dim.waybill_courier_dot_area partition(dt='2021-08-26')
select
t1.id ,
t1.job_num ,
t1.name ,
t1.birathday ,
t1.tel ,
t1.pda_num ,
t1.car_id ,
t1.postal_standard_id,
t1.work_time_id ,
t1.dot_id ,
t1.state ,
t2.dot_number ,
t2.dot_name ,
t2.dot_addr ,
t2.dot_gis_addr ,
t2.dot_tel ,
t2.company_id ,
t2.manage_area_id ,
t2.manage_area_gis,
t2.state as dot_state ,
t3.name as area_name ,
t3.pid ,
t3.sname ,
t3.level ,
t3.citycode ,
t3.yzcode ,
t3.mername ,
t3.lng ,
t3.lat ,
t3.pinyin
from (select * from lg_ods.lg_courier where dt ='20210826' ) t1
left join (select * from lg_ods.lg_dot where dt ='20210826' ) t2
on t1.dot_id=t2.id
left join (select * from lg_ods.lg_areas where dt ='20210826' ) t3
on t2.manage_area_id =t3.id;

線路表
車輛表
4.4 指標統計
計算指標

--創建lg_dws層表
drop table if exists lg_dws.waybill_base_agg;
create table lg_dws.waybill_base_agg
(waybill_count bigint,
area_name string,
dot_id string,
dot_name string,
route_id string,
tool_id string,
cus_type string)COMMENT '運單主題-lg_dws基礎匯總表'
partitioned by (dt string) STORED AS PARQUET ;
insert overwrite table lg_dws.waybill_base_agg partition(dt='2021-08-26')
select
count(record.waybill_number) as waybill_count,
courier_area.area_name as area_name,
courier_area.dot_id as dot_id,
courier_area.dot_name as dot_name,
route.id as route_id,
tran_tool.id as tool_id,
cus_codes.type as cus_type
from
(select * from lg_dwd.waybill_way_tran_record where dt ='2021-08-26') record
left join
(select * from dim.waybill_customer_codes where dt='2021-08-26') cus_codes
on record.cid= cus_codes.id
left join
(select * from dim.waybill_courier_dot_area where dt='2021-08-26') courier_area
on record.eid= courier_area.id
left join
(select * from lg_ods.lg_route where dt ='2021-08-26') route
on record.route_id =route.id
left join
(select * from lg_ods.lg_transport_tool where dt ='2021-08-26') tran_tool
on record.transport_tool_id =tran_tool.id
group by
courier_area.area_name,courier_area.dot_id,courier_area.dot_name,route.id,tran_tool.id,cus_codes.type;

指標統計
--運單數
--各區域最大運單數
select sum(t.waybill_count) as area_max_count, t.area_name from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.area_name order by
area_max_count desc limit 1;
--各網點最大運單數
select sum(t.waybill_count) as dot_max_count, t.dot_name, t.dot_id from (select
* from lg_dws.waybill_base_agg where dt ='2021-08-26') t group by
t.dot_id,t.dot_name order by dot_max_count desc limit 1;
--各線路最大運單數
select sum(t.waybill_count) as route_max_count, t.route_id from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.route_id order by
route_max_count desc limit 1;
--各運輸工具最大運單數
select sum(t.waybill_count) as tool_max_count, t.tool_id from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.tool_id order by
tool_max_count desc limit 1;
--各類客戶最大運單數
select sum(t.waybill_count) as cus_max_count, t.cus_type from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.cus_type order by
cus_max_count desc limit 1;
--各區域最小運單數
select sum(t.waybill_count) as area_min_count, t.area_name from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.area_name order by
area_min_count asc limit 1;
--各網點最小運單數
select sum(t.waybill_count) as dot_min_count, t.dot_name, t.dot_id from (select
* from lg_dws.waybill_base_agg where dt ='2021-08-26') t group by
t.dot_id,t.dot_name order by dot_min_count asc limit 1;
--各線路最小運單數
select sum(t.waybill_count) as route_min_count, t.route_id from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.route_id order by
route_min_count asc limit 1;
--各運輸工具最小運單數
select sum(t.waybill_count) as tool_min_count, t.tool_id from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.tool_id order by
tool_min_count desc limit 1;
--各類客戶最小運單數
select sum(t.waybill_count) as cus_min_count, t.cus_type from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.cus_type order by
cus_min_count asc limit 1;
--創建lg_ads層表
drop table if exists lg_ads.waybill_metrics;
create table lg_ads.waybill_metrics(
area_max_count bigint,
area_max_name string,
dot_max_count bigint,
dot_max_name string,
route_max_count bigint,
route_max_id string,
tool_max_count bigint,
tool_max_id string,
cus_max_count bigint,
cus_max_type string,
area_min_count bigint,
area_min_name string,
dot_min_count bigint,
dot_min_name string,
route_min_count bigint,
route_min_id string,
tool_min_count bigint,
tool_min_id string,
cus_min_count bigint,
cus_min_type string ,
dt string
)COMMENT '運主題-指標表' row format delimited fields terminated by ',' STORED AS textfile ;
--匯總sql
insert overwrite table lg_ads.waybill_metrics
select
t1.area_max_count,
t1.area_name as area_max_name,
t2.dot_max_count,
t2.dot_name as dot_max_name,
t3.route_max_count,
t3.route_id as route_max_id,
t4.tool_max_count,
t4.tool_id as tool_max_id,
t5.cus_max_count,
t5.cus_type as cus_max_type,
t6.area_min_count,
t6.area_name as area_min_name,
t7.dot_min_count,
t7.dot_name as dot_min_name,
t8.route_min_count,
t8.route_id as route_min_id,
t9.tool_min_count,
t9.tool_id as tool_min_id,
t10.cus_min_count,
t10.cus_type as cus_min_type,
'2021-08-26'
from
(select sum(t.waybill_count) as area_max_count, t.area_name from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.area_name order by
area_max_count desc limit 1) t1
join
(select sum(t.waybill_count) as dot_max_count, t.dot_name, t.dot_id from (select
* from lg_dws.waybill_base_agg where dt ='2021-08-26') t group by
t.dot_id,t.dot_name order by dot_max_count desc limit 1) t2
join
(select sum(t.waybill_count) as route_max_count, t.route_id from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.route_id order by
route_max_count desc limit 1) t3
join
(select sum(t.waybill_count) as tool_max_count, t.tool_id from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.tool_id order by
tool_max_count desc limit 1) t4
join
(select sum(t.waybill_count) as cus_max_count, t.cus_type from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.cus_type order by
cus_max_count desc limit 1) t5
join
(select sum(t.waybill_count) as area_min_count, t.area_name from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.area_name order by
area_min_count asc limit 1) t6
join
(select sum(t.waybill_count) as dot_min_count, t.dot_name, t.dot_id from (select
* from lg_dws.waybill_base_agg where dt ='2021-08-26') t group by
t.dot_id,t.dot_name order by dot_min_count asc limit 1) t7
join
(select sum(t.waybill_count) as route_min_count, t.route_id from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.route_id order by
route_min_count asc limit 1) t8
join
(select sum(t.waybill_count) as tool_min_count, t.tool_id from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.tool_id order by
tool_min_count desc limit 1) t9
join
(select sum(t.waybill_count) as cus_min_count, t.cus_type from (select * from
lg_dws.waybill_base_agg where dt ='2021-08-26') t group by t.cus_type order by
cus_min_count asc limit 1) t10;

匯出指標資料
創建Mysql指標表
DROP TABLE if EXISTS lg_logstic.`waybill_agg_metrics`;
CREATE TABLE lg_logstic.`waybill_agg_metrics` (
`area_max_count` bigint(20) DEFAULT NULL,
`area_max_name` varchar(50) DEFAULT NULL,
`dot_max_count` bigint(20) DEFAULT NULL,
`dot_max_name` varchar(50) DEFAULT NULL,
`route_max_count` bigint(20) DEFAULT NULL,
`route_max_id` varchar(25) DEFAULT NULL,
`tool_max_count` bigint(20) DEFAULT NULL,
`tool_max_id` varchar(25) DEFAULT NULL,
`cus_max_count` bigint(20) DEFAULT NULL,
`cus_max_type` varchar(25) DEFAULT NULL,
`area_min_count` bigint(20) DEFAULT NULL,
`area_min_name` varchar(50) DEFAULT NULL,
`dot_min_count` bigint(20) DEFAULT NULL,
`dot_min_name` varchar(50) DEFAULT NULL,
`route_min_count` bigint(20) DEFAULT NULL,
`route_min_id` varchar(25) DEFAULT NULL,
`tool_min_count` bigint(20) DEFAULT NULL,
`tool_min_id` varchar(25) DEFAULT NULL,
`cus_min_count` bigint(20) DEFAULT NULL,
`cus_min_type` varchar(25) DEFAULT NULL,
`dt` varchar(25) NOT NULL,
PRIMARY KEY (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
使用Sqoop 工具匯出
[root@linux123 all_import]# sh export_lg_waybill_metrics.sh
#!/bin/bash
source /etc/profile
#定義sqoop命令位置,Hive命令位置,在hadoop2
sqoop=/opt/lagou/servers/sqoop-1.4.7/bin/sqoop
Hive=/opt/lagou/servers/hive-2.3.7/bin/hive
#定義作業日期
#撰寫匯入資料通用方法 接收兩個引數:第一個:表名,第二個:查詢陳述句
export_data(){
$sqoop export \
--connect "jdbc:mysql://linux123:3306/lg_logstic?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 12345678 \
--table $1 \
--export-dir /user/hive/warehouse/lg_ads.db/$2/ \
--num-mappers 1 \
--input-fields-terminated-by ',' \
--update-mode allowinsert \
--update-key dt
}
# 匯出快遞單指標資料
export_lg_waybill_metrics(){
export_data waybill_agg_metrics waybill_metrics
}
#匯出資料方法
export_lg_waybill_metrics

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/300991.html
標籤:其他
