主頁 > 軟體設計 > redis系列,aof持久化,你不知道的一些秘密!深度原始碼剖析

redis系列,aof持久化,你不知道的一些秘密!深度原始碼剖析

2020-12-21 12:17:58 軟體設計

文章目錄

  • 本人 github 地址
  • 本篇文章看點
  • aof的觸發
    • aof的策略講解
    • aof的buffer寫入時機
    • aof的磁盤寫入時機
  • aof的rewrite觸發
    • aof的rewrite的初始化主流程
    • aof寫入資料到檔案的流程
    • 父行程的rewrite相關操作
    • 父行程發送停止信號的邏輯
    • 父行程rewrite結束邏輯
  • 總結


本人 github 地址

github 地址 里面有注釋好的代碼,下載下來可以方便閱讀,

本篇文章看點

  1. aof什么時候觸發,
  2. aof的各種磁盤策略是如何執行的,
  3. 什么aof的rewrite,
  4. rewrite又到底是怎么做了什么,

aof的觸發

上一篇我們講了rdb,大致的了解了rdb是什么樣的一個持久化的方式,但是為什么有了rdb的機制還要引入aof了,其實原因很簡單,rdb的持久化并不能保證我們的資料不會丟失,rdb的持久化更像一種定時的備份,當我們對資料安全度比較高的時候,不僅僅把redis 當作一種快取的時候,對持久化有更高的要求的時候,aof可以滿足我們的需要,
aof的持久化策略主要有三種
具體看下面的配置

aof的策略講解


# The fsync() call tells the Operating System to actually write data on disk
# instead of waiting for more data in the output buffer. Some OS will really flush
# data on disk, some other OS will just try to do it ASAP.
#
# Redis supports three different modes:
#
# no: don't fsync, just let the OS flush the data when it wants. Faster.
# always: fsync after every write to the append only log. Slow, Safest.
# everysec: fsync only one time every second. Compromise.
#
# The default is "everysec", as that's usually the right compromise between
# speed and data safety. It's up to you to understand if you can relax this to
# "no" that will let the operating system flush the output buffer when
# it wants, for better performances (but if you can live with the idea of
# some data loss consider the default persistence mode that's snapshotting),
# or on the contrary, use "always" that's very slow but a bit safer than
# everysec.
#
# More details please check the following article:
# http://antirez.com/post/redis-persistence-demystified.html
#
# If unsure, use "everysec".

# appendfsync always
appendfsync everysec
# appendfsync no

從上面的注釋我們大概能夠得到以下幾個資訊

  1. redis 通過呼叫fsync來告訴作業系統,需要把寫入的資料buffer 真正的flush到磁盤,但是在每個作業系統都會這個命令執行也不一樣,有的是立即去做,而有的會把這個flush命令以最快的速度去執行(也就是說還可能等待一段時間)
  2. aof提供的三種策略第一個種是no,意思aof只會把需要持久化的資料寫入buffer里面,真正是否持久化到硬碟,會根據作業系統的策略決定,
    第二種策略是everysecond,就是如果距離上一次呼叫fsync,超過一秒鐘,則本次會呼叫fsync
    第三種策略是always,每次aof去持久化都會呼叫fsync,

配置講完了,我們來看下具體aof是如何觸發的,
前面有文章我們有講到命令是在redis是如何執行的,那aof的觸發入口也是這個方法里面
下面是命令如何執行的鏈接地址:redis系列,redis是如何執行命令(一)
下面是我們aof觸發的入口的代碼

aof的buffer寫入時機

server.c

