分析完成了聚合以及向量化過濾,向量化的函式計算之后,本篇,筆者將分析資料庫的一個重要算子:排序,讓我們從原始碼的角度來剖析ClickHouse作為列式存盤系統是如何實作排序的,
本系列文章的原始碼分析基于ClickHouse v19.16.2.2的版本,
1.執行計劃
老規矩,咱們還是先從一個簡單的查詢出發,通過一步步的通過執行計劃按圖索驥ClickHouse的執行邏輯,
select * from test order by k1;
咱們先嘗試打開ClickHouse的Debug日志看一下具體的執行的pipeline,

這里分為了5個流,而咱們所需要關注的流已經呼之欲出了MergeSorting與PartialSorting,ClickHouse先從存盤引擎的資料讀取資料,并且執行函式運算,并對資料先進行部分的排序,然后對于已經有序的資料在進行MergeSort,得出最終有序的資料,
2. 實作流程的梳理
那咱們接下來要梳理的代碼也很明確了,就是PartialSortingBlockInputStream與MergingSortedBlockInputStream,
- PartialSortingBlockInputStream的實作
PartialSortingBlockInputStream的實作很簡單,咱們直接看代碼吧:
Block PartialSortingBlockInputStream::readImpl()
{
Block res = children.back()->read();
sortBlock(res, description, limit);
return res;
}
它從底層的流讀取資料Block,Block可以理解為Doris之中的Batch,相當一批行的資料,然后根據自身的成員變數SortDescription來對單個Block進行排序,并根據limit進行長度截斷,
SortDescription是一個vector,每個成員描述了單個排序列的排序規則,比如
: null值的排序規則,是否進行逆序排序等,
/// Description of the sorting rule for several columns.
using SortDescription = std::vector<SortColumnDescription>;
- sortBlock的函式實作
接下來,我們來看看sortBlock函式的實作,看看列式的執行系統是如何利用上述資訊進行資料排序的,
void sortBlock(Block & block, const SortDescription & description, UInt64 limit)
{
/// If only one column to sort by
if (description.size() == 1)
{
bool reverse = description[0].direction == -1;
const IColumn * column = !description[0].column_name.empty()
? block.getByName(description[0].column_name).column.get()
: block.safeGetByPosition(description[0].column_number).column.get();
IColumn::Permutation perm;
if (needCollation(column, description[0]))
{
const ColumnString & column_string = typeid_cast<const ColumnString &>(*column);
column_string.getPermutationWithCollation(*description[0].collator, reverse, limit, perm);
}
else
column->getPermutation(reverse, limit, description[0].nulls_direction, perm);
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);
}
這里需要分為兩種情況討論:1. 單列排序,2.多列排序,多列排序與單列的實作大同小異,所以我們先從單列排序的代碼開始庖丁解牛,它的核心代碼就是下面的這四行:
column->getPermutation(reverse, limit, description[0].nulls_direction, perm);
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);
先通過單列排序,拿到每一列在排序之后的IColumn::Permutation perm;,然后Block之中的每一列都利用這個perm, 生成一個新的排序列,替換舊的列之后,就完成Block的排序了,

