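Table of contents
- My GitHub repository
- Highlights of this article
- How AOF is triggered
- AOF policies explained
- When the AOF buffer is written
- When the AOF is written to disk
- How an AOF rewrite is triggered
- Main initialization flow of the AOF rewrite
- How the AOF rewrite writes data to the file
- Rewrite-related work in the parent process
- How the parent handles the stop signal
- How the parent finishes the rewrite
- Summary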
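My GitHub repository
GitHub repository: it contains annotated code that you can download for easier reading.
Highlights of this article
- When AOF is triggered
- How each AOF disk policy is carried out
- What an AOF rewrite is
- What a rewrite actually does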
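How AOF is triggered
The previous article covered RDB and gave a general idea of how it persists data. So why introduce AOF when RDB already exists? The reason is simple: RDB cannot guarantee that no data is lost, because it behaves more like a periodic backup. When data safety matters and Redis is used as more than a cache, AOF can meet the stricter persistence requirement.
AOF offers three fsync policies.
See the configuration below for details.
AOF policies explained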
# The fsync() call tells the Operating System to actually write data on disk
# instead of waiting for more data in the output buffer. Some OS will really flush
# data on disk, some other OS will just try to do it ASAP.
#
# Redis supports three different modes:
#
# no: don't fsync, just let the OS flush the data when it wants. Faster.
# always: fsync after every write to the append only log. Slow, Safest.
# everysec: fsync only one time every second. Compromise.
#
# The default is "everysec", as that's usually the right compromise between
# speed and data safety. It's up to you to understand if you can relax this to
# "no" that will let the operating system flush the output buffer when
# it wants, for better performances (but if you can live with the idea of
# some data loss consider the default persistence mode that's snapshotting),
# or on the contrary, use "always" that's very slow but a bit safer than
# everysec.
#
# More details please check the following article:
# http://antirez.com/post/redis-persistence-demystified.html
#
# If unsure, use "everysec".
# appendfsync always
appendfsync everysec
# appendfsync no
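From the comments above we can take away the following:
- Redis calls fsync to tell the operating system to flush written data from its buffers to disk. The behavior differs between operating systems: some flush immediately, others only try to do it as soon as possible (so there may still be a delay).
- AOF provides three policies. The first is no: AOF only hands the data to the operating system with write(), and the operating system decides when it actually reaches the disk.
The second is everysec: if more than one second has passed since the last fsync, this flush calls fsync.
The third is always: every AOF flush calls fsync.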
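The three policies boil down to one small decision made after each write. The sketch below is my own simplification, not the Redis code: the enum and function names are hypothetical, and it ignores the background fsync thread that everysec really uses.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical names for illustration only. */
typedef enum { POLICY_NO, POLICY_ALWAYS, POLICY_EVERYSEC } fsync_policy;

/* Returns true when an fsync should be issued after the current write.
 * 'now' and 'last_fsync' are wall-clock seconds. */
bool should_fsync(fsync_policy policy, time_t now, time_t last_fsync) {
    assert(now >= last_fsync);
    switch (policy) {
    case POLICY_ALWAYS:   return true;             /* every flush */
    case POLICY_EVERYSEC: return now > last_fsync; /* at most once per second */
    case POLICY_NO:       return false;            /* leave it to the OS */
    }
    return false;
}
```

With everysec, two flushes that fall within the same second produce only one fsync, which is where the "up to about one second of data" window comes from.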
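With the configuration covered, let's look at how AOF is actually triggered.
An earlier article explained how Redis executes commands; the AOF entry point lives in that same function.
That article is: Redis series, how Redis executes commands (1)
Below is the code of the AOF entry point.
When the AOF buffer is written
server.c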
void call(client *c, int flags) {
// dirty tracks changes: server.dirty is incremented by write operations
long long dirty;
ustime_t start, duration;
int client_old_flags = c->flags;
// The command to execute
struct redisCommand *real_cmd = c->cmd;
server.fixed_time_expire++;
/* Send the command to clients in MONITOR mode if applicable.
* Administrative commands are considered too dangerous to be shown. */
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
// Forward the command to clients that issued MONITOR
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
/* Initialization: clear the flags that must be set by the command on
* demand, and initialize the array for additional commands propagation. */
// Clear the force-AOF, force-replication and prevent-propagation flags
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
redisOpArray prev_also_propagate = server.also_propagate;
redisOpArrayInit(&server.also_propagate);
/* Call the command. */
dirty = server.dirty;
// Refresh the cached time
updateCachedTime(0);
start = server.ustime;
// Execute the command
c->cmd->proc(c);
// Execution time in microseconds
duration = ustime()-start;
// Number of changes this command made (0 means nothing was written)
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
/* When EVAL is called loading the AOF we don't want commands called
* from Lua to go into the slowlog or to populate statistics. */
// Lua-related logic
if (server.loading && c->flags & CLIENT_LUA)
flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
/* If the caller is Lua, we want to force the EVAL caller to propagate
* the script if the command flag or client flag are forcing the
* propagation. */
// When called from Lua, pass the force flags up to the EVAL caller
if (c->flags & CLIENT_LUA && server.lua_caller) {
if (c->flags & CLIENT_FORCE_REPL)
server.lua_caller->flags |= CLIENT_FORCE_REPL;
if (c->flags & CLIENT_FORCE_AOF)
server.lua_caller->flags |= CLIENT_FORCE_AOF;
}
/* Log the command into the Slow log if needed, and populate the
* per-command statistics that we show in INFO commandstats. */
// Record a slow log entry if needed
if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
// Update per-command statistics
if (flags & CMD_CALL_STATS) {
/* use the real command that was executed (cmd and lastamc) may be
* different, in case of MULTI-EXEC or re-written commands such as
* EXPIRE, GEOADD, etc. */
real_cmd->microseconds += duration;
real_cmd->calls++;
}
/* Propagate the command into the AOF and replication link */
// Check the flags: the command may have changed the client
// flags while it was executing
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
/* Check if the command operated changes in the data set. If so
* set for replication / AOF propagation. */
// Propagate only if the command actually changed data
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
/* If the client forced AOF / replication of the command, set
* the flags regardless of the command effects on the data set. */
// Forced AOF or replication: set the flag regardless of dirty
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
/* However prevent AOF / replication propagation if the command
* implementations called preventCommandPropagation() or similar,
* or if we don't have the call() flags to do so. */
// Commands that must not reach the AOF or replicas call preventCommandPropagation() or a similar helper
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically. */
// Continue only if the command is not a module command and something must be propagated
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
// Hand the command and its arguments to propagate(), which feeds the AOF buffer
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
.......
From the article on command execution we know that every command goes through call().
Inside call(), a series of checks decides whether
propagate() needs to be called.
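The checks in call() can be condensed into a pure function. This is a sketch under my own naming (cmd_outcome, compute_propagation and the PROP_* constants are hypothetical); it leaves out the CMD_CALL_* caller flags and the module check that the real function also applies.

```c
#include <assert.h>
#include <stddef.h>

enum { PROP_NONE = 0, PROP_AOF = 1, PROP_REPL = 2 };

/* What the command did and which client flags it left behind. */
typedef struct {
    long long dirty;  /* number of changes made by the command */
    int force_aof;
    int force_repl;
    int prevent_aof;
    int prevent_repl;
} cmd_outcome;

/* Same order as in call(): dirty first, then force, then prevent. */
int compute_propagation(const cmd_outcome *o) {
    assert(o != NULL);
    int flags = PROP_NONE;
    if (o->dirty > 0) flags |= PROP_AOF | PROP_REPL;
    if (o->force_aof) flags |= PROP_AOF;
    if (o->force_repl) flags |= PROP_REPL;
    if (o->prevent_aof) flags &= ~PROP_AOF;
    if (o->prevent_repl) flags &= ~PROP_REPL;
    return flags;
}
```

Because the prevent step runs last, it wins over both a dirty data set and a force flag.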
server.c
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
// AOF is enabled and the command is flagged for AOF propagation
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
// Pass the command, the db id, the arguments and the argument count
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
As you can see, AOF propagation and replication propagation both start here.
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
// Check whether the current db id is the same as the one
// used by the last command appended to the AOF
if (dictid != server.aof_selected_db) {
char seldb[64];
// Emit a SELECT command for the new db
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
// All EXPIRE-style commands are converted to PEXPIREAT
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
/* Translate SETEX/PSETEX to SET and PEXPIREAT */
// SETEX/PSETEX are converted to SET plus PEXPIREAT
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
}
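// SET with extra options:
// if EX or PX is present, only "SET key value" is written,
// followed by a PEXPIREAT with the absolute expiry;
// otherwise the command is written verbatim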
else if (cmd->proc == setCommand && argc > 3) {
int i;
robj *exarg = NULL, *pxarg = NULL;
for (i = 3; i < argc; i ++) {
if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
}
serverAssert(!(exarg && pxarg));
if (exarg || pxarg) {
/* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
buf = catAppendOnlyGenericCommand(buf,3,argv);
if (exarg)
buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
exarg);
if (pxarg)
buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
pxarg);
} else {
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
} else {
/* All the other commands don't need translation or need the
* same translation already operated in the command vector
* for the replication itself. */
// Default path: write the command as is
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == AOF_ON)
// Append to aof_buf for performance; it is flushed to disk just before the event loop is re-entered
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file. */
// A running AOF child means a rewrite is in progress, so also feed the rewrite buffer
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
sdsfree(buf);
}
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
char buf[32];
int len, j;
robj *o;
// Every command starts with '*'
buf[0] = '*';
// followed by the number of arguments;
// the command name, keys and values
// are then written in order
len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
// Terminate the line with \r\n
buf[len++] = '\r';
buf[len++] = '\n';
dst = sdscatlen(dst,buf,len);
for (j = 0; j < argc; j++) {
// Integer-encoded strings are decoded back into a plain string object
o = getDecodedObject(argv[j]);
buf[0] = '$';
// Write the argument length as well
len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
buf[len++] = '\r';
buf[len++] = '\n';
// Append the length line
dst = sdscatlen(dst,buf,len);
// Append the value
dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
// Append the line terminator
dst = sdscatlen(dst,"\r\n",2);
// Release the reference
decrRefCount(o);
}
return dst;
}
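From this long listing we can see that the AOF buffer differs from RDB: when no rewrite is involved, what goes into the AOF is the original command with its arguments (command name, keys and values), serialized into an sds string.
At the end, the command is only appended to aof_buf; nothing has reached the disk yet. Also note aofRewriteBufferAppend(): while a child process is running, the command is placed in the AOF rewrite buffer in addition to aof_buf. We will analyze that logic in the rewrite part.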
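To make the on-disk format concrete, here is a minimal sketch of the encoding that catAppendOnlyGenericCommand() produces. The function name resp_encode is my own, and unlike the sds-based original it takes C strings, so it is not binary-safe.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encodes argv as "*<argc>\r\n" followed by "$<len>\r\n<arg>\r\n" per
 * argument. Returns the number of bytes written (excluding the NUL),
 * or -1 if 'out' is too small. */
int resp_encode(char *out, size_t cap, int argc, const char **argv) {
    assert(out != NULL && argv != NULL && argc >= 0);
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    size_t off = (size_t)n;
    for (int j = 0; j < argc; j++) {
        n = snprintf(out + off, cap - off, "$%zu\r\n%s\r\n",
                     strlen(argv[j]), argv[j]);
        if (n < 0 || (size_t)n >= cap - off) return -1;
        off += (size_t)n;
    }
    return (int)off;
}
```

For `SET k v` this yields `*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n`, which is the same text a client would send over the wire.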
So when is aof_buf written to disk? Let's continue.
When the AOF is written to disk
Tracing aof_buf shows that the flush logic mainly lives in the code below.
aof.c
/* Write the append only file buffer on disk.
*
* Since we are required to write the AOF before replying to the client,
* and the only way the client socket can get a write is entering when the
* the event loop, we accumulate all the AOF writes in a memory
* buffer and write it on disk using this function just before entering
* the event loop again.
*
* About the 'force' argument:
*
* When the fsync policy is set to 'everysec' we may delay the flush if there
* is still an fsync() going on in the background thread, since for instance
* on Linux write(2) will be blocked by the background fsync anyway.
* When this happens we remember that there is some aof buffer to be
* flushed ASAP, and will try to do that in the serverCron() function.
*
* However if force is set to 1 we'll write regardless of the background
* fsync. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
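/**
* As the comment above says, this function is normally called from beforeSleep.
* With everysec, however, write(2) can be blocked by a background fsync,
* so a flush may be postponed while data is still waiting in the buffer.
* For that reason serverCron calls this function again.
*/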
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;
// The AOF buffer is empty
if (sdslen(server.aof_buf) == 0) {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
* called only when aof buffer is not empty, so if users
* stop write commands before fsync called in one second,
* the data in page cache cannot be flushed in time. */
// With everysec the buffer may be empty while data from an earlier
// write has still not been fsynced
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.aof_fsync_offset != server.aof_current_size &&
server.unixtime > server.aof_last_fsync &&
!(sync_in_progress = aofFsyncInProgress())) {
goto try_fsync;
} else {
return;
}
}
// If the policy is everysec
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
// check whether a background fsync is still in progress
sync_in_progress = aofFsyncInProgress();
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
if (sync_in_progress) {
// A background fsync is still running
if (server.aof_flush_postponed_start == 0) {
/* No previous write postponing, remember that we are
* postponing the flush and return. */
// No flush was postponed before: remember when we started waiting
server.aof_flush_postponed_start = server.unixtime;
return;
}
// Waiting for less than two seconds: return and postpone again
else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
/* Otherwise fall trough, and go write since we can't wait
* over two seconds. */
// Count the delayed fsync and log it
server.aof_delayed_fsync++;
serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
* While this will save us against the server being killed I don't think
* there is much to do about the whole server stopping for power problems
* or alike */
// aof_flush_sleep is a testing knob: optionally sleep before the write (it does not provide atomicity)
if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
usleep(server.aof_flush_sleep);
}
latencyStartMonitor(latency);
// write(2) the buffer to the AOF file;
// this only reaches the OS page cache, no fsync has happened yet
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
// Stop the latency measurement
latencyEndMonitor(latency);
/* We want to capture different events for delayed writes:
* when the delay happens with a pending fsync, or with a saving child
* active, and when the above two conditions are missing.
* We also use an additional event name to save all samples which is
* useful for graphing / monitoring purposes. */
// Record the latency under the matching event name
if (sync_in_progress) {
latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
} else if (hasActiveChildProcess()) {
latencyAddSampleIfNeeded("aof-write-active-child",latency);
} else {
latencyAddSampleIfNeeded("aof-write-alone",latency);
}
latencyAddSampleIfNeeded("aof-write",latency);
/* We performed the write so reset the postponed flush sentinel to zero. */
// Reset the postponed-flush start time to 0
server.aof_flush_postponed_start = 0;
// A length mismatch means a write error or a short write
if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
static time_t last_write_error_log = 0;
int can_log = 0;
/* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
can_log = 1;
last_write_error_log = server.unixtime;
}
/* Log the AOF write error and record the error code. */
// Log the error
if (nwritten == -1) {
if (can_log) {
serverLog(LL_WARNING,"Error writing to the AOF file: %s",
strerror(errno));
server.aof_last_write_errno = errno;
}
} else {
if (can_log) {
serverLog(LL_WARNING,"Short write while writing to "
"the AOF file: (nwritten=%lld, "
"expected=%lld)",
(long long)nwritten,
(long long)sdslen(server.aof_buf));
}
// Undo the partial write by truncating back to the last known size
if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
if (can_log) {
serverLog(LL_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
} else {
/* If the ftruncate() succeeded we can set nwritten to
* -1 since there is no longer partial data into the AOF. */
nwritten = -1;
}
server.aof_last_write_errno = ENOSPC;
}
/* Handle the AOF write error. */
// With always, the reply is already in the client output buffer, so the
// durability promise cannot be kept: the Redis instance exits
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* We can't recover when the fsync policy is ALWAYS since the
* reply for the client is already in the output buffers, and we
* have the contract with the user that on acknowledged write data
* is synced on disk. */
serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
// Otherwise keep the data in the buffer and retry on the next call
/* Recover from failed write leaving data into the buffer. However
* set an error to stop accepting writes as long as the error
* condition is not cleared. */
server.aof_last_write_status = C_ERR;
/* Trim the sds buffer if there was a partial write, and there
* was no way to undo it with ftruncate(2). */
if (nwritten > 0) {
server.aof_current_size += nwritten;
// The partial write could not be undone: drop the bytes already written from the front of the buffer
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
} else {
/* Successful write(2). If AOF was in error state, restore the
* OK state and log the event. */
// If the previous write failed, restore the OK state
if (server.aof_last_write_status == C_ERR) {
serverLog(LL_WARNING,
"AOF write error looks solved, Redis can write again.");
server.aof_last_write_status = C_OK;
}
}
server.aof_current_size += nwritten;
/* Re-use AOF buffer when it is small enough. The maximum comes from the
* arena size of 4k minus some overhead (but is otherwise arbitrary). */
// Reuse the buffer when its allocation is below 4000 bytes
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
// Otherwise free it and allocate a fresh empty buffer
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
// fsync logic
try_fsync:
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */
// If a child process is running and aof_no_fsync_on_rewrite is set,
// skip the fsync
if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
return;
/* Perform the fsync if needed. */
// Decide whether to call fsync
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* redis_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
latencyStartMonitor(latency);
redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
}
// server.unixtime is a cached clock with one-second resolution
else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) {
// everysec performs the fsync in a background thread
aof_background_fsync(server.aof_fd);
server.aof_fsync_offset = server.aof_current_size;
}
// Record the time of the last fsync
server.aof_last_fsync = server.unixtime;
}
}
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
.......
/* AOF postponed flush: Try at every cron cycle if the slow fsync
* completed. */
// Called when an AOF flush was postponed earlier
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
/* AOF write errors: in this case we have a buffer to flush as well and
* clear the AOF error in case of success to make the DB writable again,
* however to try every second is enough in case of 'hz' is set to
* an higher frequency. */
// The previous write failed: retry once per second
run_with_period(1000) {
if (server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
}
.......
}
The comments above explain in detail when this function is called. For when beforeSleep itself is called, see
Redis series, things you should know about Redis networking
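To sum up, setting always does not mean that each command is written or flushed to disk the moment it executes. Commands handled in the same event loop iteration are written together as one batch. What matters is that with always the fsync happens before the reply is returned to the client, so the durability guarantee still holds. On operating systems where the flush is not carried out immediately, data can still be lost in very extreme cases.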
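The batching described here can be sketched as "append per command, then one write and one fsync per loop iteration". Everything in this example (pending_buf, the function names, the temp file path) is hypothetical and much simpler than flushAppendOnlyFile(), which also handles short writes and error recovery.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>

typedef struct {
    char data[4096];
    size_t len;
} pending_buf;

/* Called once per executed command: memory only, no I/O. */
int pending_append(pending_buf *b, const char *s) {
    size_t n = strlen(s);
    assert(b->len <= sizeof(b->data));
    if (n > sizeof(b->data) - b->len) return -1;
    memcpy(b->data + b->len, s, n);
    b->len += n;
    return 0;
}

/* Called once per loop iteration, before replies are sent:
 * one write(2) and one fsync(2) cover the whole batch. */
ssize_t pending_flush_always(pending_buf *b, int fd) {
    if (b->len == 0) return 0;
    ssize_t w = write(fd, b->data, b->len);
    if (w != (ssize_t)b->len) return -1;
    if (fsync(fd) == -1) return -1;
    b->len = 0;
    return w;
}

/* Usage sketch: two commands, one flush. Returns 0 on success. */
int group_commit_demo(void) {
    char path[] = "/tmp/aof-sketch-XXXXXX";
    pending_buf b = { .len = 0 };
    struct stat st;
    int fd = mkstemp(path);
    if (fd == -1) return -1;
    pending_append(&b, "SET a 1\n");
    pending_append(&b, "SET b 2\n");
    ssize_t w = pending_flush_always(&b, fd);
    int ok = (w == 16 && fstat(fd, &st) == 0 && st.st_size == 16);
    close(fd);
    unlink(path);
    return ok ? 0 : -1;
}
```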
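Likewise, everysec only approximates "at most one second of data lost". If a failure really happens, more than one second of data may be lost. With no-appendfsync-on-rewrite enabled for performance the window grows further, because, as shown above, fsync is not called while a child process is running. Even with that option off, the code above shows that no new background fsync job is started while the previous one has not finished.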
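The postponement rule for everysec is easier to see in isolation. The sketch below uses hypothetical names (flush_state, decide_flush) and models only the decision, not the write itself: while a background fsync is running, the flush is postponed for up to two seconds and then forced through.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

typedef enum { FLUSH_WRITE_NOW, FLUSH_POSTPONE } flush_decision;

typedef struct {
    time_t postponed_start; /* 0 means no flush is currently postponed */
    long delayed_fsync;     /* how often we gave up waiting */
} flush_state;

flush_decision decide_flush(flush_state *st, int sync_in_progress,
                            int force, time_t now) {
    assert(st != NULL);
    if (!force && sync_in_progress) {
        if (st->postponed_start == 0) {
            st->postponed_start = now;  /* first postponement */
            return FLUSH_POSTPONE;
        }
        if (now - st->postponed_start < 2)
            return FLUSH_POSTPONE;      /* still within two seconds */
        st->delayed_fsync++;            /* waited too long, write anyway */
    }
    st->postponed_start = 0;
    return FLUSH_WRITE_NOW;
}
```

So in the worst case the buffer sits in memory for about two seconds on top of the one-second fsync interval, which is one reason the loss window can exceed one second.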
# When the AOF fsync policy is set to always or everysec, and a background
# saving process (a background save or AOF log background rewriting) is
# performing a lot of I/O against the disk, in some Linux configurations
# Redis may block too long on the fsync() call. Note that there is no fix for
# this currently, as even performing fsync in a different thread will block
# our synchronous write(2) call.
#
# In order to mitigate this problem it's possible to use the following option
# that will prevent fsync() from being called in the main process while a
# BGSAVE or BGREWRITEAOF is in progress.
#
# This means that while another child is saving, the durability of Redis is
# the same as "appendfsync none". In practical terms, this means that it is
# possible to lose up to 30 seconds of log in the worst scenario (with the
# default Linux settings).
#
# If you have latency problems turn this to "yes". Otherwise leave it as
# "no" that is the safest pick from the point of view of durability.
no-appendfsync-on-rewrite no
How an AOF rewrite is triggered
# Automatic rewrite of the append only file.
# Redis is able to automatically rewrite the log file implicitly calling
# BGREWRITEAOF when the AOF log size grows by the specified percentage.
#
# This is how it works: Redis remembers the size of the AOF file after the
# latest rewrite (if no rewrite has happened since the restart, the size of
# the AOF at startup is used).
#
# This base size is compared to the current size. If the current size is
# bigger than the specified percentage, the rewrite is triggered. Also
# you need to specify a minimal size for the AOF file to be rewritten, this
# is useful to avoid rewriting the AOF file even if the percentage increase
# is reached but it is still pretty small.
#
# Specify a percentage of zero in order to disable the automatic AOF
# rewrite feature.
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
The automatic AOF rewrite is also triggered from serverCron.
// When an AOF rewrite is triggered
/* Trigger an AOF rewrite if needed. */
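// AOF state is on,
// no child process is running,
// automatic rewrite is enabled (percentage is not 0) and the current size exceeds min-size,
// and the file has grown by at least the configured percentage since the last rewrite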
if (server.aof_state == AOF_ON &&
!hasActiveChildProcess() &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
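The growth check is plain integer arithmetic. Here is a small sketch of it as a standalone function; should_auto_rewrite is a name I made up, and the child-process and AOF-state checks from the snippet above are left out.

```c
#include <assert.h>
#include <stddef.h>

/* Returns 1 when an automatic rewrite should start.
 * current:  current AOF size in bytes
 * base:     AOF size after the last rewrite (0 is treated as 1)
 * perc:     auto-aof-rewrite-percentage (0 disables the feature)
 * min_size: auto-aof-rewrite-min-size in bytes */
int should_auto_rewrite(long long current, long long base,
                        int perc, long long min_size) {
    assert(current >= 0 && base >= 0);
    if (perc == 0 || current <= min_size) return 0;
    if (base == 0) base = 1;
    long long growth = current * 100 / base - 100;
    return growth >= perc;
}
```

With the default settings (100 percent, 64mb), a file whose base size was 64 MB is rewritten once it reaches 128 MB; a file that grew from 1 MB to 10 MB is left alone because it is still under the minimum size.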
Main initialization flow of the AOF rewrite
* ----------------------------------------------------------------------------
* AOF background rewrite
* ------------------------------------------------------------------------- */
/* This is how rewriting of the append only file in background works:
*
* 1) The user calls BGREWRITEAOF
* 2) Redis calls this function, that forks():
* 2a) the child rewrite the append only file in a temp file.
* 2b) the parent accumulates differences in server.aof_rewrite_buf.
* 3) When the child finished '2a' exists.
* 4) The parent will trap the exit code, if it's OK, will append the
* data accumulated into server.aof_rewrite_buf into the temp file, and
* finally will rename(2) the temp file in the actual file name.
* The the new file is reopened as the new append only file. Profit!
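* In short:
* 1. The work is done by a child process.
* 2. The child rewrites the AOF into a temporary file; meanwhile the parent collects the writes that arrive during the rewrite.
* 3. The child finishes its task and exits.
* 4. The parent receives the exit code, appends the diff to the temporary file and renames it to the real AOF file.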
*
*/
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
// Check again that no child process is running
if (hasActiveChildProcess()) return C_ERR;
// Create the parent/child pipes
if (aofCreatePipes() != C_OK) return C_ERR;
// This pipe reports child info such as copy-on-write memory usage to the parent;
// it is not part of the AOF logic itself
openChildInfoPipe();
// redisFork() is the same fork wrapper used for RDB
if ((childpid = redisFork()) == 0) {
char tmpfile[256];
// Code executed by the child
/* Child */
// Set the process title
redisSetProcTitle("redis-aof-rewrite");
// Set the CPU affinity
redisSetCpuAffinity(server.aof_rewrite_cpulist);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
// Main rewrite logic
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
// Send the copy-on-write info through the child info pipe
sendChildCOWInfo(CHILD_INFO_TYPE_AOF, "AOF rewrite");
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
/* Parent */
// -1 means the fork failed
if (childpid == -1) {
closeChildInfoPipe();
serverLog(LL_WARNING,
"Can't rewrite append only file in background: fork: %s",
strerror(errno));
aofClosePipes();
return C_ERR;
}
serverLog(LL_NOTICE,
"Background append only file rewriting started by pid %d",childpid);
// Update the AOF rewrite state
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
server.aof_child_pid = childpid;
/* We set appendseldb to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command, so the differences
* accumulated by the parent into server.aof_rewrite_buf will start
* with a SELECT statement and it will be safe to merge. */
server.aof_selected_db = -1;
// Replication-related logic
replicationScriptCacheFlush();
return C_OK;
}
return C_OK; /* unreached */
}
/* Create the pipes used for parent - child process IPC during rewrite.
* We have a data pipe used to send AOF incremental diffs to the child,
* and two other pipes used by the children to signal it finished with
* the rewrite so no more data should be written, and another for the
* parent to acknowledge it understood this new condition. */
int aofCreatePipes(void) {
// Three pipes are created (six file descriptors)
int fds[6] = {-1, -1, -1, -1, -1, -1};
int j;
// The first pipe carries data from the parent to the child
if (pipe(fds) == -1) goto error; /* parent -> children data. */
if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
/* Parent -> children data is non blocking. */
// Make both ends of the data pipe non-blocking
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
// Register a readable event so the parent receives the child's ack
if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
//parent write data to child
server.aof_pipe_write_data_to_child = fds[1];
//child read data from parent
server.aof_pipe_read_data_from_parent = fds[0];
//child write ack to parent
server.aof_pipe_write_ack_to_parent = fds[3];
//parent read ack from child
server.aof_pipe_read_ack_from_child = fds[2];
//parent write ack to child
server.aof_pipe_write_ack_to_child = fds[5];
//child read ack from parent
server.aof_pipe_read_ack_from_parent = fds[4];
server.aof_stop_sending_diff = 0;
return C_OK;
error:
serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
strerror(errno));
for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
return C_ERR;
}
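The logic above is largely the same as for RDB. The one difference is that AOF sets up three pipes (six file descriptors) for communication between parent and child. They are needed because new writes keep arriving while the child is working, and those writes must also end up in the new file.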
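The role of the three pipes is easiest to see in a tiny fork() program. This is only a sketch with blocking pipes and a made-up payload ("diff"); Redis makes the data pipe non-blocking and drives the parent side from its event loop. The index layout of fds follows aofCreatePipes().

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

static void close_all(int *fds, int n) {
    for (int j = 0; j < n; j++) if (fds[j] != -1) close(fds[j]);
}

/* fds[0]/fds[1]: parent -> child data
 * fds[2]/fds[3]: child -> parent "stop sending" ack
 * fds[4]/fds[5]: parent -> child ack
 * Returns 0 when the whole handshake succeeded. */
int rewrite_handshake_demo(void) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    char byte = 0;
    int status, ok = 1;

    for (int j = 0; j < 6; j += 2)
        if (pipe(fds + j) == -1) { close_all(fds, 6); return -1; }

    pid_t pid = fork();
    if (pid == -1) { close_all(fds, 6); return -1; }
    if (pid == 0) {
        /* Child: read a diff, ask the parent to stop, wait for its ack. */
        char diff[16];
        close(fds[1]); close(fds[2]); close(fds[5]);
        if (read(fds[0], diff, sizeof diff) <= 0) _exit(1);
        if (write(fds[3], "!", 1) != 1) _exit(1);
        if (read(fds[4], &byte, 1) != 1 || byte != '!') _exit(1);
        _exit(0);
    }
    /* Parent: send a diff, wait for '!', acknowledge with '!'. */
    assert(pid > 0);
    close(fds[0]); close(fds[3]); close(fds[4]);
    if (write(fds[1], "diff", 4) != 4) ok = 0;
    if (ok && (read(fds[2], &byte, 1) != 1 || byte != '!')) ok = 0;
    if (ok && write(fds[5], "!", 1) != 1) ok = 0;
    close(fds[1]); close(fds[2]); close(fds[5]);
    if (waitpid(pid, &status, 0) == -1) return -1;
    return (ok && WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```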
How the AOF rewrite writes data to the file
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
rio aof;
FILE *fp;
char tmpfile[256];
char byte;
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
// Name of the temp file
// Create the file
fp = fopen(tmpfile,"w");
if (!fp) {
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return C_ERR;
}
// Buffer the child uses to accumulate diffs from the parent
server.aof_child_diff = sdsempty();
// Initialize the rio object on top of the file
rioInitWithFile(&aof,fp);
// Optionally fsync incrementally while writing
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
// As with RDB, notify modules that persistence is starting
startSaving(RDBFLAGS_AOF_PREAMBLE);
if (server.aof_use_rdb_preamble) {
int error;
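// With aof_use_rdb_preamble the data set is written in RDB format through the same code path as RDB;
// the difference is that this path also calls aofReadDiffFromParent() to pick up diffs from the parent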
if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
// Otherwise iterate over the dictionaries and write them to the file
// as commands; the overall flow is much the same
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
/* Do an initial slow fsync here while the parent is still sending
* data, in order to make the next final fsync faster. */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
/* Read again a few times to get more data from the parent.
* We can't read forever (the server may receive data from clients
* faster than it is able to send data to the child), so we try to read
* some more data in a loop as soon as there is a good chance more data
* will come. If it looks like we are wasting time, we abort (this
* happens after 20 ms without new data). */
int nodata = 0;
mstime_t start = mstime();
// Stop after at most 1000 ms in total, or after 20 consecutive 1 ms waits without new data
while(mstime()-start < 1000 && nodata < 20) {
if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
{
nodata++;
continue;
}
nodata = 0; /* Start counting from zero, we stop on N *contiguous*
timeouts. */
aofReadDiffFromParent();
}
/* Ask the master to stop sending diffs. */
// Tell the parent to stop sending diffs
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
// Make the ack pipe non-blocking
if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
goto werr;
/* We read the ACK from the server using a 10 seconds timeout. Normally
* it should reply ASAP, but just in case we lose its reply, we are sure
* the child will eventually get terminated. */
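// Wait for the ack; the call below passes a 5000 ms timeout (the English comment says 10 seconds). Normally the reply arrives very quickly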
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
byte != '!') goto werr;
// The parent agreed to stop sending diffs
serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
/* Read the final diff if any. */
// Read whatever is left in the pipe one last time
aofReadDiffFromParent();
/* Write the received diff to the file. */
serverLog(LL_NOTICE,
"Concatenating %.2f MB of AOF diff received from parent.",
(double) sdslen(server.aof_child_diff) / (1024*1024));
// Append the accumulated diff to the rewritten file
if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
goto werr;
/* Make sure data will not remain on the OS's output buffers */
// Flush and fsync
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
// Close the file
if (fclose(fp) == EOF) goto werr;
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
// Rename the file
if (rename(tmpfile,filename) == -1) {
serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
stopSaving(1);
return C_OK;
werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
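The flow is basically the same as for RDB, and the AOF can even use the RDB format as its preamble. The difference is the second half, which handles the diff data sent by the parent.
That is the entire rewrite flow on the child side.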
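The child's "keep reading while data keeps coming" loop can be reproduced with poll(2). In this sketch drain_diffs and now_ms are my own names and poll() stands in for Redis's aeWait(); the two limits (1000 ms in total, 20 consecutive empty 1 ms waits) are the ones from the loop above.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <poll.h>
#include <time.h>
#include <unistd.h>

static long long now_ms(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return ts.tv_sec * 1000LL + ts.tv_nsec / 1000000;
}

/* Reads from fd until 1000 ms have passed or 20 consecutive 1 ms waits
 * saw no data. Returns the total number of bytes read. */
long drain_diffs(int fd) {
    char buf[4096];
    long total = 0;
    int nodata = 0;
    long long start = now_ms();
    assert(fd >= 0);
    while (now_ms() - start < 1000 && nodata < 20) {
        struct pollfd p = { .fd = fd, .events = POLLIN };
        if (poll(&p, 1, 1) <= 0) { nodata++; continue; }
        nodata = 0; /* only contiguous timeouts count */
        ssize_t n = read(fd, buf, sizeof buf);
        if (n <= 0) break; /* writer closed the pipe, or an error */
        total += n;
    }
    return total;
}

/* Usage sketch: 5 bytes are waiting, then the pipe stays quiet. */
int drain_demo(void) {
    int p[2];
    if (pipe(p) == -1) return -1;
    if (write(p[1], "hello", 5) != 5) return -1;
    long got = drain_diffs(p[0]);
    close(p[0]);
    close(p[1]);
    return got == 5 ? 0 : -1;
}
```

The time cap matters because a busy server can produce diffs faster than the child consumes them, so the child cannot simply read until the pipe is empty.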
Next, let's see where the parent receives the signal and what it does after hearing from the child.
Rewrite-related work in the parent process
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
// Get the tail node of the aof_rewrite_buf_blocks list
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
aofrwblock *block = ln ? ln->value : NULL;
while(len) {
/* If we already got at least an allocated block, try appending
* at least some piece into it. */
if (block) {
// Copy as much as fits into the free space of the current block
unsigned long thislen = (block->free < len) ? block->free : len;
if (thislen) { /* The current block is not already full. */
// There is free space: copy the data in
memcpy(block->buf+block->used, s, thislen);
block->used += thislen;
block->free -= thislen;
s += thislen;
len -= thislen;
}
}
// Allocate a new block
if (len) { /* First block to allocate, or need another block. */
int numblocks;
block = zmalloc(sizeof(*block));
//10 MB per block
block->free = AOF_RW_BUF_BLOCK_SIZE;
block->used = 0;
listAddNodeTail(server.aof_rewrite_buf_blocks,block);
/* Log every time we cross more 10 or 100 blocks, respectively
* as a notice or warning. */
numblocks = listLength(server.aof_rewrite_buf_blocks);
if (((numblocks+1) % 10) == 0) {
int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
LL_NOTICE;
// Log each time the block count crosses a multiple of 10
serverLog(level,"Background AOF buffer size: %lu MB",
aofRewriteBufferSize()/(1024*1024));
}
}
}
/* Install a file event to send data to the rewrite child if there is
* not one already. */
// The diff is sent to the child through a writable file event, handled on a later event loop iteration
if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
AE_WRITABLE, aofChildWriteDiffData, NULL);
}
}
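The block-filling loop above can be sketched in isolation. This is a minimal sketch, not the Redis implementation: the names `demo_block`, `demo_chain`, and `demo_chain_append` are hypothetical, a plain singly linked list stands in for the Redis `list` type, and the block size is shrunk to 8 bytes (Redis uses `AOF_RW_BUF_BLOCK_SIZE`) so that the splitting is easy to observe.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical tiny block size, chosen only to make block splitting visible. */
#define DEMO_BLOCK_SIZE 8

typedef struct demo_block {
    size_t used, free;
    struct demo_block *next;
    char buf[DEMO_BLOCK_SIZE];
} demo_block;

typedef struct demo_chain {
    demo_block *head, *tail;
    size_t numblocks;
} demo_chain;

/* Append len bytes: fill the tail block first, then allocate new blocks
 * for whatever is left. Returns 0 on success, -1 if malloc fails. */
int demo_chain_append(demo_chain *c, const char *s, size_t len) {
    demo_block *block = c->tail;
    while (len) {
        if (block) {
            size_t thislen = block->free < len ? block->free : len;
            if (thislen) { /* The current block is not full yet. */
                memcpy(block->buf + block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }
        if (len) { /* First block, or the tail block is full. */
            block = malloc(sizeof(*block));
            if (!block) return -1;
            block->used = 0;
            block->free = DEMO_BLOCK_SIZE;
            block->next = NULL;
            if (c->tail) c->tail->next = block; else c->head = block;
            c->tail = block;
            c->numblocks++;
        }
    }
    return 0;
}

/* Total number of buffered bytes across all blocks. */
size_t demo_chain_size(const demo_chain *c) {
    size_t total = 0;
    for (const demo_block *b = c->head; b; b = b->next) total += b->used;
    return total;
}

/* Release every block and reset the chain. */
void demo_chain_free(demo_chain *c) {
    demo_block *b = c->head;
    while (b) { demo_block *next = b->next; free(b); b = next; }
    c->head = c->tail = NULL;
    c->numblocks = 0;
}
```

Appending 10 bytes to an empty chain fills the first block with 8 bytes and puts the remaining 2 into a second block. A later append keeps filling that second block before a third one is allocated.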
As noted above, appending to the AOF rewrite buffer also registers a writable file event in the event loop. The handler that actually sends the data to the child process is aofChildWriteDiffData:
/* Event handler used to send data to the child process doing the AOF
* rewrite. We send pieces of our AOF differences buffer so that the final
* write when the child finishes the rewrite will be small. */
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
listNode *ln;
//Each block carries a chunk of the buffered data
aofrwblock *block;
ssize_t nwritten;
UNUSED(el);
UNUSED(fd);
UNUSED(privdata);
UNUSED(mask);
while(1) {
ln = listFirst(server.aof_rewrite_buf_blocks);
block = ln ? ln->value : NULL;
//Stop sending once the stop signal has been received, or when there are no blocks left
if (server.aof_stop_sending_diff || !block) {
//Delete the event
aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
AE_WRITABLE);
return;
}
if (block->used > 0) {
//Write the data into the pipe to the child process
nwritten = write(server.aof_pipe_write_data_to_child,
block->buf,block->used);
if (nwritten <= 0) return;
memmove(block->buf,block->buf+nwritten,block->used-nwritten);
block->used -= nwritten;
block->free += nwritten;
}
if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
}
}
The code above is the main logic for sending diff data to the child process.
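The partial-write handling in that loop (write what the pipe accepts, then shift the unsent tail to the front with `memmove`) can be tried on its own. The following is a minimal sketch under assumptions: `demo_buf` and `demo_drain` are hypothetical names, a single fixed buffer replaces the block list, and the caller is expected to have put the descriptor into non-blocking mode.

```c
#include <assert.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical single-block buffer standing in for one aofrwblock. */
typedef struct demo_buf {
    char buf[64];
    size_t used;
} demo_buf;

/* Write as much pending data as fd accepts right now.
 * On a short write the unsent bytes are moved to the front of the buffer,
 * so the next call resumes where this one stopped.
 * Returns the number of bytes written during this call. */
ssize_t demo_drain(int fd, demo_buf *b) {
    ssize_t total = 0;
    while (b->used > 0) {
        ssize_t n = write(fd, b->buf, b->used);
        if (n <= 0) break; /* Pipe full or error: retry on the next writable event. */
        memmove(b->buf, b->buf + n, b->used - (size_t)n);
        b->used -= (size_t)n;
        total += n;
    }
    return total;
}
```

With a pipe whose write end is non-blocking, a small payload is written in one call and `used` drops to 0. When the pipe is full, `write` fails with `EAGAIN`, the function returns early, and the remaining bytes stay at the start of the buffer.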
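How the parent process sends the stop signal
When the child process was initialized, we saw that a read event listener was registered:
//Register the handler that reads the ack sent back by the child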
if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
/* This event handler is called when the AOF rewriting child sends us a
* single '!' char to signal we should stop sending buffer diffs. The
* parent sends a '!' as well to acknowledge. */
//Called when the registered pipe becomes readable
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
char byte;
UNUSED(el);
UNUSED(privdata);
UNUSED(mask);
//Read the byte
if (read(fd,&byte,1) == 1 && byte == '!') {
serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
server.aof_stop_sending_diff = 1;
//Write the ack back to the child
if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
/* If we can't send the ack, inform the user, but don't try again
* since in the other side the children will use a timeout if the
* kernel can't buffer our write, or, the children was
* terminated. */
serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
strerror(errno));
}
}
/* Remove the handler since this can be called only one time during a
* rewrite. */
//Delete this file event
aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}
When the child writes to this pipe, aofChildPipeReadable is triggered and the parent sends its acknowledgement back to the child. Even if that send fails, the child has its own timeout.
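The two-pipe '!' handshake can be reproduced in a small standalone program. This is a sketch under assumptions, not the Redis code path: all `demo_*` names are hypothetical, the child waits with `poll` instead of the Redis helper, the 5000 ms timeout is picked for illustration, and the parent reads synchronously instead of reacting to an event-loop callback.

```c
#include <assert.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Child side: ask the parent to stop sending diffs, then wait for the ack.
 * Returns 0 if the ack arrived within timeout_ms, -1 otherwise. */
int demo_child_request_stop(int to_parent, int from_parent, int timeout_ms) {
    char byte;
    struct pollfd pfd = { .fd = from_parent, .events = POLLIN };
    if (write(to_parent, "!", 1) != 1) return -1;
    if (poll(&pfd, 1, timeout_ms) != 1) return -1; /* timeout or error */
    if (read(from_parent, &byte, 1) != 1 || byte != '!') return -1;
    return 0;
}

/* Parent side: read the request, set the stop flag, answer with '!'. */
int demo_parent_ack(int from_child, int to_child, int *stop_flag) {
    char byte;
    if (read(from_child, &byte, 1) != 1 || byte != '!') return -1;
    *stop_flag = 1;
    return write(to_child, "!", 1) == 1 ? 0 : -1;
}

/* Run both sides across fork(). Returns 0 if the child saw the ack. */
int demo_handshake(void) {
    int c2p[2], p2c[2], stop = 0, status;
    if (pipe(c2p) == -1 || pipe(p2c) == -1) return -1;
    pid_t pid = fork();
    if (pid == -1) return -1;
    if (pid == 0) {
        /* Child keeps only its own pipe ends. */
        close(c2p[0]);
        close(p2c[1]);
        _exit(demo_child_request_stop(c2p[1], p2c[0], 5000) == 0 ? 0 : 1);
    }
    close(c2p[1]);
    close(p2c[0]);
    int rc = demo_parent_ack(c2p[0], p2c[1], &stop);
    close(c2p[0]);
    close(p2c[1]);
    if (waitpid(pid, &status, 0) != pid) return -1;
    if (rc != 0 || !stop) return -1;
    return (WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

The timeout on the child side is what makes a lost ack harmless: if the parent never answers, the child gives up on the handshake instead of blocking forever.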
How the parent process finishes the rewrite
The parent's completion logic also lives in serverCron, at the same place as the RDB handling:
void checkChildrenDone(void) {
int statloc;
pid_t pid;
/* If we have a diskless rdb child (note that we support only one concurrent
* child), we want to avoid collecting its exit status and acting on it
* as long as we didn't finish to drain the pipe, since then we're at risk
* of starting a new fork and a new pipe before we're done with the previous
* one. */
//If a diskless RDB child still has pipe connections to drain, do not reap it yet
if (server.rdb_child_pid != -1 && server.rdb_pipe_conns)
return;
//Check, without blocking, whether a child process has finished
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
/* sigKillChildHandler catches the signal and calls exit(), but we
* must make sure not to flag lastbgsave_status, etc incorrectly.
* We could directly terminate the child process via SIGUSR1
* without handling it, but in this case Valgrind will log an
* annoying error. */
if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {
bysignal = SIGUSR1;
exitcode = 1;
}
if (pid == -1) {
serverLog(LL_WARNING,"wait3() returned an error: %s. "
"rdb_child_pid = %d, aof_child_pid = %d, module_child_pid = %d",
strerror(errno),
(int) server.rdb_child_pid,
(int) server.aof_child_pid,
(int) server.module_child_pid);
} else if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
}
//AOF rewrite handling
else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else if (pid == server.module_child_pid) {
ModuleForkDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
}
updateDictResizePolicy();
closeChildInfoPipe();
}
}
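The non-blocking reap at the heart of checkChildrenDone can be sketched as follows. This is an illustrative sketch with hypothetical names (`demo_check_child`, `demo_run_and_reap`); it uses `waitpid(-1, ..., WNOHANG)` in place of `wait3`, and a short `poll` sleep stands in for the periodic serverCron tick.

```c
#include <assert.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Poll once for a finished child without blocking.
 * Returns the reaped pid, 0 if no child has finished yet, -1 on error.
 * On success exitcode and bysignal are filled in like in the code above. */
pid_t demo_check_child(int *exitcode, int *bysignal) {
    int statloc;
    pid_t pid = waitpid(-1, &statloc, WNOHANG);
    if (pid <= 0) return pid;
    *exitcode = WIFEXITED(statloc) ? WEXITSTATUS(statloc) : 0;
    *bysignal = WIFSIGNALED(statloc) ? WTERMSIG(statloc) : 0;
    return pid;
}

/* Fork a child that exits with `code`, then poll for it the way a
 * periodic timer would. Returns the child's exit code, or -1 on error
 * or if the child was killed by a signal. */
int demo_run_and_reap(int code) {
    int exitcode = -1, bysignal = 0;
    pid_t child = fork();
    if (child == -1) return -1;
    if (child == 0) _exit(code);
    for (;;) {
        pid_t pid = demo_check_child(&exitcode, &bysignal);
        if (pid == -1) return -1;
        if (pid == child) break;
        poll(NULL, 0, 1); /* sleep about 1 ms between checks */
    }
    return bysignal ? -1 : exitcode;
}
```

Because of `WNOHANG` the check returns immediately when no child has exited, which is why it is safe to call from a timer callback in a single-threaded event loop.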
The parent's completion handler for the rewrite:
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
* Handle this. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
int newfd, oldfd;
char tmpfile[256];
long long now = ustime();
mstime_t latency;
serverLog(LL_NOTICE,
"Background AOF rewrite terminated with success");
/* Flush the differences accumulated by the parent to the
* rewritten AOF. */
latencyStartMonitor(latency);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
(int)server.aof_child_pid);
//Open the temporary file produced by the child
newfd = open(tmpfile,O_WRONLY|O_APPEND);
if (newfd == -1) {
serverLog(LL_WARNING,
"Unable to open the temporary AOF produced by the child: %s", strerror(errno));
goto cleanup;
}
//Write the remaining data that was not sent to the child into the new file
if (aofRewriteBufferWrite(newfd) == -1) {
serverLog(LL_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
close(newfd);
goto cleanup;
}
//Record the latency
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);
serverLog(LL_NOTICE,
"Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));
/* The only remaining thing to do is to rename the temporary file to
* the configured file and switch the file descriptor used to do AOF
* writes. We don't want close(2) or rename(2) calls to block the
* server on old file deletion.
*
* There are two possible scenarios:
*
* 1) AOF is DISABLED and this was a one time rewrite. The temporary
* file will be renamed to the configured file. When this file already
* exists, it will be unlinked, which may block the server.
*
* 2) AOF is ENABLED and the rewritten AOF will immediately start
* receiving writes. After the temporary file is renamed to the
* configured file, the original AOF file descriptor will be closed.
* Since this will be the last reference to that file, closing it
* causes the underlying file to be unlinked, which may block the
* server.
*
* To mitigate the blocking effect of the unlink operation (either
* caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
* use a background thread to take care of this. First, we
* make scenario 1 identical to scenario 2 by opening the target file
* when it exists. The unlink operation after the rename(2) will then
* be executed upon calling close(2) for its descriptor. Everything to
* guarantee atomicity for this switch has already happened by then, so
* we don't care what the outcome or duration of that close operation
* is, as long as the file descriptor is released again. */
//If AOF is disabled
if (server.aof_fd == -1) {
/* AOF disabled */
/* Don't care if this fails: oldfd will be -1 and we handle that.
* One notable case of -1 return is if the old file does
* not exist. */
//Open the old file read-only and non-blocking
oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
} else {
/* AOF enabled */
oldfd = -1; /* We'll set this to the current AOF filedes later. */
}
/* Rename the temporary file. This will not unlink the target file if
* it exists, because we reference it with "oldfd". */
latencyStartMonitor(latency);
//If the rename fails
if (rename(tmpfile,server.aof_filename) == -1) {
serverLog(LL_WARNING,
"Error trying to rename the temporary AOF file %s into %s: %s",
tmpfile,
server.aof_filename,
strerror(errno));
close(newfd);
if (oldfd != -1) close(oldfd);
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rename",latency);
if (server.aof_fd == -1) {
//With AOF disabled there is no need to keep the new file's descriptor
/* AOF disabled, we don't need to set the AOF file descriptor
* to this new file, so we can close it. */
close(newfd);
} else {
/* AOF enabled, replace the old fd with the new one. */
//Install the new AOF file descriptor
oldfd = server.aof_fd;
server.aof_fd = newfd;
//With the always policy, fsync synchronously; with everysec, fsync in the background
if (server.aof_fsync == AOF_FSYNC_ALWAYS)
redis_fsync(newfd);
else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
aof_background_fsync(newfd);
server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
//Update the AOF size
aofUpdateCurrentSize();
//Update the rewrite base size and the fsync offset
server.aof_rewrite_base_size = server.aof_current_size;
server.aof_fsync_offset = server.aof_current_size;
/* Clear regular AOF buffer since its contents was just written to
* the new AOF from the background rewrite buffer. */
//Discard whatever is still pending in aof_buf
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
//Set the status
server.aof_lastbgrewrite_status = C_OK;
serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
/* Change state from WAIT_REWRITE to ON if needed */
//Update the AOF state
if (server.aof_state == AOF_WAIT_REWRITE)
server.aof_state = AOF_ON;
/* Asynchronously close the overwritten AOF. */
//Close the old file descriptor in a background job
if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
serverLog(LL_VERBOSE,
"Background AOF rewrite signal handler took %lldus", ustime()-now);
} else if (!bysignal && exitcode != 0) {
server.aof_lastbgrewrite_status = C_ERR;
serverLog(LL_WARNING,
"Background AOF rewrite terminated with error");
} else {
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
* triggering an error condition. */
if (bysignal != SIGUSR1)
server.aof_lastbgrewrite_status = C_ERR;
serverLog(LL_WARNING,
"Background AOF rewrite terminated by signal %d", bysignal);
}
cleanup:
//Close the pipes
aofClosePipes();
//Reset the rewrite buffer
aofRewriteBufferReset();
//Remove the temporary file
aofRemoveTempFile(server.aof_child_pid);
server.aof_child_pid = -1;
//Record how long the last rewrite took
server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
server.aof_rewrite_time_start = -1;
/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
if (server.aof_state == AOF_WAIT_REWRITE)
server.aof_rewrite_scheduled = 1;
}
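The long comment in the handler relies on one property of rename(2): if the old target is still open, replacing it does not free the old file until that descriptor is closed. A minimal sketch of this behavior follows. The names `demo_write_file` and `demo_swap` are hypothetical, the old file is required to exist (Redis tolerates it being missing), and the caller closes the returned descriptor directly, whereas Redis hands the close to a background job.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Create or truncate `path` and write `content` into it. */
int demo_write_file(const char *path, const char *content) {
    int fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd == -1) return -1;
    size_t len = strlen(content);
    int ok = write(fd, content, len) == (ssize_t)len;
    close(fd);
    return ok ? 0 : -1;
}

/* Replace `target` with `tmp` through rename(2) while holding the old
 * target open, so the old file's removal is deferred until that
 * descriptor is closed. The old content, still readable through the
 * descriptor after the rename, is copied into old_out.
 * Returns the old descriptor (the caller closes it) or -1 on error. */
int demo_swap(const char *tmp, const char *target,
              char *old_out, size_t outlen) {
    int oldfd = open(target, O_RDONLY | O_NONBLOCK);
    if (oldfd == -1) return -1; /* this sketch requires an existing target */
    if (rename(tmp, target) == -1) {
        close(oldfd);
        return -1;
    }
    ssize_t n = read(oldfd, old_out, outlen - 1);
    if (n < 0) {
        close(oldfd);
        return -1;
    }
    old_out[n] = '\0';
    return oldfd;
}
```

After `demo_swap` returns, opening the target path yields the new content, while the returned descriptor still reads the old content. The expensive part, releasing the old file's storage, happens only when that descriptor is closed, which is why the handler defers the close to a background thread.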
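That covers all of the AOF-related flows. A lot of code is involved, but the logic is relatively clear.
Summary
We have now covered Redis's persistence flows. The following chapters will explore the Redis high-availability and cluster flows. If you find this useful, feel free to bookmark and like.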