void call(client *c, int flags) {
    //dirty 指寫入操作的時候會++
    long long dirty;
    ustime_t start, duration;
    int client_old_flags = c->flags;
    //所要執行的命令
    struct redisCommand *real_cmd = c->cmd;

    server.fixed_time_expire++;

    /* Send the command to clients in MONITOR mode if applicable.
     * Administrative commands are considered too dangerous to be shown. */
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {
        //當執行monitor 命令的時候會把資料傳送過去
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }

    /* Initialization: clear the flags that must be set by the command on
     * demand, and initialize the array for additional commands propagation. */
    //消除force aof,force 同步slave,和阻止aof,阻止slave同步
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    redisOpArray prev_also_propagate = server.also_propagate;
    redisOpArrayInit(&server.also_propagate);

    /* Call the command. */
    dirty = server.dirty;
    //更新ustime
    updateCachedTime(0);
    start = server.ustime;
    //執行命令
    c->cmd->proc(c);
    //執行時間
    duration = ustime()-start;
    //是否產生寫入
    dirty = server.dirty-dirty;
    if (dirty < 0) dirty = 0;

    /* When EVAL is called loading the AOF we don't want commands called
     * from Lua to go into the slowlog or to populate statistics. */
    //lua相關邏輯
    if (server.loading && c->flags & CLIENT_LUA)
        flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);

    /* If the caller is Lua, we want to force the EVAL caller to propagate
     * the script if the command flag or client flag are forcing the
     * propagation. */
    //lua的執行
    if (c->flags & CLIENT_LUA && server.lua_caller) {
        if (c->flags & CLIENT_FORCE_REPL)
            server.lua_caller->flags |= CLIENT_FORCE_REPL;
        if (c->flags & CLIENT_FORCE_AOF)
            server.lua_caller->flags |= CLIENT_FORCE_AOF;
    }

    /* Log the command into the Slow log if needed, and populate the
     * per-command statistics that we show in INFO commandstats. */
    //記錄slow log ,如果需要的話
    if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
    }
    //需要統計命令
    if (flags & CMD_CALL_STATS) {
        /* use the real command that was executed (cmd and lastamc) may be
         * different, in case of MULTI-EXEC or re-written commands such as
         * EXPIRE, GEOADD, etc. */
        real_cmd->microseconds += duration;
        real_cmd->calls++;
    }

    /* Propagate the command into the AOF and replication link */
    //flag 判斷,有可能在命令執行的程序里面
    //會改變flag的狀態
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;

        /* Check if the command operated changes in the data set. If so
         * set for replication / AOF propagation. */
         //如果沒有產生實際的資料寫入則不需要傳播
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        /* If the client forced AOF / replication of the command, set
         * the flags regardless of the command effects on the data set. */
        //如果是force aof,則設定狀態
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        /* However prevent AOF / replication propagation if the command
         * implementations called preventCommandPropagation() or similar,
         * or if we don't have the call() flags to do so. */
        //意思是拒絕aof,replication 傳播的命令需要實作 類似preventCommandPropagation()的邏輯
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;

        /* Call propagate() only if at least one of AOF / replication
         * propagation is needed. Note that modules commands handle replication
         * in an explicit way, so we never replicate them automatically. */
        //命令不是從module 匯入過來,且傳播狀態不為空的,進入下面邏輯
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
        	//將命令的詳細引數傳入aof的buffer的方法
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
.......

從執行命令那篇文章我們知道所有的命令執行都會去呼叫call,
而call里面會通過一系列的邏輯判斷,然后決定需要不需要來呼叫
propagate這個方法,

server.c

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    //如果aof是打開的且命令傳播狀態是aof
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        //傳入引數命令型別,dbid,引數,引數個數
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

可以看到aof的傳播代碼和replication 都會放在這里

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    //查看下當前dbid 是否和上次
    //aof的db一樣
    if (dictid != server.aof_selected_db) {
        char seldb[64];
        //列印db,select command
        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        //如果關于expire 命令相關的全部轉換成,PEXPIREAT
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        //將setex的命令轉換為set和PEXPIREAT命令
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } 
    //下面是set comman的處理邏輯
    //可以看到會把相關的所有引數都帶進來
    //除了nx xx 因為走到這里肯定是命令
    //已經產生了資料覆寫或者新增
    else if (cmd->proc == setCommand && argc > 3) {
        int i;
        robj *exarg = NULL, *pxarg = NULL;
        for (i = 3; i < argc; i ++) {
            if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
            if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
        }
        serverAssert(!(exarg && pxarg));

        if (exarg || pxarg) {
            /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
            buf = catAppendOnlyGenericCommand(buf,3,argv);
            if (exarg)
                buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                                                   exarg);
            if (pxarg)
                buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                                                   pxarg);
        } else {
            buf = catAppendOnlyGenericCommand(buf,argc,argv);
        }
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        //正常列印的方式
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == AOF_ON)
        //放入aof buf 里面保證性能,然后在event loop 再進入的時候,flush到disk
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    //如果aof的子行程在運行,說明aof在做rewrite 操作,
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}

sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char buf[32];
    int len, j;
    robj *o;
    //每個命令用*開頭
    buf[0] = '*';
    //整個命令的引數個數
    //包括命令的相關引數的key和value
    //都會按照次序列印
    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
    //已'\r\n'來做最后這樣作為string列印的時候這就是結束符號
    buf[len++] = '\r';
    buf[len++] = '\n';
    dst = sdscatlen(dst,buf,len);

    for (j = 0; j < argc; j++) {
        //對于壓縮的string 會decode
        o = getDecodedObject(argv[j]);
        buf[0] = '$';
        //同樣會列印引數的長度
        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
        buf[len++] = '\r';
        buf[len++] = '\n';
        //放入長度
        dst = sdscatlen(dst,buf,len);
        //放入value
        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
        //放入每行的結束符號
        dst = sdscatlen(dst,"\r\n",2);
        //回收
        decrRefCount(o);
    }
    return dst;
}

上面這一大串代碼我們可以看到,aof的寫入buffer和rdb的不同,在沒有rewrite的時候,寫入aof的就是原始的命令的引數,包括命令的關鍵字和value都會用sds保存下來,

在最后面我們看到了在這里只是把命令的資訊寫入了aof_buf,這個時候并沒有放入磁盤,還有一個地方要值得注意的就是aofRewriteBufferAppend()方法,即當有子行程運行的時候除了放入aof_buffer,也會放入aofRewrite的buffer, 具體邏輯我們在rewrite部分再分析,

那aof_buf什么時候會寫入硬碟了,我們繼續來看

aof的磁盤寫入時機

通過對aof_buf的跟蹤我們可以看到,flush的邏輯主要是在下面這段邏輯里面
aof.c

/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering when the
 * the event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
 * the event loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
/**
 * 上面注釋可以看到正常情況下下面的方法會在before sleep方法呼叫
 * 但基于有可能設定every second的時候會被write(2)這個方法所阻塞
 * 而導致有很多滿足條件的buffer還沒有flush
 * 所以會在serverCron這個方法再次被呼叫
 */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;
    //當aof buf==0的時候
    if (sdslen(server.aof_buf) == 0) {
        /* Check if we need to do fsync even the aof buffer is empty,
         * because previously in AOF_FSYNC_EVERYSEC mode, fsync is
         * called only when aof buffer is not empty, so if users
         * stop write commands before fsync called in one second,
         * the data in page cache cannot be flushed in time. */
        //在every sec這種模式下有可能,雖然buffer 但上一次仍然有資料
        //沒有fsync
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
            server.aof_fsync_offset != server.aof_current_size &&
            server.unixtime > server.aof_last_fsync &&
            !(sync_in_progress = aofFsyncInProgress())) {
            goto try_fsync;
        } else {
            return;
        }
    }
    //如果側露是everysec
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
       //是否sync 在progress
        sync_in_progress = aofFsyncInProgress();

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            //如果有progress再進行
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                //前面沒有等待aof,flush的任務
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } 
            //小于兩秒的也直接回傳
            else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall trough, and go write since we can't wait
             * over two seconds. */
            //delay fsync的次數++,并出日志,
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */
    //為了確保flush的原子性,然后flush前sleep一下
    if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
        usleep(server.aof_flush_sleep);
    }

    latencyStartMonitor(latency);
    //寫入磁盤
    //只是寫入硬碟沒有flush
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    //計入延遲
    latencyEndMonitor(latency);
    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */
     //下面是記錄各種時間的延遲
    if (sync_in_progress) {
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (hasActiveChildProcess()) {
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

    /* We performed the write so reset the postponed flush sentinel to zero. */
    //延遲start重新設定為0
    server.aof_flush_postponed_start = 0;
     //長度不一致的時候表示有寫入erro發生   
    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Log the AOF write error and record the error code. */
        //記錄error log
        if (nwritten == -1) {
            if (can_log) {
                serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else {
            if (can_log) {
                serverLog(LL_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }
            //撤銷的方式
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    serverLog(LL_WARNING, "Could not remove short write "
                             "from the append-only file.  Redis may refuse "
                             "to load the AOF the next time it starts.  "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        //如果是always ,因為策略的關系,無法通知到用戶這個error的產生
        //則redis 實體會退出
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synced on disk. */
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            //嘗試下次寫入是否能成功,重試策略
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = C_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                //無法清除的時候則往前賦值
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        //如果上一個寫入是error 則恢復成ok的狀態
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = C_OK;
        }
    }
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    //小于4000的做清理
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        //不然回收重新分配空間
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }
//fsync的邏輯
try_fsync:
    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
     //如果有子行程在運行,且設定aof_no_fsync_on_rewrite為true
     //則不做fsync操作
    if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
        return;

    /* Perform the fsync if needed. */
    //下面就是呼叫fsync的判斷
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* redis_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_fsync_offset = server.aof_current_size;
        server.aof_last_fsync = server.unixtime;
    } 
    //unixtime是快取每1s更新一次
    else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) {
            //everysecond的采用策略是通過back groud的形式
            aof_background_fsync(server.aof_fd);
            server.aof_fsync_offset = server.aof_current_size;
        }
        //設定上次fsync的時間
        server.aof_last_fsync = server.unixtime;
    }
}
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
.......
      /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    //有滿足條件的aof未寫入的時候,會被呼叫
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * an higher frequency. */
    //上一次執行有錯誤的時候
    run_with_period(1000) {
        if (server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    }
.......    

}

