由于作業的需求,后續筆者作業需要和開源的OLAP資料庫ClickHouse打交道,ClickHouse是Yandex在2016年6月15日開源了一個分析型資料庫,以強悍的單機處理能力被稱道,
筆者在實際測驗ClickHouse和閱讀ClickHouse的原始碼程序之中,對"戰斗民族"開發的資料庫十分欣賞,ClickHouse不僅是一個很好的資料庫學習材料,而且同時應用了大量的CPP17的新特性進行開發,也是一個大型的Modern CPP的教導資料,
筆者接下來會陸續將閱讀ClickHouse的部分心得體會與通過原始碼閱讀筆記的方式和大家分享,坦白說,這種原始碼閱讀筆記很難寫啊,(多一分繁瑣,少一分就模糊了~~)
第一篇文章,我們就從聚合函式的實作開始聊起~~ 上車!
1.基礎知識的梳理
什么是聚合函式?
聚合函式: 顧名思義就是對一組資料執行聚合計算并回傳結果的函式,
這類函式在資料庫之中很常見,如:count, max, min, sum等等,
ClickHouse的實作介面
- IAggregateFunction介面
在ClickHouse之中,定義了一個統一的聚合函式介面:IAggregateFunction.(在ClickHouse之中,所有的介面類都是以大寫的I開頭的,) 上文筆者提到的聚合函式,則都是作為抽象類IAggregateFunction的子類實作的,其中該介面最為核心的方法是下面這5個方法:- add函式:最為核心的呼叫介面,將對應AggregateDataPtr指標之中資料取出,與列columns中的第row_num的資料進行對應的聚合計算,(這里可以看到ClickHouse是一個純粹的列式存盤資料庫,所有的操作都是基于列的資料結構,)
- merge函式:將兩個聚合結果進行合并的函式,通常用在并發執行聚合函式的程序之中,需要將對應的聚合結果進行合并,
- serialize函式與deserialize函式:序列化與反序列化的函式,通常用于spill to disk或分布式場景需要保存或傳輸中間結果的,
- addBatch函式:這是函式也是非常重要的,雖然它僅僅實作了一個for回圈呼叫add函式,它通過這樣的方式來減少虛函式的呼叫次數,并且增加了編譯器行內的概率,(虛函式的呼叫需要一次訪存指令,一次查表,最終才能定位到需要呼叫的函式上,這在傳統的火山模型的實作上會帶來極大的CPU開銷,)
/** Adds a value into aggregation data on which place points to.
* columns points to columns containing arguments of aggregation function.
* row_num is number of row which should be added.
* Additional parameter arena should be used instead of standard memory allocator if the addition requires memory allocation.
*/
virtual void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const = 0;
/// Merges state (on which place points to) with other state of current aggregation function.
virtual void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;
/// Serializes state (to transmit it over the network, for example).
virtual void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const = 0;
/// Deserializes state. This function is called only for empty (just created) states.
virtual void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const = 0;
// /** Contains a loop with calls to "add" function. You can collect arguments into array "places"
* and do a single call to "addBatch" for devirtualization and inlining.
*/
virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;
- 抽象類IColumn
上面的介面IAggregateFunction的函式使用到了ClickHouse的核心介面IColumn類,這里也進行簡要的介紹, IColumn 介面表達了所有資料在ClickHouse之中的用記憶體表達的資料結構,其他帶有具體資料型別的如ColumnUInt8、ColumnArray等, 都實作了對應的列介面,并且在子類之中具象實作了不同的記憶體布局,
IColumn的子類實作細節很瑣碎,筆者這里就暫時不展開講了,筆者這里就簡單講講涉及到聚合函式呼叫部分的IColumn介面的對應方法:
這里columns是一個二維陣列,通過columns[0]可以取到第一列,(這里只有涉及到一列,為什么columns是二維陣列呢?因為處理array等列的時候,也是通過對應的介面,而array就需要應用二維陣列了. )
注意這里有一個強制的型別轉換,column已經轉換為ColVecType型別了,這是模板派生出IColumn的子類,
然后通過IColumn子類實作的getData方法獲取對應row_num行的資料進行add函式呼叫就完成了一次聚合函式的計算了,
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
this->data(place).add(column.getData()[row_num]);
}
- IAggregateFunctionHelper介面
這個介面是上面提到 IAggregateFunction的輔助子類介面,它很巧妙的通過模板的型別派生,將虛函式的呼叫轉換為函式指標的呼叫,這個在實際聚合函式的實作程序之中能夠大大提高計算的效率,
函式addFree就實作了我上述所說的程序,但是它是一個private的函式,所以通常我們都是通過getAddressOfAddFunction獲取對應的函式地址,這在聚合查詢的程序之中能夠提高20%左右的執行效率,
template <typename Derived>
class IAggregateFunctionHelper : public IAggregateFunction
{
private:
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
{
static_cast<const Derived &>(*that).add(place, columns, row_num, arena);
}
public:
IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_)
: IAggregateFunction(argument_types_, parameters_) {}
AddFunc getAddressOfAddFunction() const override { return &addFree; }
- AggregateFunctionFactory類
顧名思義,這個是一個生成聚合函式的工廠類,它的邏輯很簡單,所有ClickHouse之中所相關的聚合函式都是通過這個工廠類注冊并且獲取,然后進行呼叫的,
class AggregateFunctionFactory final : private boost::noncopyable, public IFactoryWithAliases<AggregateFunctionCreator>
{
public:
static AggregateFunctionFactory & instance();
/// Register a function by its name.
/// No locking, you must register all functions before usage of get.
void registerFunction(
const String & name,
Creator creator,
CaseSensitiveness case_sensitiveness = CaseSensitive);
/// Throws an exception if not found.
AggregateFunctionPtr get(
const String & name,
const DataTypes & argument_types,
const Array & parameters = {},
int recursion_level = 0) const;
2.聚合函式的注冊流程
有了上述的背景知識,我們接下來舉個栗子,來看看一個聚合函式的實作細節,以及它是如何被使用的,
AggregateFunctionSum
筆者這里選取了一個很簡單的聚合算子Sum,我們來看看它實作的代碼細節,
這里我們可以看到AggregateFunctionSum是個final類,無法被繼承了,而它繼承了上面提到的IAggregateFunctionHelp類的子類IAggregateFunctionDataHelper類,
這里我們就重點看,這個類override了getName方法,回傳了對應的名字sum,并且實作了我們上文提到的四個核心的方法,
- add
- merge
- seriable
- deserialize
template <typename T, typename TResult, typename Data>
class AggregateFunctionSum final : public IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>
{
public:
using ResultDataType = std::conditional_t<IsDecimalNumber<T>, DataTypeDecimal<TResult>, DataTypeNumber<TResult>>;
using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
using ColVecResult = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<TResult>, ColumnVector<TResult>>;
String getName() const override { return "sum"; }
AggregateFunctionSum(const DataTypes & argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>(argument_types_, {})
, scale(0)
{}
AggregateFunctionSum(const IDataType & data_type, const DataTypes & argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>(argument_types_, {})
, scale(getDecimalScale(data_type))
{}
DataTypePtr getReturnType() const override
{
if constexpr (IsDecimalNumber<T>)
return std::make_shared<ResultDataType>(ResultDataType::maxPrecision(), scale);
else
return std::make_shared<ResultDataType>();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
this->data(place).add(column.getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).read(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
auto & column = static_cast<ColVecResult &>(to);
column.getData().push_back(this->data(place).get());
}
private:
UInt32 scale;
};
接下來,ClickHouse實作了兩種聚合計算:AggregateFunctionSumData和AggregateFunctionSumKahanData,后者是用Kahan演算法避免float型別精度損失的,我們可以暫時不細看,直接看SumData的實作,這是個模板類,之前我們講到AggregateFunction的函式就是通過AggregateDataPtr指標來獲取AggregateFunctionSumData的地址,來呼叫add實作聚合算子的,我們可以看到AggregateFunctionSumData實作了前文提到的add, merge, write,read四大方法,正好和介面一一對應上了,
template <typename T>
struct AggregateFunctionSumData
{
T sum{};
void add(T value)
{
sum += value;
}
void merge(const AggregateFunctionSumData & rhs)
{
sum += rhs.sum;
}
void write(WriteBuffer & buf) const
{
writeBinary(sum, buf);
}
void read(ReadBuffer & buf)
{
readBinary(sum, buf);
}
T get() const
{
return sum;
}
};
ClickHouse在Server啟動時,main函式之中會呼叫registerAggregateFunction的初始化函式注冊所有的聚合函式,
然后呼叫到下面的函式:
void registerAggregateFunctionSum(AggregateFunctionFactory & factory)
{
factory.registerFunction("sum", createAggregateFunctionSum<AggregateFunctionSumSimple>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("sumWithOverflow", createAggregateFunctionSum<AggregateFunctionSumWithOverflow>);
factory.registerFunction("sumKahan", createAggregateFunctionSum<AggregateFunctionSumKahan>);
}
這里又呼叫了 factory.registerFunction("sum", createAggregateFunctionSum<AggregateFunctionSumSimple>, AggregateFunctionFactory::CaseInsensitive);來進行上述我們看到的聚合函式的注冊,這里有一點很惡心的模板代碼,筆者這里簡化了一下,把注冊的部分函式拉出來:
createAggregateFunctionSum(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
AggregateFunctionPtr res;
DataTypePtr data_type = argument_types[0];
if (isDecimal(data_type))
res.reset(createWithDecimalType<Function>(*data_type, *data_type, argument_types));
else
res.reset(createWithNumericType<Function>(*data_type, argument_types));
return res;
這里的Function模板就是上面的AggregateFunctionSumSimple, 而它又是下面的模板型別:
template <typename T> using AggregateFunctionSumSimple = typename SumSimple<T>::Function;
template <typename T>
struct SumSimple
{
/// @note It uses slow Decimal128 (cause we need such a variant). sumWithOverflow is faster for Decimal32/64
using ResultType = std::conditional_t<IsDecimalNumber<T>, Decimal128, NearestFieldType<T>>;
using AggregateDataType = AggregateFunctionSumData<ResultType>;
using Function = AggregateFunctionSum<T, ResultType, AggregateDataType>;
};
不知道讀者被繞暈了沒,最終繞回來還是new出來這個AggregateFunctionSum<T, ResultType, AggregateDataType>
也就是完成了這個求和算子的注冊,后續我們get出來就可以愉快的呼叫啦,(這里這部分的模板變化比較復雜,如果看不明白可以回到原始碼梳理一下~~~)
3. 小結
好了,關于聚合函式的基礎資訊,和它是如何實作并且通過工廠方法注冊獲取的流程算是搞明白了,
關于其他的聚合算子,也是大同小異的方式,筆者就不再贅述了,感興趣的可以回到原始碼之中繼續一探究竟,講完了聚合函式的實作,下一篇筆者就要繼續給探究聚合函式究竟在ClickHouse之中是如何和列存結合使用,并實作向量化的~~,
筆者是一個ClickHouse的初學者,對ClickHouse有興趣的同學,也歡迎和筆者多多指教,交流,
4. 參考資料
官方檔案
ClickHouse源代碼
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/4565.html
標籤:大數據
