主頁 > 資料庫 > 開源交流丨批流一體資料集成工具ChunJun同步Hive事務表原理詳解及實戰分享

開源交流丨批流一體資料集成工具ChunJun同步Hive事務表原理詳解及實戰分享

2022-07-14 10:41:45 資料庫

原文鏈接:批流一體資料集成工具ChunJun同步Hive事務表原理詳解及實戰分享

課件獲取:關注公眾號__ “數堆疊研習社”,后臺私信 “ChengYing”__ 獲得直播課件

視頻回放:點擊這里

ChengYing 開源專案地址:github 丨 gitee 喜歡我們的專案給我們點個__ STAR!STAR!!STAR!!!(重要的事情說三遍)__

技術交流釘釘 qun:30537511

本期我們帶大家回顧一下無倦同學的直播分享《Chunjun同步Hive事務表詳解》

一、Hive事務表的結構及原理

Hive是基于Hadoop的一個資料倉庫工具,用來進行資料提取、轉化、加載,這是一種可以存盤、查詢和分析存盤在Hadoop中的大規模資料的機制,Hive資料倉庫工具能將結構化的資料檔案映射為一張資料庫表,并提供SQL查詢功能,能將SQL陳述句轉變成MapReduce任務來執行,

在分享Hive事務表的具體內容前,我們先來了解下HIve 事務表在 HDFS 存盤上的一些限制,

Hive雖然支持了具有ACID語意的事務,但是沒有像在MySQL中使用那樣方便,有很多局限性,具體限制如下:

  • 尚不支持BEGIN,COMMIT和ROLLBACK,所有語言操作都是自動提交的

  • 僅支持ORC檔案格式(STORED AS ORC)

  • 默認情況下事務配置為關閉,需要配置引數開啟使用

  • 表必須是分桶表(Bucketed)才可以使用事務功能

  • 表必須內部表,外部表無法創建事務表

  • 表引數transactional必須為true

  • 外部表不能成為ACID表,不允許從非ACID會話讀取/寫入ACID表

以下矩陣包括可以使用Hive創建的表的型別、是否支持ACID屬性、所需的存盤格式以及關鍵的SQL操作,

file

了解完Hive事務表的限制,現在我們具體了解下Hive事務表的內容,

1、事務表檔案名字詳解

  • 基礎目錄:

$partition/base_$wid/$bucket

  • 增量目錄:

$partition/delta_$wid_$wid_$stid/$bucket

  • 引數目錄:

$partition/delete_delta_$wid_$wid_$stid/$bucket

file

2、事務表檔案內容詳解

$ orc-tools data bucket_00000

{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"id":1,"name":"Jerry","age":18}}