上面我也注釋的非常詳細,這個方法什么時候會被呼叫,至于before sleep 什么時候會被呼叫請參考
redis系列,redis網路,你得知道的一些事
總結下來并不是設定always就會在每一次執行命令的時候,立刻去flush或者寫入磁盤,而是同一批次的命令,就是屬于同一批次eventloop的時候,但是要清楚的設定always的flush是發生在資料回傳客戶端前,所以其一致性方面的保證還是有的,但是在某些作業系統下面flush并不會真正立刻執行,則在非常極端的情況下仍然有可能導致資料丟失
另外every-second也是近似的1s內的資料不丟失,但如果真的發生故障,可能丟失的資料不止1s內,甚至在為保證性能的情況下開啟了no-appendfsync-on-rewrite會丟失更多時間的資料,即看上面的知道如果有子行程運行的情況下是不會呼叫fsync call,且即使沒開啟我們也可以看到上面代碼的邏輯判斷,當sync操作沒有執行完的時候不會新的子執行緒去執行fsync操作,

# When the AOF fsync policy is set to always or everysec, and a background
# saving process (a background save or AOF log background rewriting) is
# performing a lot of I/O against the disk, in some Linux configurations
# Redis may block too long on the fsync() call. Note that there is no fix for
# this currently, as even performing fsync in a different thread will block
# our synchronous write(2) call.
#
# In order to mitigate this problem it's possible to use the following option
# that will prevent fsync() from being called in the main process while a
# BGSAVE or BGREWRITEAOF is in progress.
#
# This means that while another child is saving, the durability of Redis is
# the same as "appendfsync none". In practical terms, this means that it is
# possible to lose up to 30 seconds of log in the worst scenario (with the
# default Linux settings).
#
# If you have latency problems turn this to "yes". Otherwise leave it as
# "no" that is the safest pick from the point of view of durability.

no-appendfsync-on-rewrite no

aof的rewrite觸發

# Automatic rewrite of the append only file.
# Redis is able to automatically rewrite the log file implicitly calling
# BGREWRITEAOF when the AOF log size grows by the specified percentage.
#
# This is how it works: Redis remembers the size of the AOF file after the
# latest rewrite (if no rewrite has happened since the restart, the size of
# the AOF at startup is used).
#
# This base size is compared to the current size. If the current size is
# bigger than the specified percentage, the rewrite is triggered. Also
# you need to specify a minimal size for the AOF file to be rewritten, this
# is useful to avoid rewriting the AOF file even if the percentage increase
# is reached but it is still pretty small.
#
# Specify a percentage of zero in order to disable the automatic AOF
# rewrite feature.

auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

同樣aof的rewrite 會在serverCron 這個方法里被呼叫,

       //aof的rewrite的時機
        /* Trigger an AOF rewrite if needed. */
        //如果aof state設定為on
        //沒有子行程在運行
        //currentSize要超過設定的min-size
        //且每次aof檔案大小成長百分比超過所設定的百分比
        if (server.aof_state == AOF_ON &&
            !hasActiveChildProcess() &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            long long base = server.aof_rewrite_base_size ?
                server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }

aof的rewrite的初始化主流程

* ----------------------------------------------------------------------------
 * AOF background rewrite
 * ------------------------------------------------------------------------- */

/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrite the append only file in a temp file.
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
 * 3) When the child finished '2a' exists.
 * 4) The parent will trap the exit code, if it's OK, will append the
 *    data accumulated into server.aof_rewrite_buf into the temp file, and
 *    finally will rename(2) the temp file in the actual file name.
 *    The the new file is reopened as the new append only file. Profit!
 * 1. 會用到子行程的方式來做這件事
 * 2. 子行程會重寫aof file用一個臨時的檔案,在重寫的程序中父行程將收集程序里面新來的寫入資料
 * 3, 子行程完成任務退出
 * 4, parent 收到退出的code將diff的資料寫入零時檔案,rename成aof的正式檔案,
 *
 */

