本次主要是對redis中著名的持久化策略進行代碼層面描述,主要包括RDB持久化和AOF持久化
因為AOF檔案的更新頻率比RDB高,所以如果服務器開啟AOF持久化,redis優先使用AOF檔案還原,只有當AOF持久化關閉,才使用RDB檔案進行還原
RDB持久化
RDB持久化主要有兩個命令實作:SAVE和BGSAVE
SAVE、BGSAVE
SAVE會阻塞redis服務器,知道RDB檔案創建完畢
void saveCommand(redisClient *c) {
// BGSAVE 已經在執行中,不能再執行 SAVE
// 否則將產生競爭條件
if (server.rdb_child_pid != -1) {
addReplyError(c,"Background save already in progress");
return;
}
// 執行
if (rdbSave(server.rdb_filename) == REDIS_OK) {
addReply(c,shared.ok);
} else {
addReply(c,shared.err);
}
}
BGSAVE不會阻塞,他會創建一個子行程,由子行程處理RDB檔案保存
void bgsaveCommand(redisClient *c) {
// 不能重復執行 BGSAVE
if (server.rdb_child_pid != -1) {
addReplyError(c,"Background save already in progress");
// 不能在 BGREWRITEAOF 正在運行時執行
} else if (server.aof_child_pid != -1) {
addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress");
// 執行 BGSAVE
} else if (rdbSaveBackground(server.rdb_filename) == REDIS_OK) {
addReplyStatus(c,"Background saving started");
} else {
addReply(c,shared.err);
}
}
int rdbSaveBackground(char *filename) {
pid_t childpid;
long long start;
// 如果 BGSAVE 已經在執行,那么出錯
if (server.rdb_child_pid != -1) return REDIS_ERR;
// 記錄 BGSAVE 執行前的資料庫被修改次數
server.dirty_before_bgsave = server.dirty;
// 最近一次嘗試執行 BGSAVE 的時間
server.lastbgsave_try = time(NULL);
// fork() 開始前的時間,記錄 fork() 回傳耗時用
start = ustime();
if ((childpid = fork()) == 0) {
int retval;
/* 子行程 */
// 關閉網路連接 fd
closeListeningSockets(0);
// 設定行程的標題,方便識別
redisSetProcTitle("redis-rdb-bgsave");
// 執行保存操作
retval = rdbSave(filename);
// 列印 copy-on-write 時使用的記憶體數
if (retval == REDIS_OK) {
size_t private_dirty = zmalloc_get_private_dirty();
if (private_dirty) {
redisLog(REDIS_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
}
// 向父行程發送信號
exitFromChild((retval == REDIS_OK) ? 0 : 1);
} else {
/* 父行程 */
// 計算 fork() 執行的時間
server.stat_fork_time = ustime()-start;
// 如果 fork() 出錯,那么報告錯誤
if (childpid == -1) {
server.lastbgsave_status = REDIS_ERR;
redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return REDIS_ERR;
}
// 列印 BGSAVE 開始的日志
redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
// 記錄資料庫開始 BGSAVE 的時間
server.rdb_save_time_start = time(NULL);
// 記錄負責執行 BGSAVE 的子行程 ID
server.rdb_child_pid = childpid;
// 關閉自動 rehash
updateDictResizePolicy();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
}
兩個命令內部都是執行rdbSave函式
/*
* 將資料庫保存到磁盤上,
* 保存成功回傳 REDIS_OK ,出錯/失敗回傳 REDIS_ERR ,
*/
int rdbSave(char *filename) {
dictIterator *di = NULL;
dictEntry *de;
char tmpfile[256];
char magic[10];
int j;
long long now = mstime();
FILE *fp;
rio rdb;
uint64_t cksum;
// 創建臨時檔案
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
strerror(errno));
return REDIS_ERR;
}
// 初始化 I/O
rioInitWithFile(&rdb,fp);
// 設定校驗和函式
if (server.rdb_checksum)
rdb.update_cksum = rioGenericUpdateChecksum;
// 寫入 RDB 版本號
snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
// 寫入錯誤,跳轉到werr
if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;
// 遍歷所有資料庫
for (j = 0; j < server.dbnum; j++) {
// 指向資料庫
redisDb *db = server.db+j;
// 指向資料庫鍵空間
dict *d = db->dict;
// 跳過空資料庫
if (dictSize(d) == 0) continue;
// 創建鍵空間迭代器
di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
}
/*
* 寫入 DB 選擇器
*/
if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(&rdb,j) == -1) goto werr;
/*
* 遍歷資料庫,并寫入每個鍵值對的資料
*/
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
// 根據 keystr ,在堆疊中創建一個 key 物件
initStaticStringObject(key,keystr);
// 獲取鍵的過期時間
expire = getExpire(db,&key);
// 保存鍵值對資料
if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
}
dictReleaseIterator(di);
}
di = NULL; /* So that we don't release it again on error. */
/*
* 寫入 EOF 代碼
*/
if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
/*
* CRC64 校驗和,
*
* 如果校驗和功能已關閉,那么 rdb.cksum 將為 0 ,
* 在這種情況下, RDB 載入時會跳過校驗和檢查,
*/
cksum = rdb.cksum;
memrev64ifbe(&cksum);
rioWrite(&rdb,&cksum,8);
// 沖洗快取,確保資料已寫入磁盤
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
/*
* 使用 RENAME ,原子性地對臨時檔案進行改名,覆寫原來的 RDB 檔案,
*/
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
// 寫入完成,列印日志
redisLog(REDIS_NOTICE,"DB saved on disk");
// 清零資料庫臟狀態
server.dirty = 0;
// 記錄最后一次完成 SAVE 的時間
server.lastsave = time(NULL);
// 記錄最后一次執行 SAVE 的狀態
server.lastbgsave_status = REDIS_OK;
return REDIS_OK;
werr:
// 關閉檔案
fclose(fp);
// 洗掉檔案
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
RDB檔案內容
首先給出一個完整的RDB檔案的格式

