文章目錄
- 簡介
- ClickHouse簡介
- RoaringBitmap(RBM)原理
- ClickHouse中使用RBM存在的問題
- RoaringBitmap(RBM)定制序列化實作
- ClickHouse中RoaringBitmap的結構決議
- Spark中RoaringBitmap的實作
- 定制RBM序列化方式以兼容ClickHouse
- Byte(1) - 型別標識生成
- VarInt(SerializedSizeInBytes) - 序列化后的位元組長度
- ByteArray(RoaringBitmap) - RBM序列化
- 定制序列化的整體實作
- Spark和ClickHouse生成的RBM序列化資料比對
- ClickHouse中生成的RBM資料:
- Spark中定制生成的RBM資料:
- Spark生成RBM匯入ClickHouse實作
- 創建ClickHouse表
- 使用Spark JDBC方式匯入
- 查詢CK表中資料驗證
簡介
ClickHouse簡介
ClickHouse是由號稱“俄羅斯Google”的Yandex公司開發并在2016年開源,ClickHouse是一個列存盤資料庫,是原生的向量化執行引擎,目前ClickHouse在OLAP領域得到了廣泛的使用,其首要原因是查詢速度快,
在大資料處理中,海量資料的判重和基數統計是兩個繞不開的基礎問題,ClickHouse的解決方案是使用RoaringBitmap,其已有豐富的bitmap操作函式支持,可以實作非常靈活方便的判重和基數統計操作,
RoaringBitmap(RBM)原理
Bitmap用位圖的方式來存盤id數值資訊,可以實作精確地基數統計,但因為Bitmap在數值稀疏時會造成很大空間浪費,因此提出了用RoaringBitmap(RBM)對稀疏位圖進行壓縮,減少記憶體占用并提高效率,RBM的主要思路是:將32位無符號整數按照高16位分桶,即最多可能有2^16=65536個桶,又稱為container,存盤資料時,按照資料的高16位找到container(找不到就會新建一個),再將低16位放入container中,也就是說,一個RBM就是很多container的集合,其詳細的原理可參考文章,
ClickHouse中使用RBM存在的問題
在查看了ClickHouse的檔案及搜索了各公司的實踐方案(如騰訊和頭條)后,發現目前只能將原始明細的id資料匯入到ClickHouse后,再通過創建物化視圖的方式構建RBM結構進行使用,但是原始明細資料量往往非常大,這不僅給資料ETL處理造成了很大的負擔,也對計算資源以及ClickHouse的集群資源要求非常高,
我們的資料ETL處理通常是基于hadoop平臺的Spark計算框架進行處理,那能不能在Spark進行資料處理時就將Bitmap資料預計算存盤好,這樣不僅資料量會大大地減少,同時也能大大地減少對ClickHouse集群資源的要求!
RoaringBitmap(RBM)定制序列化實作
ClickHouse中RoaringBitmap的結構決議
為了在Spark資料預處理時提前計算存盤好RBM,首先就需要了解ClickHouse中RBM結構的實作原理,通過在ClickHouse的原始碼社區進行咨詢,了解到ClickHouse是利用CRoaring實作的RBM,并且其存盤結構的格式是Byte(1), VarInt(SerializedSizeInBytes), ByteArray(RoaringBitmap),