int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    //再次判斷是否有子行程
    if (hasActiveChildProcess()) return C_ERR;
    //創建父子行程通道
    if (aofCreatePipes() != C_OK) return C_ERR;
    //這個通道主要用于發送copy on write的資訊
    //跟具體的aof邏輯無關
    openChildInfoPipe();
    //redis fork操作等同rbd的fork
    if ((childpid = redisFork()) == 0) {
        char tmpfile[256];
        //child的執行邏輯
        /* Child */
        //設定title 
        redisSetProcTitle("redis-aof-rewrite");
        //設定親和醒
        redisSetCpuAffinity(server.aof_rewrite_cpulist);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        //rewrite的主要邏輯
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            //發送通道資訊
            sendChildCOWInfo(CHILD_INFO_TYPE_AOF, "AOF rewrite");
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        //-1的時候為創建失敗
        if (childpid == -1) {
            closeChildInfoPipe();
            serverLog(LL_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            aofClosePipes();
            return C_ERR;
        }
        serverLog(LL_NOTICE,
            "Background append only file rewriting started by pid %d",childpid);
         //設定aof的相關值   
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        server.aof_child_pid = childpid;
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        //replication 相關邏輯
        replicationScriptCacheFlush();
        return C_OK;
    }
    return C_OK; /* unreached */
}

/* Create the pipes used for parent - child process IPC during rewrite.
 * We have a data pipe used to send AOF incremental diffs to the child,
 * and two other pipes used by the children to signal it finished with
 * the rewrite so no more data should be written, and another for the
 * parent to acknowledge it understood this new condition. */
int aofCreatePipes(void) {
    //會創建3個不一樣的通道
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;
    //第一個通道用于父行程向子行程傳送資料
    if (pipe(fds) == -1) goto error; /* parent -> children data. */
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
    if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
    /* Parent -> children data is non blocking. */
    //設定成非阻塞的方式
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
    //注冊父行程讀子行程傳回來的ack
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
    //parent write date to child
    server.aof_pipe_write_data_to_child = fds[1];
    //child read data from parent
    server.aof_pipe_read_data_from_parent = fds[0];
    //child write ack to parent
    server.aof_pipe_write_ack_to_parent = fds[3];
    //parent read ack from child
    server.aof_pipe_read_ack_from_child = fds[2];
    //parent write ack to child
    server.aof_pipe_write_ack_to_child = fds[5];
    //child read ack from parent
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0;
    return C_OK;

error:
    serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
        strerror(errno));
    for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
    return C_ERR;
}

上面這段邏輯基本和rbd的邏輯相似,唯一不同的是aof建立了6個不同的通道來完成父子行程之間的通訊,為什么要這樣做了因為aof需要care在子行程程序中不斷插入的新資料,為了確保資料會出現新的檔案中,

aof寫入資料到檔案的流程

/* Write a sequence of commands able to fully rebuild the dataset into
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
 * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
 * are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
    rio aof;
    FILE *fp;
    char tmpfile[256];
    char byte;

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    //tmp file的名字
    //生成一個檔案
    fp = fopen(tmpfile,"w");
    if (!fp) {
        serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return C_ERR;
    }
    //child 用的buf
    server.aof_child_diff = sdsempty();
    //初始化寫入的檔案
    rioInitWithFile(&aof,fp);
    //寫入程序中fsync邏輯
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
    //跟rdb 相同的邏輯通知到相關模塊
    startSaving(RDBFLAGS_AOF_PREAMBLE);

    if (server.aof_use_rdb_preamble) {
        int error;
        //aof_use_rdb_preamble 如果是用rdb的格式,則跟rdb是相同的邏輯
        //唯一不同的最后會呼叫aofReadDiffFromParent這個方法
        if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
            errno = error;
            goto werr;
        }
    } else {
        //同樣的也是遍歷字典然后寫入到檔案
        //格式方面大同小異
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    }

    /* Do an initial slow fsync here while the parent is still sending
     * data, in order to make the next final fsync faster. */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;

    /* Read again a few times to get more data from the parent.
     * We can't read forever (the server may receive data from clients
     * faster than it is able to send data to the child), so we try to read
     * some more data in a loop as soon as there is a good chance more data
     * will come. If it looks like we are wasting time, we abort (this
     * happens after 20 ms without new data). */
    int nodata = 0;
    mstime_t start = mstime();
    //在合理的時機結束回圈,比如1s內沒有新的資料,nodata loop超過20次以上
    while(mstime()-start < 1000 && nodata < 20) {
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
        {
            nodata++;
            continue;
        }
        nodata = 0; /* Start counting from zero, we stop on N *contiguous*
                       timeouts. */
        aofReadDiffFromParent();
    }

    /* Ask the master to stop sending diffs. */
    //發送資料讓父行程不用在發送資料過來
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    //設定nonblock
    if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
        goto werr;
    /* We read the ACK from the server using a 10 seconds timeout. Normally
     * it should reply ASAP, but just in case we lose its reply, we are sure
     * the child will eventually get terminated. */
    //設定超時10s,通常情況下會非常快讀到response
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
        byte != '!') goto werr;
    //parent 同意停止發送diff的資料    
    serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");

    /* Read the final diff if any. */
    //最后一次read 管道里面的資料
    aofReadDiffFromParent();

    /* Write the received diff to the file. */
    serverLog(LL_NOTICE,
        "Concatenating %.2f MB of AOF diff received from parent.",
        (double) sdslen(server.aof_child_diff) / (1024*1024));
    //diff的資料寫入buffer    
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;

    /* Make sure data will not remain on the OS's output buffers */
    //呼叫flush
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    //關閉檔案
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    //重命名檔案
    if (rename(tmpfile,filename) == -1) {
        serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        stopSaving(0);
        return C_ERR;
    }
    serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
    stopSaving(1);
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    stopSaving(0);
    return C_ERR;
}