如上圖所示,Permutation是一個長度為limit的PodArray, 它標識了根據排序列排序之后的排序位置,后續就按照這個perm規則利用函式permute生成新的列,就是排序已經完成的列了,
ColumnPtr ColumnVector<T>::permute(const IColumn::Permutation & perm, size_t limit) const
{
typename Self::Container & res_data = https://www.cnblogs.com/happenlee/p/res->getData();
for (size_t i = 0; i < limit; ++i)
res_data[i] = data[perm[i]];
return res;
}
這里細心的朋友會發現,String列在sortBlock函式之中做了一些額外的判斷
if (needCollation(column, description[0])) {
const ColumnString & column_string = typeid_cast<const ColumnString &>(*column);
column_string.getPermutationWithCollation(*description[0].collator, reverse, limit, perm);
}
這部分是一個特殊的字串生成perm的邏輯,ClickHouse支持用不同的編碼進行字串列的排序,比如通過GBK編碼進行排序的話,那么中文的排序順序將是基于拼音順序的,
- getPermutation的實作
所以,在ClickHouse的排序程序之中,getPermutation是整個排序算子實作的重中之重, 它是Column類的一個虛函式,也就是說每一個不同的資料型別的列都可以實作自己的排序邏輯,我們通過ColumnVector的實作,來管中規豹一把,
template <typename T>
void ColumnVector<T>::getPermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res) const
{
if (reverse)
std::partial_sort(res.begin(), res.begin() + limit, res.end(), greater(*this, nan_direction_hint));
else
std::partial_sort(res.begin(), res.begin() + limit, res.end(), less(*this, nan_direction_hint));
}
else
{
/// A case for radix sort
if constexpr (std::is_arithmetic_v<T> && !std::is_same_v<T, UInt128>)
{
return;
}
}
/// Default sorting algorithm.
for (size_t i = 0; i < s; ++i)
res[i] = i;
pdqsort(res.begin(), res.end(), less(*this, nan_direction_hint));
}
}
這部分代碼較多,筆者簡化了一下這部分的邏輯,
- 如果存在
limit條件,并且列的長度大于limit,采用std::partial_sort進行perm的排序, - 如果為數字型別,并且不為
UInt128型別時,則采用Radix Sort計數排序來對perm進行排序, - 如不滿足前二者的條件,則使用快速排序作為最終的默認實作,
好的,看到這里,已經完整的梳理了PartialSortingBlockInputStream,得到了每一個輸出的Block已經按照我們的排序規則進行排序了,接下來就要請出MergeSortingBlockInputStream來進行最終的排序作業,
- MergeSortingBlockInputStream的實作
從名字上也能看出來,這里需要完成一次歸并排序,來得到最終有序的排序結果,至于排序的物件,自然上面通過PartialSortingBlockInputStream輸出的Block了,
直接定位到readImpl()的實作,ClickHouse這里實作了Spill to disk的外部排序邏輯,這里為了簡化,筆者先暫時拿掉這部分外部排序的邏輯,
Block MergeSortingBlockInputStream::readImpl()
{
/** Algorithm:
* - read to memory blocks from source stream;
*/
/// If has not read source blocks.
if (!impl)
{
while (Block block = children.back()->read())
{
blocks.push_back(block);
sum_rows_in_blocks += block.rows();
sum_bytes_in_blocks += block.allocatedBytes();
/** If significant amount of data was accumulated, perform preliminary merging step.
*/
if (blocks.size() > 1
&& limit
&& limit * 2 < sum_rows_in_blocks /// 2 is just a guess.
&& remerge_is_useful
&& max_bytes_before_remerge
&& sum_bytes_in_blocks > max_bytes_before_remerge)
{
remerge();
}
if ((blocks.empty() && temporary_files.empty()) || isCancelledOrThrowIfKilled())
return Block();
if (temporary_files.empty())
{
impl = std::make_unique<MergeSortingBlocksBlockInputStream>(blocks, description, max_merged_block_size, limit);
}
Block res = impl->read();
return res;
}
由上面代碼可以看到,MergeSortingBlockInputStream這部分就是不斷從底層的PartialSortingBlockInputStream讀取出來,并存盤全部存盤下來,最終讀取完成之后,利用MergeSortingBlocksBlockInputStream類,完成所有Blocks的歸并排序作業,而MergeSortingBlocksBlockInputStream類就是簡單完成利用堆進行多路歸并排序的程序代碼,筆者在這里就不再展開了,感興趣的同學可以自行參考MergeSortingBlockInputStream.cpp部分的實作,
3.要點梳理
第二小節梳理完ClickHouse的排序算子的實作流程,這里進行一些簡單的要點小結:
-
ClickHouse的排序實作需要利用排序列生成對應的
perm,最終利用perm完成每一個Block的排序, -
所以每一個不同資料型別的列,都需要實作
getPermutation與permute來實作排序,并且可以根據資料型別,選擇不同的排序實作,比如radix sort的時間復雜度為O(n),相對快速排序的時間復雜度就存在了明顯的優勢, -
排序演算法存在大量的資料依賴,所以是很難發揮
SIMD的優勢的,只有在radix sort下才些微有些部分可以向量化,所以相對于非向量化的實作,不存在太多性能上的優勢,
4. 小結
OK,到此為止,咱們可以從Clickhouse的原始碼實作之中梳理完成列式的存盤系統是如何實作排序的,
當然,這部分跳過了一部分重要的實作:Spill to disk,這個是確保在一定的記憶體限制之下,對海量資料進行排序時,可以利用磁盤來快取排序的中間結果,這部分的實作也很有意思,感興趣的朋友,可以進一步展開來看這部分的實作,
筆者是一個ClickHouse的初學者,對ClickHouse有興趣的同學,歡迎多多指教,交流,
5. 參考資料
官方檔案
ClickHouse源代碼
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/288808.html
標籤:大數據
上一篇:萊姆達表達試——查詢篇