后續為描述方便,大寫為常量,小寫為變數或者資料
REDIS這個其實就是RDB檔案的識別符號db_version長度4位元組,記錄RDB檔案的版本號,redis3.0一般使用0006(第六版)databases表示任意個資料庫EOF表示正文內容結束check_sum校驗和,8位元組,通過前面4部分內容計算得出
下面重點說下databases欄位,每個database都是包括如下幾個部分,

-
SELECTDB一位元組,表示接下來要讀一個資料庫號碼 -
db_number表示一個資料庫號碼,長度1、2、5位元組,當讀入該數字后,redis會呼叫select命令進行資料庫切換 -
key_value_pairs表示資料庫中所有的鍵值對資料,其中又分為不帶過期時間的鍵值對,和帶過期時間的鍵值對- 不帶過期的鍵值對,由
TYPE、key、value組成

- 帶過期的鍵值對,由
EXPIRETIME_MS、ms、TYPE、key、value組成

- 不帶過期的鍵值對,由
AOF持久化
AOF持久化是通過保存redis服務器在運行期間所執行的寫命令進行記錄資料,AOF持久化分為命令追加、檔案寫入、檔案同步三個步驟,下面分別對這三個步驟進行闡述
命令追加
當AOF持久化處于打開的狀態,服務器在執行一個寫命令之后,會以某種協議的方式將被執行的寫命令追加到服務器redisServer中的aof_buf緩沖區末尾
檔案寫入與同步
上一次我們說到,redis在運行程序中,是一個事件回圈,每次回圈執行對應的時間事件和檔案事件,因此AOF持久化的寫入也在每次事件回圈結束后進行,執行函式flushAppendOnlyFile
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
// 緩沖區中沒有任何內容,直接回傳
if (sdslen(server.aof_buf) == 0) return;
// 策略為每秒 FSYNC
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
// 是否有 SYNC 正在后臺進行?
sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
// 每秒 fsync ,并且強制寫入為假
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/*
* 當 fsync 策略為每秒鐘一次時, fsync 在后臺執行,
* 如果后臺仍在執行 FSYNC ,那么我們可以延遲寫操作一兩秒
* (如果強制執行 write 的話,服務器主執行緒將阻塞在 write 上面)
*/
if (sync_in_progress) {
// 有 fsync 正在后臺進行 ,,,
if (server.aof_flush_postponed_start == 0) {
/*
* 前面沒有推遲過 write 操作,這里將推遲寫操作的時間記錄下來
* 然后就回傳,不執行 write 或者 fsync
*/
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/*
* 如果之前已經因為 fsync 而推遲了 write 操作
* 但是推遲的時間不超過 2 秒,那么直接回傳
* 不執行 write 或者 fsync
*/
return;
}
/*
* 如果后臺還有 fsync 在執行,并且 write 已經推遲 >= 2 秒
* 那么執行寫操作(write 將被阻塞)
*/
server.aof_delayed_fsync++;
redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
/*
* 執行到這里,程式會對 AOF 檔案進行寫入,
* 清零延遲 write 的時間記錄
*/
server.aof_flush_postponed_start = 0;
/*
* 執行單個 write 操作,如果寫入設備是物理的話,那么這個操作應該是原子的
*
* 當然,如果出現像電源中斷這樣的不可抗現象,那么 AOF 檔案也是可能會出現問題的
* 這時就要用 redis-check-aof 程式來進行修復,
*/
nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
if (nwritten != (signed)sdslen(server.aof_buf)) {
static time_t last_write_error_log = 0;
int can_log = 0;
// 將日志的記錄頻率限制在每行 AOF_WRITE_LOG_ERROR_RATE 秒
if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
can_log = 1;
last_write_error_log = server.unixtime;
}
// 如果寫入出錯,那么嘗試將該情況寫入到日志里面
if (nwritten == -1) {
if (can_log) {
redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
strerror(errno));
server.aof_last_write_errno = errno;
}
} else {
if (can_log) {
redisLog(REDIS_WARNING,"Short write while writing to "
"the AOF file: (nwritten=%lld, "
"expected=%lld)",
(long long)nwritten,
(long long)sdslen(server.aof_buf));
}
// 嘗試移除新追加的不完整內容
if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
if (can_log) {
redisLog(REDIS_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
} else {
/* If the ftrunacate() succeeded we can set nwritten to
* -1 since there is no longer partial data into the AOF. */
nwritten = -1;
}
server.aof_last_write_errno = ENOSPC;
}
// 處理寫入 AOF 檔案時出現的錯誤
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* We can't recover when the fsync policy is ALWAYS since the
* reply for the client is already in the output buffers, and we
* have the contract with the user that on acknowledged write data
* is synched on disk. */
redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
/* Recover from failed write leaving data into the buffer. However
* set an error to stop accepting writes as long as the error
* condition is not cleared. */
server.aof_last_write_status = REDIS_ERR;
/* Trim the sds buffer if there was a partial write, and there
* was no way to undo it with ftruncate(2). */
if (nwritten > 0) {
server.aof_current_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
} else {
// 寫入成功,更新最后寫入狀態
if (server.aof_last_write_status == REDIS_ERR) {
redisLog(REDIS_WARNING,
"AOF write error looks solved, Redis can write again.");
server.aof_last_write_status = REDIS_OK;
}
}
// 更新寫入后的 AOF 檔案大小
server.aof_current_size += nwritten;
/*
* 如果 AOF 快取的大小足夠小的話,那么重用這個快取,
* 否則的話,釋放 AOF 快取,
*/
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
// 清空快取中的內容,等待重用
sdsclear(server.aof_buf);
} else {
// 釋放快取
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
/*
* 如果 no-appendfsync-on-rewrite 選項為開啟狀態,
* 并且有 BGSAVE 或者 BGREWRITEAOF 正在進行的話,
* 那么不執行 fsync
*/
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;
// 總是執行 fsnyc
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* aof_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
// 更新最后一次執行 fsnyc 的時間
server.aof_last_fsync = server.unixtime;
// 策略為每秒 fsnyc ,并且距離上次 fsync 已經超過 1 秒
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
// 放到后臺執行
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
// 更新最后一次執行 fsync 的時間
server.aof_last_fsync = server.unixtime;
}
}
在上面代碼中,我們可以看到執行fsync有幾種可能,這些可能性通過appendfsync配置進行決定
| appendfsync選項的值 | flushappendonlyfile函式行為 |
|---|---|
| always | 將aof_buf緩沖區所有內容寫入并同步到AOF檔案 |
| everysec | 將aof buf緩沖區中的所有內容寫入到AOF檔案,如果上次同步AOF檔案的時間距離現在超過一秒鐘,那么再次對AOF 檔案進行同步,并且這個同步操作是由一個執行緒專門負責執行的 |
| no | 將aof_buf緩沖區中的所有內容寫入到AOF檔案,但并不對AOF檔案進行同步,何時同步由作業系統來決定 |
AOF重寫
由AOF寫入原理可知,每次執行命令,都會向檔案中寫入命令,那么這就會導致檔案較大,而且對于比如這種情況:先添加一個a鍵,再洗掉一個a鍵,這其實最終的效果是和最初一樣的,若將兩次執行命令都寫入,則其實是沒有用的,因此redis采用AOF重寫的方式,函式為rewriteAppendOnlyFileBackground
/*
* 以下是后臺重寫 AOF 檔案(BGREWRITEAOF)的作業步驟:
*
* 1) 用戶呼叫 BGREWRITEAOF
*
* 2) Redis 呼叫這個函式,它執行 fork() :
*
* 2a) 子行程在臨時檔案中對 AOF 檔案進行重寫
*
* 2b) 父行程將新輸入的寫命令追加到 server.aof_rewrite_buf 中
*
* 3) 當步驟 2a 執行完之后,子行程結束
*
* 4)
* 父行程會捕捉子行程的退出信號,
* 如果子行程的退出狀態是 OK 的話,
* 那么父行程將新輸入命令的快取追加到臨時檔案,
* 然后使用 rename(2) 對臨時檔案改名,用它代替舊的 AOF 檔案,
* 至此,后臺 AOF 重寫完成,
*/
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;
// 已經有行程在進行 AOF 重寫了
if (server.aof_child_pid != -1) return REDIS_ERR;
// 記錄 fork 開始前的時間,計算 fork 耗時用
start = ustime();
if ((childpid = fork()) == 0) {
char tmpfile[256];
/* 子行程 */
// 關閉網路連接 fd
closeListeningSockets(0);
// 為行程設定名字,方便記認
redisSetProcTitle("redis-aof-rewrite");
// 創建臨時檔案,并進行 AOF 重寫
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
size_t private_dirty = zmalloc_get_private_dirty();
if (private_dirty) {
redisLog(REDIS_NOTICE,
"AOF rewrite: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
// 發送重寫成功信號
exitFromChild(0);
} else {
// 發送重寫失敗信號
exitFromChild(1);
}
} else {
/* 父行程 */
// 記錄執行 fork 所消耗的時間
server.stat_fork_time = ustime()-start;
if (childpid == -1) {
redisLog(REDIS_WARNING,
"Can't rewrite append only file in background: fork: %s",
strerror(errno));
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,
"Background append only file rewriting started by pid %d",childpid);
// 記錄 AOF 重寫的資訊
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
server.aof_child_pid = childpid;
// 關閉字典自動 rehash
updateDictResizePolicy();
/*
* 將 aof_selected_db 設為 -1 ,
* 強制讓 feedAppendOnlyFile() 下次執行時引發一個 SELECT 命令,
* 從而確保之后新添加的命令會設定到正確的資料庫中
*/
server.aof_selected_db = -1;
replicationScriptCacheFlush();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
AOF重寫的原理,其實是直接讀取當前的資料庫的值,最后使用一條寫陳述句就可以實作AOF重寫
而且AOF重寫是放在后臺子行程執行,這樣可以避免效率太低,但是使用子行程執行重寫方式,則在重寫程序中,父行程還會執行新的寫命令,因此這段事件的命令也要被記錄下來,最后再次同步給子行程
自己的網址:www.shicoder.top
歡迎加群聊天 452380935
本文由博客一文多發平臺 OpenWrite 發布!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/460790.html
標籤:其他
上一篇:如何處理回應可能符合兩個XSD之一的基于XML的協議?
下一篇:muduo專案介紹