可以看到基本上流程和rdb的流程一樣,同樣aof也可以使用rdb的format,但是不同的時候在下面有如何去處理父行程發來different 資料的流程,

以上就是子行程aof的處理全程序,

我們再來看看父行程是在哪里接收到信號,以及收到子行程訊息后的后續處理,

父行程的rewrite相關操作

/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    //獲取aof_rewrite_buf_blocks的鏈表尾節點
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        /* If we already got at least an allocated block, try appending
         * at least some piece into it. */
        if (block) {
            //查看buffer 容量是否足夠
            unsigned long thislen = (block->free < len) ? block->free : len;
            if (thislen) {  /* The current block is not already full. */
                //如果空間足夠則復制進來
                memcpy(block->buf+block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }
        //重新分配broken;
        if (len) { /* First block to allocate, or need another block. */
            int numblocks;

            block = zmalloc(sizeof(*block));
            //10 MB per block
            block->free = AOF_RW_BUF_BLOCK_SIZE;
            block->used = 0;
            listAddNodeTail(server.aof_rewrite_buf_blocks,block);

            /* Log every time we cross more 10 or 100 blocks, respectively
             * as a notice or warning. */
            numblocks = listLength(server.aof_rewrite_buf_blocks);
            if (((numblocks+1) % 10) == 0) {
                int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
                                                         LL_NOTICE;
                //超過10個blocks則列印自制                                         
                serverLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }

    /* Install a file event to send data to the rewrite child if there is
     * not one already. */
    //可以看到aof將更新的資料通知到aof是采用事件的方式注冊,等待下一次file的事件的回圈
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}

上面內容我們有提到過在append aof buffer的同時,也會直接注冊rewrite事件到event loop 里面,然后負責傳送到子行程的方法,就是用的aofChildWriteDiffData,

/* Event handler used to send data to the child process doing the AOF
 * rewrite. We send pieces of our AOF differences buffer so that the final
 * write when the child finishes the rewrite will be small. */
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    listNode *ln;
    //通過block這個結構來承載資料
    aofrwblock *block;
    ssize_t nwritten;
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);

    while(1) {
        ln = listFirst(server.aof_rewrite_buf_blocks);
        block = ln ? ln->value : NULL;
        //如果收到了停止信號則不再發送資料
        if (server.aof_stop_sending_diff || !block) {
            //delete 事件
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }
        if (block->used > 0) {
            //寫入資料到子行程管道里面
            nwritten = write(server.aof_pipe_write_data_to_child,
                             block->buf,block->used);
            if (nwritten <= 0) return;
            memmove(block->buf,block->buf+nwritten,block->used-nwritten);
            block->used -= nwritten;
            block->free += nwritten;
        }
        if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
    }
}

上面代碼就是向子行程發送diff資料的主邏輯,

父行程發送停止信號的邏輯

在初始化子行程的時候我們有注意到我們開啟了一個監聽讀的事件

    //注冊父行程讀子行程傳回來的ack
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