Spark中RoaringBitmap的實作
目前CRoaring沒有對應的Java語言實作庫,而在Java中RBM的常用實作庫是RoaringBitmap,并且其已經在Spark、Kylin和Druid等系統中得到了應用,RoaringBitmap庫提供了完善的bitmap操作,對其進行封裝后就可以集成到SparkSQL中進行使用,
這是我在Spark中自定義實作的bitmap相關udf函式,因此可以在Hive中定義Binary列來存盤RBM計算的中間結果,這樣既可以使用SparkSQL對中間結果進行再聚合統計,同時也可以將中間結果直接匯入到ClickHouse中進行查詢,極大地減少了匯入ClickHouse的資料量,
定制RBM序列化方式以兼容ClickHouse
查看了ClickHouse中RBM實作原始碼和Java版RoaringBitmap的序列化方式后,發現兩者序列化格式不兼容,為了可以將中間結果資料匯入ClickHouse中,需要在匯入前定制序列化方式使其與ClickHouse中的RBM兼容,前面已經了解到ClickHouse中RBM的存盤結構是Byte(1), VarInt(SerializedSizeInBytes), ByteArray(RoaringBitmap),因此可對各結構進行實作組裝,
Byte(1) - 型別標識生成
第一部分是用一個位元組來標識該RBM的型別,原來ClickHouse中對于小集合(基數小于32)的實作進行了性能優化,對其采用SmallSet集合的方式存盤,其大小最大限定寫死為32,也就是說當位圖的基數少于32時,僅使用SmallSet存盤;一旦超過此閾值,就呼叫toLarge()方法轉化為RBM,此后都在RBM上操作,第一個位元組就是用來標識底層存盤是采用SmallSet還是RBM實作的,如果是SmallSet則標識為0,否則標識為1,因此實作如下,
// rb: RoaringBitmap
if (rb.getCardinality <= 32) {
bos.put(new Integer(0).toByte) // bos:ByteBuffer
} else {
bos.put(new Integer(1).toByte)
}
VarInt(SerializedSizeInBytes) - 序列化后的位元組長度
第二部分是用VarInt型別來存盤第三部分ByteArray(RoaringBitmap)的所占用位元組長度,參考代碼可實作如下:
// rb: RoaringBitmap、bos:ByteBuffer
VarInt.putVarInt(rb.serializedSizeInBytes(), bos)
ByteArray(RoaringBitmap) - RBM序列化
第三部分就是RBM序列化后的資料,可直接用例Java中RoaringBitmap的序列化進行實作(經驗證與CRoaring序列化方式是一致的):
// rb: RoaringBitmap、bos:ByteBuffer
rb.serialize(bos)
定制序列化的整體實作
由于ClickHouse的RBM在基數小于32時用SmallSet集合進行了優化,因此在查看了ClickHouse中SmallSet的實作后,定制序列化方法的整體實作如下:
def serialize(rb: RoaringBitmap): ByteBuffer = {
if (rb.getCardinality <= 32) {
val bos1 = ByteBuffer.allocate(2 + 4*rb.getCardinality)
val bos = if (bos1.order eq LITTLE_ENDIAN) bos1 else bos1.slice.order(LITTLE_ENDIAN)
bos.put(new Integer(0).toByte)
bos.put(rb.getCardinality.toByte)
rb.toArray.foreach(i => bos.putInt(i))
bos
} else {
val varIntLen = VarInt.varIntSize(rb.serializedSizeInBytes())
val bos1 = ByteBuffer.allocate(1 + varIntLen + rb.serializedSizeInBytes()) // 1表示標識位,即是否小于32個值;varIntLen表示后面資料的位元組長度;
val bos = if (bos1.order eq LITTLE_ENDIAN) bos1 else bos1.slice.order(LITTLE_ENDIAN)
bos.put(new Integer(1).toByte)
VarInt.putVarInt(rb.serializedSizeInBytes(), bos)
rb.serialize(bos)
bos
}
}
Spark和ClickHouse生成的RBM序列化資料比對
已完成RBM整體的序列化后,為了能和ClickHouse生成的RBM資料進行比對,因此在ClickHouse中使用base64Encode函式將位元組資料轉換成字串資料后與Spark中生成的資料進行比對以驗證我們定制序列化方式的正確性:
ClickHouse中生成的RBM資料:
:) SELECT base64Encode(toString(bitmapBuild([toUInt32(32), toUInt32(65), toUInt32(127), toUInt32(1026)])));
SELECT base64Encode(toString(bitmapBuild([toUInt32(32), toUInt32(65), toUInt32(127), toUInt32(1026)])))
┌─base64Encode(toString(bitmapBuild(array(toUInt32(32), toUInt32(65), toUInt32(127), toUInt32(1026)))))─┐
│ AAQgAAAAQQAAAH8AAAACBAAA │
└───────────────────────────────────────────────────────────────────────────────────────────────────────┘
Spark中定制生成的RBM資料:
val rb = RoaringBitmap.bitmapOf(32, 65, 127, 1026)
// serialize即上一小節定義的定制序列化函式
val encode = new String(Base64.getEncoder.encode(serialize(rb).array()))
println(encode.equals("AAQgAAAAQQAAAH8AAAACBAAA"))
Spark生成RBM匯入ClickHouse實作
現在我們已經在Spark中可以直接生成和ClickHouse中RBM兼容的資料了,那最直接的方法是直接將序列化后的位元組資料插入roaring_bitmap AggregateFunction(groupBitmap, UInt32)欄位中,在測驗后發現插入資料一直無法成功,報如下錯誤:
DB::Exception: Cannot read all data. Bytes read: 292. Bytes expected: 5201903.
在官方檔案中看到AggregateFunction的列只能使用NSERT SELECT結合aggregate -State-函式插入資料,可能是這個原因導致的,
既然無法直接插入AggregateFunction列的資料,通過查看ClickHouse檔案后發現其支持物化運算式的能力,可定義某一列是另一列通過某個運算式計算生成出來的,那我們可以把roaring_bitmap AggregateFunction(groupBitmap, UInt32)定義成物化運算式列,而將RBM序列化資料插入普通字串列中,這樣就可以實作優美的資料打通能力,具體操作步驟如下,
創建ClickHouse表
定義bitmap型別的物化運算式列,其是通過原始RBM序列化資料生成得到的,但如果直接將序列化資料插入String列,會發現寫入后的資料和寫入前不一致(可能是因為有特殊字符的原因),因此想到的一種方式是將RBM序列化資料先用Base64編碼成普通字串插入,然后通過物化運算式解碼成RBM序列化的資料,
// 創建分布式表
CREATE TABLE stone.bitmap_test ON CLUSTER cluster_name (
ds Int64,
user String,
roaring_bitmap AggregateFunction(groupBitmap, UInt32)
MATERIALIZED base64Decode(user)
)
ENGINE = ReplicatedMergeTree('xxx', '{replica}')
PARTITION BY ds
ORDER BY (ds)
SETTINGS index_granularity = 8192;
// 創建view視圖用于查詢分布式表資料
CREATE TABLE stone.bitmap_test_view on cluster cluster_name AS stone.bitmap_test ENGINE=Distributed(cluster_name, stone, bitmap_test, rand());
使用Spark JDBC方式匯入
構造一個簡單的DataFrame資料寫入ClickHouse表(寫入函式saveToClickHouse可自定義實作),注意物化表達化列是不需要插入資料的,它會自動計算生成,
// 構造待寫入資料
val rows = new util.ArrayList[Row]()
rows.add(Row(1234561L, new String(Base64.getEncoder.encode(serialize(RoaringBitmap.bitmapOf(1 until 13: _*)).array()))))
rows.add(Row(1234562L, new String(Base64.getEncoder.encode(serialize(RoaringBitmap.bitmapOf(1 until 133: _*)).array()))))
rows.add(Row(1234563L, new String(Base64.getEncoder.encode(serialize(RoaringBitmap.bitmapOf(1 until 1334: _*)).array()))))
rows.add(Row(1234564L, new String(Base64.getEncoder.encode(serialize(RoaringBitmap.bitmapOf(1 until 133334: _*)).array()))))
val schema = StructType(
List(
StructField("ds", LongType),
StructField("user", StringType)
))
val df = jobManager.spark.createDataFrame(rows, schema)
// 自定義的向ck表寫入資料的方法
saveToClickHouse(df,
"stone",
"bitmap_test",
"cluster_name",
Some(1), // 寫入并發partition數
Some(5000)) // 每次寫入資料的batch size
查詢CK表中資料驗證
查詢ClickHouse中表的資料可以發現資料正確被插入了,并且物化運算式生成的RBM列資料也是正常的,至此,SparkSQL & ClickHouse打通RBM功能的流程就全部完成了!
:) select ds, bitmapCardinality(roaring_bitmap) as uv from stone.bitmap_test_view;
SELECT
ds,
bitmapCardinality(roaring_bitmap) AS uv
FROM stone.bitmap_test_view
┌──────ds─┬───uv─┐
│ 1234563 │ 1333 │
└─────────┴──────┘
┌──────ds─┬──uv─┐
│ 1234562 │ 132 │
└─────────┴─────┘
┌──────ds─┬─uv─┐
│ 1234561 │ 12 │
└─────────┴────┘
┌──────ds─┬─────uv─┐
│ 1234564 │ 133333 │
└─────────┴────────┘
4 rows in set. Elapsed: 0.003 sec.
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/233611.html
標籤:其他
