Leveldb是非常經典的C++實作的KV存盤,代碼簡短,關鍵代碼2W行以內,麻雀雖小,五臟俱全,大神之作代碼風格看著舒服,RocksDB便是fork自leveldb做出了優化和性能調優,成為常用的工業級KV資料庫引擎,里面很多組件的設計與實作值得學習,也可以作為研究RockDB引擎的基礎,也為大型資料庫Cassandra、ScyllaDB等LSM架構的資料做技術儲備,
leveldb的寫入流程是先寫入預寫日志(WAL)然后寫入memtable,最后通過只讀memtable刷盤為sstable
知識準備
寫入示例
leveldb的寫入流程
#include <iostream> #include "leveldb/db.h" #include <cassert> using namespace std; namespace ld=leveldb; int main(){ ld::DB *db; ld::Options options; options.create_if_missing=true; ld::Status status=ld::DB::Open(options,"/tmp/testdb",&db); assert(status.ok()); status=db->Put(ld::WriteOptions(),"key1","val1"); assert(status.ok()); return 0; }
呼叫堆疊
// step 1 Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { WriteBatch batch; batch.Put(key, value); return Write(opt, &batch); } // step 2 Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Writer w(&mutex_); w.batch = updates; w.sync = options.sync; w.done = false; ... }
首先將寫入的鍵值寫入到WriteBatch結構,然后呼叫DBImpl::Write做寫入操作,下面會重點說明DBImpl函式
WriteBatch
WriteBatch是DBImpl::Write流程的主要部分,首先需要說明其原理,leveldb為了追求性能,會將write打包為batch然后批量進行wal的寫入,因此leveldb的寫入是原子性的,即使服務宕機,仍然可以使資料恢復,
WirteBatch只有一個私有成員變數 std::string rep_ ,存放資料
WriteBatch的編碼如下:
| 長度 | 8位元組 | 4位元組 | 可變長度 | 可變長度 | 可變長度 |
| 內容 | sequence number | count | record 1 | record 2 | record 3 |
sequence number: leveldb的序列號,由于MVCC,此處選擇最近recod的sequence number
count 為記錄數量
record的編碼如下:
| 長度 | 1位元組 | 可變長度 | 鍵大小 | 可變變長 | 值大小 |
| 內容 | 型別 | 鍵大小 | 鍵 | 值大小 | 值 |
WriteBatch的成員函式主要包括:
// Store the mapping "key->value" in the database. void Put(const Slice& key, const Slice& value); // If the database contains a mapping for "key", erase it. Else do nothing. void Delete(const Slice& key); // Clear all updates buffered in this batch. void Clear(); // The size of the database changes caused by this batch. // // This number is tied to implementation details, and may change across // releases. It is intended for LevelDB usage metrics. size_t ApproximateSize() const; // Copies the operations in "source" to this batch. // // This runs in O(source size) time. However, the constant factor is better // than calling Iterate() over the source batch with a Handler that replicates // the operations into this batch. void Append(const WriteBatch& source); // Support for iterating over the contents of a batch. Status Iterate(Handler* handler) const;
WriteBachInternal是WriteBatch的友元類,為其輔助函式
寫入流程
leveldb有函式DBImpl::Write負責寫入,下屬將會介紹此函式
DBImpl::Write函式主要邏輯
分步敘述主邏輯
Part 1
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Writer w(&mutex_); w.batch = updates; w.sync = options.sync; w.done = false; MutexLock l(&mutex_); writers_.push_back(&w); while (!w.done && &w != writers_.front()) { w.cv.Wait(); } if (w.done) { return w.status; } }
- Write函式接受一個WriteBatch,以及寫入的引數,sync的意思是wal是否直接刷盤,done是否此updates已經寫入完成
- 寫入支持并發,并會寫入到一個佇列writers_內,通過條件變數來實作生產者消費者,將多個執行緒的寫入合并,來提升寫入的性能,后面會詳細說明其實作方式
- 只有在佇列隊首,而且寫入沒有完成才會執行下述的邏輯,
- 如果執行到 w.one==true則直接退出,此時表明資料被其他執行緒成功寫入了(因為寫入之后,會更新Writer的寫入狀態),
Part 2
Status status = MakeRoomForWrite(updates == nullptr); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; if (status.ok() && updates != nullptr) { // nullptr batch is for compactions WriteBatch* write_batch = BuildBatchGroup(&last_writer); // write_batch只需要寫入一個seq WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); last_sequence += WriteBatchInternal::Count(write_batch); // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes // into mem_. { // 此處解鎖,其他執行緒獲取鎖之后,執行加入writers_佇列的動作,然后阻塞在條件變數上 // 在執行緒[t1,t2,t3],第一次執行時batch中只會有t1執行緒的內容,隨后t2和t3才會加入 mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); if (!status.ok()) { sync_error = true; } } if (status.ok()) { status = WriteBatchInternal::InsertInto(write_batch, mem_); } mutex_.Lock(); if (sync_error) { // The state of the log file is indeterminate: the log record we // just added may or may not show up when the DB is re-opened. // So we force the DB into a mode where all future writes fail. RecordBackgroundError(status); } } if (write_batch == tmp_batch_) tmp_batch_->Clear(); versions_->SetLastSequence(last_sequence); }
- MakeRoomForWrite主要的作業為:處理L0增長過快,選擇是否寫限速或者寫停止、memtable的刷盤邏輯以及memtable所對應的wal的處理邏輯,(下文還會對此函式詳細論述)
- 獲取此寫入的sequence num,每個寫都會有個遞增的數值
- BuildBatchGroup比較簡單,將佇列的writers_的寫入合并為一個WriteBatch,writes_為stl佇列資料結構,通過迭代器遍歷,然后通過WriteBatchInternal輔助類對WriteBatch操作,得出結果
- 此處釋放鎖的原因是為了提升性能,wal寫入和寫入memtable比較耗時,此處釋放鎖后,其他的執行緒的寫入可以入隊writes_內,但是不會向下執行邏輯
- InsertInto將WriteBatch的內容寫入到memtable,以后會寫一個關于memtable的文章
- AddRecord將WriteBatch寫入wal,以后會寫一個wal的文章
Part 3
while (true) { Writer* ready = writers_.front(); writers_.pop_front(); // 不是隊首元素,則標記該寫入已經完成 if (ready != &w) { ready->status = status; ready->done = true; ready->cv.Signal(); // 通知 } if (ready == last_writer) break; } // 通知 if (!writers_.empty()) { writers_.front()->cv.Signal(); }
主要邏輯是寫入完成的出隊,并且更新其寫入狀態
關鍵函式分析
MakeRoomForWrite
Status DBImpl::MakeRoomForWrite(bool force); 函式的作用是:處理L0寫入過快的問題,處理memtable和immemtable以及wal,以及是否進行compaction 引數force標識是否立即刷盤 函式的主要邏輯:- 根據引數force是否立即刷盤,然后決定是否允許延遲操,由變數allow_delay標識
- 如果bg_error_發生錯誤,退出回圈,并回傳error狀態
- 如果allow_delay為ture,而且L0的檔案數大于kL0_SlowdownWritesTrigger(默認值為8),則寫入限速1ms,在sleep之前釋放鎖mutex_,不阻塞其他執行緒邏輯,并將allow_deply設定為false,單次寫入只允許限速一次
- 如果不立即刷盤,而且memtable的近似大小仍未達到write_buffer_size,則直接退出函式,什么也不用做
- 代碼走到此處,要么需要立即刷盤,要么大小超過write_buffer_size,如果此時存在immemtable,則通過條件變數阻塞,只到compaction完成(immemtable刷盤成功)
- 如果L0的檔案數目超過kL0_StopWritesTrigger(默認12),則寫入停止,也是通過條件變數實作,等待compaction將L0的檔案數減少
- 最后的情況,就是創建Log檔案句柄,創建memtable檔案句柄,將就舊的的memtable變為immemtable,然后判斷是否需要通過異步調度compatiion動作
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/473458.html
標籤:其他
下一篇:Git常用操作(Gitlab)