/* This event handler is called when the AOF rewriting child sends us a
 * single '!' char to signal we should stop sending buffer diffs. The
 * parent sends a '!' as well to acknowledge. */
//當注冊的管道有可讀事件回應的時候會觸發這個方法
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
    char byte;
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);
    //讀取資料
    if (read(fd,&byte,1) == 1 && byte == '!') {
        serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
        server.aof_stop_sending_diff = 1;
        //寫入feedback的ack
        if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
            /* If we can't send the ack, inform the user, but don't try again
             * since in the other side the children will use a timeout if the
             * kernel can't buffer our write, or, the children was
             * terminated. */
            serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
                strerror(errno));
        }
    }
    /* Remove the handler since this can be called only one time during a
     * rewrite. */
    //洗掉掉該file event
    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}

當子行程回寫事件到這個管道的時候,就會觸發到aofChildPipeReadable,這樣父行程就會發送feedback到子行程,即使沒發送成功,子行程本身也有超時的操作,

父行程rewrite結束邏輯

父行程的結束邏輯同樣也是放在了serverCron這個方法里面,且位置也和rdb在同一個地方,

void checkChildrenDone(void) {
    int statloc;
    pid_t pid;

    /* If we have a diskless rdb child (note that we support only one concurrent
     * child), we want to avoid collecting it's exit status and acting on it
     * as long as we didn't finish to drain the pipe, since then we're at risk
     * of starting a new fork and a new pipe before we're done with the previous
     * one. */
    //檢查是否有rdb 在運行
    if (server.rdb_child_pid != -1 && server.rdb_pipe_conns)
        return;
    //檢查子行程是否已經完成
    if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
        int exitcode = WEXITSTATUS(statloc);
        int bysignal = 0;

        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

        /* sigKillChildHandler catches the signal and calls exit(), but we
         * must make sure not to flag lastbgsave_status, etc incorrectly.
         * We could directly terminate the child process via SIGUSR1
         * without handling it, but in this case Valgrind will log an
         * annoying error. */
        if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {
            bysignal = SIGUSR1;
            exitcode = 1;
        }

        if (pid == -1) {
            serverLog(LL_WARNING,"wait3() returned an error: %s. "
                "rdb_child_pid = %d, aof_child_pid = %d, module_child_pid = %d",
                strerror(errno),
                (int) server.rdb_child_pid,
                (int) server.aof_child_pid,
                (int) server.module_child_pid);
        } else if (pid == server.rdb_child_pid) {
            backgroundSaveDoneHandler(exitcode,bysignal);
            if (!bysignal && exitcode == 0) receiveChildInfo();
        } 
		//aof rewrite的邏輯
		else if (pid == server.aof_child_pid) {
            backgroundRewriteDoneHandler(exitcode,bysignal);
            if (!bysignal && exitcode == 0) receiveChildInfo();
        } else if (pid == server.module_child_pid) {
            ModuleForkDoneHandler(exitcode,bysignal);
            if (!bysignal && exitcode == 0) receiveChildInfo();
        } else {
            if (!ldbRemoveChild(pid)) {
                serverLog(LL_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
        }
        updateDictResizePolicy();
        closeChildInfoPipe();
    }
}

父行程的結束邏輯

/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    if (!bysignal && exitcode == 0) {
        int newfd, oldfd;
        char tmpfile[256];
        long long now = ustime();
        mstime_t latency;

        serverLog(LL_NOTICE,
            "Background AOF rewrite terminated with success");

        /* Flush the differences accumulated by the parent to the
         * rewritten AOF. */
        latencyStartMonitor(latency);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.aof_child_pid);
        //打開新檔案    
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
            serverLog(LL_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
            goto cleanup;
        }
        //將剩下沒有發送給子行程的資料寫入到新檔案
        if (aofRewriteBufferWrite(newfd) == -1) {
            serverLog(LL_WARNING,
                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            close(newfd);
            goto cleanup;
        }
        //記錄延遲
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);

        serverLog(LL_NOTICE,
            "Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));

        /* The only remaining thing to do is to rename the temporary file to
         * the configured file and switch the file descriptor used to do AOF
         * writes. We don't want close(2) or rename(2) calls to block the
         * server on old file deletion.
         *
         * There are two possible scenarios:
         *
         * 1) AOF is DISABLED and this was a one time rewrite. The temporary
         * file will be renamed to the configured file. When this file already
         * exists, it will be unlinked, which may block the server.
         *
         * 2) AOF is ENABLED and the rewritten AOF will immediately start
         * receiving writes. After the temporary file is renamed to the
         * configured file, the original AOF file descriptor will be closed.
         * Since this will be the last reference to that file, closing it
         * causes the underlying file to be unlinked, which may block the
         * server.
         *
         * To mitigate the blocking effect of the unlink operation (either
         * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
         * use a background thread to take care of this. First, we
         * make scenario 1 identical to scenario 2 by opening the target file
         * when it exists. The unlink operation after the rename(2) will then
         * be executed upon calling close(2) for its descriptor. Everything to
         * guarantee atomicity for this switch has already happened by then, so
         * we don't care what the outcome or duration of that close operation
         * is, as long as the file descriptor is released again. */
        //如果aof是disabled,
        if (server.aof_fd == -1) {
            /* AOF disabled */

            /* Don't care if this fails: oldfd will be -1 and we handle that.
             * One notable case of -1 return is if the old file does
             * not exist. */
            //舊的檔案設定為只讀,不阻塞
            oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
        } else {
            /* AOF enabled */
            oldfd = -1; /* We'll set this to the current AOF filedes later. */
        }

        /* Rename the temporary file. This will not unlink the target file if
         * it exists, because we reference it with "oldfd". */
        latencyStartMonitor(latency);
        //如果rename失敗
        if (rename(tmpfile,server.aof_filename) == -1) {
            serverLog(LL_WARNING,
                "Error trying to rename the temporary AOF file %s into %s: %s",
                tmpfile,
                server.aof_filename,
                strerror(errno));
            close(newfd);
            if (oldfd != -1) close(oldfd);
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rename",latency);

        if (server.aof_fd == -1) {
            //aof disable 就無需要關心aof的檔案
            /* AOF disabled, we don't need to set the AOF file descriptor
             * to this new file, so we can close it. */
            close(newfd);
        } else {
            /* AOF enabled, replace the old fd with the new one. */
            //設定aof的檔案描述符
            oldfd = server.aof_fd;
            server.aof_fd = newfd;
            //如果是always 執行sync 邏輯
            if (server.aof_fsync == AOF_FSYNC_ALWAYS)
                redis_fsync(newfd);
            else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
                aof_background_fsync(newfd);
            server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
            //更新aof的size
            aofUpdateCurrentSize();
            //更新basesize fsync的邏輯
            server.aof_rewrite_base_size = server.aof_current_size;
            server.aof_fsync_offset = server.aof_current_size;

            /* Clear regular AOF buffer since its contents was just written to
             * the new AOF from the background rewrite buffer. */
            //清除aof_buf未處理完的資料
            sdsfree(server.aof_buf);
            server.aof_buf = sdsempty();
        }
        //設定status
        server.aof_lastbgrewrite_status = C_OK;

        serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
        /* Change state from WAIT_REWRITE to ON if needed */
        //aof state 狀態改變
        if (server.aof_state == AOF_WAIT_REWRITE)
            server.aof_state = AOF_ON;

        /* Asynchronously close the overwritten AOF. */
        //關閉舊的fd檔案通過backgroud
        if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);

        serverLog(LL_VERBOSE,
            "Background AOF rewrite signal handler took %lldus", ustime()-now);
    } else if (!bysignal && exitcode != 0) {
        server.aof_lastbgrewrite_status = C_ERR;

        serverLog(LL_WARNING,
            "Background AOF rewrite terminated with error");
    } else {
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * tirggering an error condition. */
        if (bysignal != SIGUSR1)
            server.aof_lastbgrewrite_status = C_ERR;

        serverLog(LL_WARNING,
            "Background AOF rewrite terminated by signal %d", bysignal);
    }

cleanup:
    //關閉通道
    aofClosePipes();
    //rewrite buffer 重置
    aofRewriteBufferReset();
    //刪掉臨時檔案
    aofRemoveTempFile(server.aof_child_pid);
    server.aof_child_pid = -1;
    //更新rewrite last
    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
    server.aof_rewrite_time_start = -1;
    /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
    if (server.aof_state == AOF_WAIT_REWRITE)
        server.aof_rewrite_scheduled = 1;
}

以上的aof相關的所有流程總結,雖然涉及的代碼很多,但是邏輯相對來說還是比較清晰,

總結

redis相關的持久化流程我們已經講完了,下面的章節會開始探索redis 高可用和cluster的相關流程,有需要的同學歡迎收藏加點贊

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/238107.html

標籤:其他

上一篇:個人總結(二)

下一篇:C語言資料結構

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more