{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":1,"row":{"id":2,"name":"Tom","age":19}}

{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":2,"currentTransaction":1,"row":{"id":3,"name":"Kate","age":20}}

  • operation 0 表示插入、1 表示更新,2 表示洗掉,由于使用了 split-update,UPDATE 是不會出現的,

  • originalTransaction是該條記錄的原始寫事務 ID:

a、對于 INSERT 操作,該值和 currentTransaction 是一致的;

b、對于 DELETE,則是該條記錄第一次插入時的寫事務 ID,

  • bucket 是一個 32 位整型,由 BucketCodec 編碼,各個二進制位的含義為:

a、1-3 位:編碼版本,當前是 001;

b、4 位:保留;

c、5-16 位:分桶 ID,由 0 開始,分桶 ID 是由 CLUSTERED BY 子句所指定的欄位、以及分桶的數量決定的,該值和 bucket_N 中的 N 一致;

d、17-20 位:保留;

e、21-32 位:陳述句 ID;

舉例來說,整型 536936448 的二進制格式為 00100000000000010000000000000000,即它是按版本 1 的格式編碼的,分桶 ID 為 1,

  • rowId 是一個自增的唯一 ID,在寫事務和分桶的組合中唯一;

  • currentTransaction 當前的寫事務 ID;

  • row 具體資料,對于 DELETE 陳述句,則為 null,

3、更新 Hive 事務表資料

UPDATE employee SET age = 21 WHERE id = 2;

這條陳述句會先查詢出所有符合條件的記錄,獲取它們的 row_id 資訊,然后分別創建 delete 和 delta 目錄:

/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000

/user/hive/warehouse/employee/delete_delta_0000002_0000002_0000/bucket_00000 (update)

/user/hive/warehouse/employee/delta_0000002_0000002_0000/bucket_00000 (update)

delete_delta_0000002_0000002_0000/bucket_00000

包含了洗掉的記錄:

{"operation":2,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":2,"row":null}

delta_0000002_0000002_0000/bucket_00000

包含更新后的資料:

{"operation":0,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":2,"row":{"id":2,"name":"Tom","salary":21}}

4、Row_ID 資訊怎么查?

file

5、事務表壓縮(Compact)

隨著寫操作的積累,表中的 delta 和 delete 檔案會越來越多,事務表的讀取程序中需要合并所有檔案,數量一多勢必會影響效率,此外小檔案對 HDFS 這樣的檔案系統也不夠友好,因此Hive 引入了壓縮(Compaction)的概念,分為 Minor 和 Major 兩類,

● Minor

Minor Compaction 會將所有的 delta 檔案壓縮為一個檔案,delete 也壓縮為一個,壓縮后的結果檔案名中會包含寫事務 ID 范圍,同時省略掉陳述句 ID,

壓縮程序是在 Hive Metastore 中運行的,會根據一定閾值自動觸發,我們也可以使用如下陳述句人工觸發:

ALTER TABLE dtstack COMPACT 'MINOR',

● Major

Major Compaction 會將所有的 delta 檔案,delete 檔案壓縮到一個 base 檔案,壓縮后的結果檔案名中會包含所有寫事務 ID 的最大事務 ID,

壓縮程序是在 Hive Metastore 中運行的,會根據一定閾值自動觸發,我們也可以使用如下陳述句人工觸發:

ALTER TABLE dtstack COMPACT 'MAJOR',

6、檔案內容詳解

ALTER TABLE employee COMPACT 'minor';

陳述句執行前:

/user/hive/warehouse/employee/delta_0000001_0000001_0000

/user/hive/warehouse/employee/delta_0000002_0000002_0000 (insert 創建, mary的資料)

/user/hive/warehouse/employee/delete_delta_0000002_0000002_0001 (update)

/user/hive/warehouse/employee/delta_0000002_0000002_0001 (update)

陳述句執行后:

/user/hive/warehouse/employee/delete_delta_0000001_0000002

/user/hive/warehouse/employee/delta_0000001_0000002

7、讀 Hive 事務表

我們可以看到 ACID 事務表中會包含三類檔案,分別是 base、delta、以及 delete,檔案中的每一行資料都會以 row_id 作為標識并排序,從 ACID 事務表中讀取資料就是對這些檔案進行合并,從而得到最新事務的結果,這一程序是在 OrcInputFormat 和 OrcRawRecordMerger 類中實作的,本質上是一個合并排序的演算法,

以下列檔案為例,產生這些檔案的操作為:

  1. 插入三條記錄

  2. 進行一次 Major Compaction

  3. 然后更新兩條記錄,

1-0-0-1 是對 originalTransaction - bucketId - rowId - currentTra

file

8、合并演算法

對所有資料行按照 (originalTransaction, bucketId, rowId) 正序排列,(currentTransaction) 倒序排列,即:

originalTransaction-bucketId-rowId-currentTransaction

(base_1)1-0-0-1

(delete_2)1-0-1-2# 被跳過(DELETE)

(base_1)1-0-1-1 # 被跳過(當前記錄的 row_id(1) 和上條資料一樣)

(delete_2)1-0-2-2 # 被跳過(DELETE)

(base_1)1-0-2-1 # 被跳過(當前記錄的 row_id(2) 和上條資料一樣)

(delta_2)2-0-0-2

(delta_2)2-0-1-2

獲取第一條記錄;

  1. 如果當前記錄的 row_id 和上條資料一樣,則跳過;

  2. 如果當前記錄的操作型別為 DELETE,也跳過;

通過以上兩條規則,對于 1-0-1-2 和 1-0-1-1,這條記錄會被跳過;

如果沒有跳過,記錄將被輸出給下游;

重復以上程序,

合并程序是流式的,即 Hive 會將所有檔案打開,預讀第一條記錄,并將 row_id 資訊存入到 ReaderKey 型別中,

file

三、ChunJun讀寫Hive事務表實戰

了解完Hive事務表的基本原理后,我們來為大家分享如何在ChunJun中讀寫Hive事務表,

1、事務表資料準備

-- 創建事務表

create table dtstack(

id int,

name string,

age int

)

stored as orc

TBLPROPERTIES('transactional'='true');

-- 插入 10 條測驗資料

insert into dtstack (id, name, age)

values (1, "aa", 11), (2, "bb", 12), (3, "cc", 13), (4, "dd", 14), (5, "ee", 15),

   (6, "ff", 16), (7, "gg", 17), (8, "hh", 18), (9, "ii", 19), (10, "jj", 20);

2、配置 ChunJun json 腳本

file

file

file

3、提交任務(讀寫事務表)

啟動 Session

/root/wujuan/flink-1.12.7/bin/yarn-session.sh -t $ChunJun_HOME -d

提交 Yarn Session 任務

讀取事務表

/root/wujuan/ChunJun/bin/ChunJun-yarn-session.sh -job /root/wujuan/ChunJun/ChunJun-examples/json/hive3/hive3_transaction_stream.json -confProp {"yarn.application.id":"application_1650792512832_0134"}

寫入事務表

/root/wujuan/ChunJun/bin/ChunJun-yarn-session.sh -job /root/wujuan/ChunJun/ChunJun-examples/json/hive3/stream_hive3_transaction.json -confProp {"yarn.application.id":"application_1650792512832_0134"}

根據上一行結果替換 yarn.application.id

三、ChunJun 讀寫Hive事務表原始碼分析

壓縮器是在 Metastore 境內運行的一組后臺程式,用于支持 ACID 系統,它由 Initiator、 Worker、 Cleaner、 AcidHouseKeeperService 和其他一些組成,

1、Compactor

● Delta File Compaction

在不斷的對表修改中,會創建越來越多的delta檔案,需要這些檔案需要被壓縮以保證性能,有兩種型別的壓縮,即(minor)小壓縮和(major)大壓縮:

minor 需要一組現有的delta檔案,并將它們重寫為每個桶的一個delta檔案

major 需要一個或多個delta檔案和桶的基礎檔案,并將它們改寫成每個桶的新基礎檔案,major 需要更久,但是效果更好

所有的壓縮作業都是在后臺進行的,并不妨礙對資料的并發讀寫,在壓縮之后系統會等待,直到所有舊檔案的讀都結束,然后洗掉舊檔案,

●Initiator

這個模塊負責發現哪些表或磁區要進行壓縮,這應該在元存盤中使用hive.compactor.initiator.on來啟用, 每個 Compact 任務處理一個磁區(如果表是未磁區的,則處理整個表),如果某個磁區的連續壓實失敗次數超過 hive.compactor.initiator.failed.compacts.threshold,這個磁區的自動壓縮調度將停止,

● Worker

每個Worker處理一個壓縮任務, 一個壓縮是一個MapReduce作業,其名稱為以下形式,-compactor-.

., 每個Worker將作業提交給集群(如果定義了hive.compactor.job.queue),并等待作業完成,hive.compactor.worker.threads決定了每個Metastore中Worker的數量, Hive倉庫中的Worker總數決定了并發壓縮的最大數量,

● Cleaner

這個行程是在壓縮后,在確定不再需要delta檔案后,將其洗掉,

● AcidHouseKeeperService

這個行程尋找那些在hive.txn.timeout時間內沒有心跳的事務并中止它們,系統假定發起交易的客戶端停止心跳后崩潰了,它鎖定的資源應該被釋放,

● SHOW COMPACTIONS

該命令顯示當前運行的壓實和最近的壓實歷史(可配置保留期)的資訊,這個歷史顯示從HIVE-12353開始可用,

● Compact 重點配置

file

2、如何 debug Hive

  1. debug hive client

hive --debug

  1. debug hive metastore

hive --service metastore --debug:port=8881,mainSuspend=y,childSuspend=n --hiveconf hive.root.logger=DEBUG,console

file

  1. debug hive mr 任務

file

3、讀寫過濾和CompactorMR排序的關鍵代碼

file

file

4、Minor&Major 合并原始碼(CompactorMR Map 類)

file

四、ChunJun 檔案系統未來規劃

最后為大家介紹ChunJun 檔案系統未來規劃:

● 基于 FLIP-27 優化檔案系統

批流統一實作,簡單的執行緒模型,分片和讀資料分離,

● Hive 的分片優化

分片更精細化,粒度更細,充分發揮并發能力

● 完善 Exactly Once 語意

加強例外情況健壯性,

● HDFS 檔案系統的斷點續傳

根據磁區,檔案個數,檔案行數等確定端點位置,狀態存盤在 checkpoint 里面,

● 實時采集檔案

實時監控目錄下的多個追加檔案,

● 檔案系統格式的通用性

JSON、CSV、Text、XM、EXCELL 統一抽取公共包,

袋鼠云開源框架釘釘技術交流qun(30537511),歡迎對大資料開源專案有興趣的同學加入交流最新技術資訊,開源專案庫地址:https://github.com/DTStack

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/499190.html

標籤:其他

上一篇:dolphinscheduler單機化改造

下一篇:02-MySQL高級

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more