A Brief Analysis of Redis Persistence

2022-07-24 08:51:40 Database

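Redis is an in-memory database: all data lives in memory, so operations run directly on in-memory data structures and are very fast. The downside is that once the server goes down, everything in memory is lost and cannot be recovered. Redis therefore has its own persistence mechanisms. Note that this persistence differs from that of an ordinary database: the persistence file must be read into memory in full before it can be used, rather than loaded on demand, and later modifications are then written back to disk.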

Redis has two persistence mechanisms: AOF (Append Only File) and RDB (Redis Database).

1. Global entry point for persistence

The analysis below is based on the Redis 5.0 source code. The entry point is in server.c, where the main function calls the server initialization:

// https://github.com/redis/redis/blob/5.0/src/server.c

void initServer(void) {
    // ...
    server.hz = server.config_hz;
    // ...
    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    // Register the timer event callback
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    
    // ...
}

int main(int argc, char **argv) {
    // ...
    server.supervised = redisIsSupervised(server.supervised_mode);
    int background = server.daemonize && !server.supervised;
    if (background) daemonize();

    // Server initialization
    initServer();
    if (background || server.pidfile) createPidFile();
    redisSetProcTitle(argv[0]);
    redisAsciiArt();
    checkTcpBacklogSettings();
    
    // ...
    
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}

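main first calls initServer() to initialize the server, which covers signal handling, DB initialization, registering the various callbacks, and so on. Among these is the serverCron callback, which drives the persistence-related work. How often it runs depends on server.config_hz, which is set in redis.conf: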

# Redis calls an internal function to perform many background tasks, like
# closing connections of clients in timeout, purging expired keys that are
# never requested, and so forth.
#
# Not all tasks are performed with the same frequency, but Redis checks for
# tasks to perform according to the specified "hz" value.
#
# By default "hz" is set to 10. Raising the value will use more CPU when
# Redis is idle, but at the same time will make Redis more responsive when
# there are many keys expiring at the same time, and timeouts may be
# handled with more precision.
#
# The range is between 1 and 500, however a value over 100 is usually not
# a good idea. Most users should use the default of 10 and raise this up to
# 100 only in environments where very low latency is required.
hz 10

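The default is 10, meaning the background tasks run 10 times per second, i.e. once every 100ms. Raising the value increases CPU usage when Redis is idle. If lower latency is needed, the parameter can be raised moderately, but it should not exceed 100. The range of hz is limited to [1, 500].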
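To make the arithmetic concrete, the following sketch converts an hz value into the interval between two cron runs, using the same integer division as the `return 1000/server.hz;` at the end of serverCron. The helper name cron_interval_ms is an assumption for illustration and is not a Redis function.

```c
#include <assert.h>

/* Hypothetical helper: milliseconds between two serverCron runs for a
 * given hz, mirroring `return 1000/server.hz;` in server.c. */
int cron_interval_ms(int hz) {
    assert(hz >= 1 && hz <= 500);   /* range documented in redis.conf */
    return 1000 / hz;               /* integer division, as in Redis */
}
```

With the default hz of 10 this gives 100ms per tick; at the maximum of 500 it gives 2ms.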

The event loop itself is wrapped by a dedicated asynchronous library; the aeCreateTimeEvent and aeMain functions called above are both implemented in ae.c:

// https://github.com/redis/redis/blob/5.0/src/ae.c

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

// ...
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            // Callback defined in server.c; runs preparatory work before waiting on file descriptors
            eventLoop->beforesleep(eventLoop);
        // Process events
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
// ...

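Redis loops in aeMain, processing events continuously. The underlying multiplexing backends are evport, epoll, kqueue, and select: evport belongs to Solaris 10, epoll to Linux, and kqueue to BSD and OS X, with select as the last resort. evport, epoll, and kqueue are O(1), whereas select scans the descriptors and is O(n).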

Now back to server.c for a closer look at the logic of serverCron:

// Static definitions in server.h
#define CONFIG_DEFAULT_DYNAMIC_HZ 1             /* Adapt hz to # of clients.*/
#define CONFIG_DEFAULT_HZ        10             /* Time interrupt calls/sec. */
#define CONFIG_MIN_HZ            1
#define CONFIG_MAX_HZ            500
#define MAX_CLIENTS_PER_CLOCK_TICK 200          /* HZ is adapted based on that. */


#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
// ...

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    server.hz = server.config_hz;
    /* Adapt the server.hz value to the number of configured clients. If we have
     * many clients, we want to call serverCron() with an higher frequency. */
    // Dynamically adjust the task processing frequency
    if (server.dynamic_hz) {
        while (listLength(server.clients) / server.hz >
               MAX_CLIENTS_PER_CLOCK_TICK)
        {
            server.hz *= 2;
            if (server.hz > CONFIG_MAX_HZ) {
                server.hz = CONFIG_MAX_HZ;
                break;
            }
        }
    }
    
    // ...
    /* We need to do a few operations on clients asynchronously. */
    clientsCron();

    /* Handle background operations on Redis databases. */
    databasesCron();

    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        int statloc;
        pid_t pid;

        // WNOHANG: non-blocking. (WUNTRACED, not used here, would also report children stopped by SIGTTIN, SIGTTOU, SIGTSTP, or SIGSTOP.)
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == -1) {
                serverLog(LL_WARNING,"wait3() returned an error: %s. "
                    "rdb_child_pid = %d, aof_child_pid = %d",
                    strerror(errno),
                    (int) server.rdb_child_pid,
                    (int) server.aof_child_pid);
            } else if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else {
                if (!ldbRemoveChild(pid)) {
                    serverLog(LL_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
            }
            updateDictResizePolicy();
            closeChildInfoPipe();
        }
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now. */
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            /* Save if we reached the given amount of changes,
             * the given amount of seconds, and if the latest bgsave was
             * successful or if, in case of an error, at least
             * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
        }

        /* Trigger an AOF rewrite if needed. */
        if (server.aof_state == AOF_ON &&
            server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            long long base = server.aof_rewrite_base_size ?
                server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }


    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * an higher frequency. */
    run_with_period(1000) {
        if (server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    }
    
    // ...
    server.cronloops++;
    return 1000/server.hz;
}

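A few key points need explanation:

  1. The dynamic frequency adjustment at the top works as follows. When dynamic-hz is enabled in the configuration file (it is enabled by default), the task frequency is adjusted automatically according to the number of clients. If the number of clients divided by the current hz exceeds MAX_CLIENTS_PER_CLOCK_TICK, the effective frequency is doubled, and this repeats until the while condition no longer holds or the value exceeds CONFIG_MAX_HZ. MAX_CLIENTS_PER_CLOCK_TICK is 200 and CONFIG_MAX_HZ is 500. The number of clients divided by the current frequency is the number of clients each tick has to handle. If there are so many clients that one tick would have to handle more than 200 of them, responses become too slow, so the frequency is doubled. A frequency above 500 would in turn cost too much CPU, so it is capped at 500. This keeps client handling timely.
  2. If the user requested an AOF rewrite (while a BGSAVE was in progress) and no AOF or RDB child process is running now, the rewrite task is started.
  3. Next comes the core persistence logic. If a persistence task is currently running, or there are pending Lua debugger (ldb) child processes, the child's status is collected so that its resources can be reclaimed. Otherwise Redis checks whether the persistence conditions are met and, if so, runs persistence in the background.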
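The frequency adjustment from point 1 can be tried in isolation. The sketch below copies the loop from serverCron into a standalone function; the function name effective_hz and its parameters are assumptions for illustration, while the two constants use the values from server.h.

```c
#include <assert.h>

#define MAX_CLIENTS_PER_CLOCK_TICK 200  /* same value as server.h */
#define CONFIG_MAX_HZ 500               /* same value as server.h */

/* Hypothetical standalone version of the dynamic-hz loop in serverCron().
 * Doubles hz until one tick handles at most 200 clients, capped at 500. */
int effective_hz(int config_hz, unsigned long clients) {
    int hz = config_hz;
    assert(hz >= 1);
    while (clients / hz > MAX_CLIENTS_PER_CLOCK_TICK) {
        hz *= 2;
        if (hz > CONFIG_MAX_HZ) {
            hz = CONFIG_MAX_HZ;
            break;
        }
    }
    return hz;
}
```

For example, with hz 10 and 5000 clients the loop goes 10, 20, 40 and stops at 40, because 5000 / 40 = 125 is no longer above 200.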

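Focusing on the persistence check: besides the child's status, wait3 can also return the child's resource usage through a pointer to an rusage struct (NULL is passed here, so that information is discarded). WNOHANG stands for "wait, no hang": the main process does not block waiting for a child but returns immediately. If all children are still running, wait3 returns 0 and the logic above is skipped. A nonzero return value means a child has finished. When a child exits while the parent is still alive and has not explicitly called one of the wait functions, the child stays in the defunct state as a zombie process; Redis collects its status when it enters this code again on the next cron cycle. If the returned pid matches the pid of the RDB or AOF child, the corresponding cleanup runs, namely backgroundSaveDoneHandler or backgroundRewriteDoneHandler, which mainly update state. Finally updateDictResizePolicy is called to re-enable hash table resizing (rehash).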
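The non-blocking polling pattern can be sketched outside Redis as follows. This is a minimal demo under stated assumptions: it uses POSIX waitpid rather than wait3, the function name poll_child_exit is made up for illustration, and a 10ms sleep stands in for the gap between cron ticks.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

/* Hypothetical demo: fork a child that exits with `code`, then poll for it
 * without blocking, the way serverCron polls once per tick.
 * Returns the child's exit code, or -1 on error. `polls` counts how many
 * times waitpid() was called. */
int poll_child_exit(int code, int *polls) {
    pid_t child = fork();
    if (child == -1) return -1;
    if (child == 0) _exit(code);          /* child: finish immediately */

    *polls = 0;
    for (;;) {
        int status;
        pid_t pid = waitpid(child, &status, WNOHANG);
        (*polls)++;
        if (pid == 0) {                   /* child still running */
            struct timespec tick = {0, 10 * 1000 * 1000}; /* 10ms "tick" */
            nanosleep(&tick, NULL);
            continue;
        }
        if (pid == -1) return -1;         /* waitpid failed */
        if (WIFEXITED(status)) return WEXITSTATUS(status);
        return -1;                        /* terminated by a signal */
    }
}
```

Between the child's exit and the successful waitpid call the child is a zombie, which is the short-lived defunct state described above.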

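Conversely, if no persistence task is running, the else branch is taken and the configured save parameters are traversed. If both the change count and the time limit of any save point are satisfied, RDB persistence runs first. Then, if AOF is enabled, no other child is running, and the current file size exceeds the minimum rewrite threshold, an AOF rewrite may be triggered. The minimum is set by auto-aof-rewrite-min-size in the configuration file and defaults to 64MB. Meeting it is not sufficient: the current size is also compared with the base size, and the rewrite is triggered only when the file has grown by at least the configured percentage (100% by default, i.e. twice the base size). The base size is set to the size of the AOF file when the Redis server starts and is updated to the post-rewrite size after every rewrite; see the backgroundRewriteDoneHandler and loadAppendOnlyFile functions in aof.c for details.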

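Finally, run_with_period above means "run once every so many cycles"; see the macro defined in server.h. If the given period is no longer than one cycle, the body runs on every cycle. Otherwise the period is divided by the cycle time to get a number of cycles N, and the body runs whenever the loop counter modulo N is 0, i.e. once every N cycles. The counting is done with the cronloops variable.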
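The macro's condition is easier to check when written as a function. The sketch below reproduces the same expression against a small struct that stands in for server.hz and server.cronloops; the struct and both function names are assumptions for illustration.

```c
#include <assert.h>

/* Hypothetical stand-ins for server.hz and server.cronloops. */
struct cron_state { int hz; int cronloops; };

/* Same condition as the run_with_period macro in server.h, written as a
 * function so it can be tested in isolation. */
int period_due(const struct cron_state *s, int ms) {
    int tick_ms = 1000 / s->hz;
    return ms <= tick_ms || (s->cronloops % (ms / tick_ms)) == 0;
}

/* Count how many of `ticks` consecutive cron ticks would run the body. */
int count_runs(int hz, int ms, int ticks) {
    struct cron_state s = { hz, 0 };
    int runs = 0;
    for (int i = 0; i < ticks; i++) {
        if (period_due(&s, ms)) runs++;
        s.cronloops++;   /* serverCron increments this once per run */
    }
    return runs;
}
```

With hz 10, a period of 1000ms corresponds to every 10th tick, so 100 ticks (10 seconds) run the body 10 times.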

With that, we can move on to the persistence logic itself. The following sections go through the RDB and AOF flows.

2. RDB persistence

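RDB is Redis's default persistence method, usually called a memory snapshot: a record of the in-memory data at a given moment. Performing RDB persistence means writing the current in-memory data to disk, and when Redis restarts it restores the data from the snapshot. RDB is a compact storage format, so writing and restoring are both fast. However, every snapshot writes the full dataset, so the larger the data, the larger the RDB file and the higher the disk write cost. The parameters should therefore be tuned so that persistence runs at appropriate times and not too frequently.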

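In addition, Redis executes commands on a single thread, so running persistence on the main thread would block it. Automatic persistence is therefore done by forking a child process, which does not affect the main process. Redis also provides two commands for manual persistence, save and bgsave. save runs synchronously and blocks all other operations, so it is rarely used; it may be needed in extreme cases, for example to save data when the Linux system has run out of processes. bgsave is used more often; it does the same thing as the automatic background RDB persistence, only triggered manually.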

The RDB-related configuration is as follows:

save 900 1
save 300 10
save 60 10000

stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir ./

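The save directives determine when persistence happens. With the defaults, a snapshot is taken after 900s if at least 1 key changed, after 300s if at least 10 keys changed, or after 60s if at least 10000 keys changed. The conditions are ORed: any single one triggers a snapshot. To disable RDB persistence, comment out all the directives or configure an empty value: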

save ""

This disables RDB persistence.
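The way the save points are evaluated can be sketched as a small function that mirrors the comparison in serverCron (`dirty >= changes` and elapsed time greater than `seconds`). The struct and function names are assumptions for illustration, and the retry delay that Redis applies after a failed bgsave is deliberately left out.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical mirror of one `save <seconds> <changes>` line. */
struct save_point { long seconds; int changes; };

/* Returns 1 if any save point is satisfied. `dirty` is the number of
 * changes since the last save and `elapsed` the seconds since then.
 * The retry-delay check after a failed bgsave is left out. */
int should_bgsave(const struct save_point *sp, size_t n,
                  long long dirty, long elapsed) {
    for (size_t i = 0; i < n; i++) {
        if (dirty >= sp[i].changes && elapsed > sp[i].seconds)
            return 1;   /* conditions are ORed: first match wins */
    }
    return 0;
}
```

Passing zero save points models the `save ""` configuration: the loop never runs and no automatic snapshot is triggered.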

RDB persistence is implemented in the rdbSaveBackground function; the source is roughly as follows:

int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;

    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);
    openChildInfoPipe();

    start = ustime();
    if ((childpid = fork()) == 0) {
        int retval;

        /* Child */
        closeClildUnusedResourceAfterFork();
        redisSetProcTitle("redis-rdb-bgsave");
        retval = rdbSave(filename,rsi);
        if (retval == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);

            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }

            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_RDB);
        }
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (childpid == -1) {
            closeChildInfoPipe();
            server.lastbgsave_status = C_ERR;
            serverLog(LL_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return C_ERR;
        }
        serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        updateDictResizePolicy();
        return C_OK;
    }
    return C_OK; /* unreached */
}


void updateDictResizePolicy(void) {
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
        dictEnableResize();
    else
        dictDisableResize();
}

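On entering rdbSaveBackground, fork is called first to create a child process that performs the persistence. The parent mainly calls updateDictResizePolicy to disable rehashing of the global hash tables and then returns. The child changes its process title to redis-rdb-bgsave and enters rdbSave: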

// server.h
#define REDIS_AUTOSYNC_BYTES (1024*1024*32) /* fdatasync every 32MB */

/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename, rdbSaveInfo *rsi) {
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    FILE *fp;
    rio rdb;
    int error = 0;

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Failed opening the RDB file %s (in server root dir %s) "
            "for saving: %s",
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        return C_ERR;
    }

    rioInitWithFile(&rdb,fp);

    if (server.rdb_save_incremental_fsync)
        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);

    if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
        errno = error;
        goto werr;
    }

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Error moving temp DB file %s on the final "
            "destination %s (in server root dir %s): %s",
            tmpfile,
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        unlink(tmpfile);
        return C_ERR;
    }

    serverLog(LL_NOTICE,"DB saved on disk");
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = C_OK;
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    return C_ERR;
}

// https://github.com/redis/redis/blob/5.0/src/rio.c
void rioInitWithFile(rio *r, FILE *fp) {
    *r = rioFileIO;
    r->io.file.fp = fp;
    r->io.file.buffered = 0;
    r->io.file.autosync = 0;
}

// Enable automatic fsync after every `bytes` bytes written
void rioSetAutoSync(rio *r, off_t bytes) {
    serverAssert(r->read == rioFileIO.read);
    r->io.file.autosync = bytes;
}

/* Returns 1 or 0 for success/failure. */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
    size_t retval;

    retval = fwrite(buf,len,1,r->io.file.fp);
    r->io.file.buffered += len;

    if (r->io.file.autosync &&
        r->io.file.buffered >= r->io.file.autosync)
    {
        fflush(r->io.file.fp);
        redis_fsync(fileno(r->io.file.fp));
        r->io.file.buffered = 0;
    }
    return retval;
}

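The Redis child first creates a temporary file named temp-<pid>.rdb and then calls rioInitWithFile to initialize rio, Redis's own I/O abstraction. If rdb-save-incremental-fsync is enabled in redis.conf (it is by default), automatic flushing is turned on; the number of bytes per flush is defined by the REDIS_AUTOSYNC_BYTES macro as 32MB. In rioFileWrite, once the number of bytes written reaches the autosync value, a flush is performed to push the data to disk. By default, written data first lands in the operating system's cache, and the moment it reaches the disk is not deterministic. Enabling incremental flushing improves durability and also avoids the cost of one large flush at the end.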
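The incremental flushing idea can be sketched as a miniature writer that counts bytes and calls fsync once a threshold is reached. The struct, its fields, and the function name are assumptions for illustration; only the overall shape follows rioFileWrite, and this sketch additionally checks the return values of fflush and fsync.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical miniature of rio's file backend. */
struct sync_writer {
    FILE *fp;
    size_t buffered;   /* bytes written since the last fsync */
    size_t autosync;   /* fsync threshold in bytes, 0 = disabled */
    int syncs;         /* number of fsync calls, for inspection */
};

/* Write `len` bytes (len > 0); fsync once `autosync` bytes have piled up.
 * Returns 1 on success and 0 on failure, like rioFileWrite. */
int sync_writer_write(struct sync_writer *w, const void *buf, size_t len) {
    if (fwrite(buf, len, 1, w->fp) != 1) return 0;
    w->buffered += len;
    if (w->autosync && w->buffered >= w->autosync) {
        if (fflush(w->fp) == EOF) return 0;        /* stdio -> kernel */
        if (fsync(fileno(w->fp)) == -1) return 0;  /* kernel -> disk */
        w->buffered = 0;
        w->syncs++;
    }
    return 1;
}
```

With a threshold of 8 bytes and four 5-byte writes, fsync is issued after the second and fourth write, so the data reaches the disk in small steps instead of one large flush at the end.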

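rdbSaveRio is then called to do the actual data persistence, and once it finishes the temporary file is renamed to dump.rdb. The rename call is atomic. Also note how Redis handles errors: the resource cleanup is placed under a label, werr, and on any error the code jumps there with goto. This is a common error-handling pattern in C.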
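The two techniques named here, writing to a temporary file followed by an atomic rename and a single goto cleanup label, can be sketched together. The function atomic_write_file and the temporary file name pattern are assumptions for illustration, not Redis code, and the sketch assumes the temporary file and the target are on the same file system.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical helper: write `len` bytes to `path` so that readers see
 * either the old file or the complete new one, never a partial write.
 * Returns 0 on success, -1 on failure. */
int atomic_write_file(const char *path, const void *data, size_t len) {
    char tmp[256];
    FILE *fp;

    snprintf(tmp, sizeof(tmp), "temp-%d.tmp", (int)getpid());
    fp = fopen(tmp, "w");
    if (!fp) return -1;

    if (len && fwrite(data, len, 1, fp) != 1) goto werr;
    if (fflush(fp) == EOF) goto werr;          /* stdio buffer -> kernel */
    if (fsync(fileno(fp)) == -1) goto werr;    /* kernel cache -> disk */
    if (fclose(fp) == EOF) { fp = NULL; goto werr; }
    fp = NULL;

    if (rename(tmp, path) == -1) goto werr;    /* atomic replace */
    return 0;

werr:                                          /* single cleanup path */
    if (fp) fclose(fp);
    unlink(tmp);
    return -1;
}
```

Every failure path ends at the same label, so the temporary file is removed no matter which step failed.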

Next, a quick look at what rdbSaveRio does:

// rdb.h
#define RDB_VERSION 9
#define RDB_SAVE_NONE 0

int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    int j;
    uint64_t cksum;
    size_t processed = 0;

    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
    if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;

    for (j = 0; j < server.dbnum; j++) {
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);

        /* Write the SELECT DB opcode */
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
        if (rdbSaveLen(rdb,j) == -1) goto werr;

        /* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which
         * is currently the largest type we are able to represent in RDB sizes.
         * However this does not limit the actual size of the DB to load since
         * these sizes are just hints to resize the hash tables. */
        uint64_t db_size, expires_size;
        db_size = dictSize(db->dict);
        expires_size = dictSize(db->expires);
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;

            /* When this RDB is produced as part of an AOF rewrite, move
             * accumulated diff from parent to child while rewriting in
             * order to have a smaller final write. */
            if (flags & RDB_SAVE_AOF_PREAMBLE &&
                rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
            {
                processed = rdb->processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }

    /* If we are storing the replication information on disk, persist
     * the script cache as well: on successful PSYNC after a restart, we need
     * to be able to process any EVALSHA inside the replication backlog the
     * master will send us. */
    if (rsi && dictSize(server.lua_scripts)) {
        di = dictGetIterator(server.lua_scripts);
        while((de = dictNext(di)) != NULL) {
            robj *body = dictGetVal(de);
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                goto werr;
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }

    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;

    /* EOF opcode */
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the
     * loading code skips the check in this case. */
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return C_ERR;
}

/* Save a few default AUX fields with information about the RDB generated. */
int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
    int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
    int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;

    /* Add a few fields about the state when the RDB was created. */
    if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
    if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
    if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
    if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;

    /* Handle saving options that generate aux fields. */
    if (rsi) {
        if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
            == -1) return -1;
        if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)
            == -1) return -1;
        if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
            == -1) return -1;
    }
    if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
    return 1;
}

static int rdbWriteRaw(rio *rdb, void *p, size_t len) {
    if (rdb && rioWrite(rdb,p,len) == 0)
        return -1;
    return len;
}


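The header is written first as the string REDIS followed by <RDB_VERSION>; RDB_VERSION is currently 9, so REDIS0009 is written. Then rdbSaveInfoAuxFields writes some auxiliary information, such as the Redis version, the word size, the current time, and memory usage. Some of this header information is visible when inspecting the rdb file: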

head -c 128 dump.rdb

[Image: output of the command above, showing the header of dump.rdb]
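The header format can be reproduced with the same snprintf call that rdbSaveRio uses. In the sketch below, rdb_make_magic follows that call, while rdb_parse_version is a hypothetical counterpart added for illustration and is not taken from the Redis loader.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Build the 9-byte RDB magic, e.g. "REDIS0009", into `out`. */
void rdb_make_magic(int version, char out[10]) {
    snprintf(out, 10, "REDIS%04d", version);
}

/* Hypothetical parser: return the version encoded in a 9-byte header,
 * or -1 if the buffer does not start with a valid magic. */
int rdb_parse_version(const char *buf, size_t len) {
    char digits[5];
    if (len < 9 || memcmp(buf, "REDIS", 5) != 0) return -1;
    memcpy(digits, buf + 5, 4);
    digits[4] = '\0';
    for (int i = 0; i < 4; i++)
        if (digits[i] < '0' || digits[i] > '9') return -1;
    return atoi(digits);
}
```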

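Further down, each database is traversed: an iterator over its global hash table is obtained and looped over, and rdbSaveKeyValuePair writes each key and value to the file. The actual value serialization happens in rdbSaveObject, which handles every type and converts it to a byte sequence for writing.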

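At the end of the function, the checksum is written to the tail of the file. The write is then complete, and the child returns and waits to be reaped by the main process.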

That is the rough RDB flow. To summarize:

[Figure: redis-rdb, overview of the RDB persistence flow]

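The main issue with RDB persistence is how data modifications are handled while the snapshot is being generated. A snapshot is the state at one moment, so the data being snapshotted should not change during the snapshot. Yet the Redis main thread keeps accepting requests and updating data, and those updates must be invisible to the snapshot child so that they do not affect the snapshot. The operating system happens to solve this. The bgsave process is forked from the main process, so it initially shares the main process's memory pages through a copy of its page tables; this is an OS optimization that makes fork fast, because no full memory copy is needed. A read in the main process simply reads the data and does not affect the child. A write in the main process causes the affected page to be copied first; the main process's mapping is automatically pointed at the copy, and the write is applied there. The bgsave child still reads the original memory, so the snapshot is unaffected. This is the copy-on-write technique: writes are handled normally while the snapshot is being taken. When the child finishes, the memory that is no longer referenced is released.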
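The visible effect of copy-on-write, a child that keeps seeing the data as it was at fork time, can be observed with a few lines of POSIX C. This is only a demo of the semantics, not of Redis itself: the function name is made up, a single int stands in for the dataset, and the child reports what it sees through its exit code.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical demo: the parent changes `value` after fork(); the child
 * reports the value it still sees through its exit code.
 * Returns that value, or -1 on error. */
int snapshot_value_seen_by_child(void) {
    volatile int value = 1;        /* the "dataset" at snapshot time */
    int go[2];                     /* pipe: parent tells child when to read */
    if (pipe(go) == -1) return -1;

    pid_t child = fork();
    if (child == -1) return -1;
    if (child == 0) {
        char c;
        close(go[1]);
        /* Wait until the parent has finished its write. */
        if (read(go[0], &c, 1) != 1) _exit(255);
        _exit(value);              /* still the pre-fork value */
    }

    close(go[0]);
    value = 2;                     /* parent keeps serving writes */
    if (write(go[1], "x", 1) != 1) { close(go[1]); return -1; }
    close(go[1]);

    int status;
    if (waitpid(child, &status, 0) == -1) return -1;
    return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

The pipe only orders the two processes: the child reads its copy strictly after the parent's write, and it still gets 1.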

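Because data keeps changing between snapshots, if the server crashes after the data changed but before the next snapshot ran, those changes are still lost. On the other hand, taking full snapshots frequently puts heavy pressure on the disk. In the extreme case, if writes are very frequent during persistence, the memory of the main process and the child may diverge completely, and peak memory usage can reach twice the original. In production, the automatic snapshot frequency should be tuned to balance performance and reliability. Redis also guards against back-to-back full snapshots: the code does not allow a second bgsave process to start while a bgsave child is running.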

2. AOF

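AOF persistence is a log-like mechanism: every executed write command is written to a log, and recovery replays the commands. This resembles the usual WAL (Write Ahead Log), in which data is recorded in the log before it is actually written so that it can be recovered automatically after a failure, which guarantees transactional writes. AOF differs in that the timing is reversed; it could be called a "write-after" log. Redis executes the command first, completing the operation on the in-memory data structures, and only then writes the command to the log. Why does Redis do this? Parsing and validating statements adds overhead, so Redis does not check a command's correctness when writing it to the aof file. If the log were written first, an invalid command could be recorded. With execute-then-log, a command that fails during execution is known to be invalid and is never recorded, which avoids both the validation overhead and the logging of bad commands. In addition, since logging happens after execution, it does not block the execution of the current command: the client only waits for the in-memory operation to finish before moving on, which also helps performance.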

The written format looks roughly like this:

[Image: sample AOF file contents]

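*3 means the command has 3 parts (the command name plus its arguments), and $3 gives the byte length of the command name or argument that follows, and so on.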
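As an illustration of this format, the sketch below serializes a command the same way: a `*<count>` line, then a `$<length>` line and the bytes for each part, all terminated by CRLF. The function resp_encode is a hypothetical helper, and because it relies on C strings it is not binary-safe, unlike the real protocol.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical encoder: serialize argv[0..argc-1] in the format the AOF
 * uses (*<argc>, then $<len> and the bytes for each part).
 * Returns the number of bytes written, or -1 if `out` is too small. */
int resp_encode(int argc, const char **argv, char *out, size_t cap) {
    size_t used = 0;
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    used += (size_t)n;
    for (int i = 0; i < argc; i++) {
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Encoding SET hello 1 yields `*3`, `$3`, `SET`, `$5`, `hello`, `$1`, `1`, each followed by CRLF.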

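AOF has its own problems, the most obvious being data loss. For example, if the server crashes right after a command is executed but before it is logged, the modification already applied to the data is lost, which introduces a risk of inconsistency. That is acceptable for a cache but falls short of what a database requires. Also, although the write-after approach does not block the current command, as mentioned above, with frequent operations or high concurrency the following can happen: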

client command -> in-memory operation done -> write AOF (blocks other clients' operations) -> client command -> ...

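Since Redis is single-threaded, the first command is unaffected, but if the next one arrives very soon, or other clients issue commands, they still have to wait for the AOF write. Log writes are normally fast, so this is not a problem, but when the disk is under heavy pressure and writes are slow, client operations stall as well.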

The default AOF persistence configuration is:

appendonly no
appendfilename "appendonly.aof"

# appendfsync always
appendfsync everysec
# appendfsync no

# Skip fsync while a rewrite is in progress (no = keep calling fsync)
no-appendfsync-on-rewrite no

# Rewrite conditions
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

aof-load-truncated yes

# Whether to enable hybrid persistence
aof-use-rdb-preamble yes

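The appendonly option is off by default; AOF persistence runs only when it is turned on. The file name is set by appendfilename.

The write-back policy is set by appendfsync, with the following meanings:

  1. always: synchronous write-back. As soon as each write command completes, the log is synchronously written to the aof file on disk.

  2. everysec: write back every second. After a command completes, the log is first written to the aof file's in-memory buffer, and the buffer is flushed to disk once per second.

  3. no: Redis does not actively write back and leaves it to the operating system. Each log entry is only written to the aof file's in-memory buffer, and the OS decides when to write the buffer to disk.

Each of the three policies has trade-offs. always loses almost no data, but every command requires a disk write, which slows down the commands that follow on the main thread; it is the slowest and the safest. no performs best because each command only writes to the buffer, but flushing is entirely up to the operating system, so a crash may lose a fair amount of data. everysec, the default, is a compromise between the two in both speed and safety: flushing once per second improves performance to a degree, and the data lost in a crash is limited to about 1s. The three modes compare as follows: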

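appendfsync | Meaning                 | Pros                                     | Cons
------------+-------------------------+------------------------------------------+-----------------------------------
always      | Synchronous write-back  | Highest reliability, almost no data loss | Low performance, high overhead
everysec    | Write back every second | Good performance, moderate reliability   | Up to about 1s of data can be lost
no          | No active flushing      | High performance                         | Low reliability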

Choose the policy according to the actual scenario; if unsure, keep the default.
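The difference between the three policies comes down to when fsync is issued. The sketch below is a simplified decision helper under stated assumptions: the enum and function names are invented for illustration, time is tracked in whole seconds, and details of the real implementation (such as where the fsync actually runs) are not modeled.

```c
#include <assert.h>
#include <time.h>

enum fsync_policy { FSYNC_NO, FSYNC_ALWAYS, FSYNC_EVERYSEC };

/* Hypothetical decision helper: should fsync be issued now?
 * `now` and `last_fsync` are UNIX times in seconds. */
int should_fsync(enum fsync_policy policy, time_t now, time_t last_fsync) {
    switch (policy) {
    case FSYNC_ALWAYS:   return 1;                 /* after every write */
    case FSYNC_EVERYSEC: return now > last_fsync;  /* at most once per second */
    case FSYNC_NO:       return 0;                 /* leave it to the OS */
    }
    return 0;
}
```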

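The auto-aof-rewrite-percentage and auto-aof-rewrite-min-size parameters define the conditions for an AOF rewrite. Why rewrite the AOF at all? The main reasons are:

  1. The single file grows too large. The aof is one file, and as Redis keeps executing commands it can easily reach billions or even more than ten billion commands, so the file becomes very large and efficiency gradually degrades.
  2. When the Redis server restarts, all commands must be re-executed in order. If the file is too large, there are too many commands to execute and recovery becomes very slow.

Given these problems, especially the second, the aof file cannot be allowed to grow without bound, hence the AOF rewrite mechanism. The idea is simple: Redis creates a new file from the data that actually exists now and replaces the old file with it. Suppose the original operations were: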

set hello 1
set hello 2
incr hello

hset program java spring
hset program python flask
hset program golang gin
hset program python tornado
hdel program java

Some keys were modified several times. The data in memory now looks like this:

{
    "hello": "3",
    "program": {
        "python": "tornado",
        "golang": "gin"
    }
}

It is enough to generate write commands from the final state:

set hello 3
hmset program python tornado golang gin

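So the 8 commands above are compressed into these 2. Rewriting from the final state drops the redundant intermediate steps, which greatly reduces the file size.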
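The idea of rewriting from the final state can be shown with a toy model. The sketch below is not how Redis implements the rewrite; it assumes a tiny keyspace of integer-valued string keys, supports only `set` and `incr`, and all names are invented for illustration. It replays a log into memory and then emits one command per key.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MAX_KEYS 16

/* Hypothetical toy keyspace: string keys holding integer values only. */
struct toy_db {
    char keys[MAX_KEYS][32];
    long vals[MAX_KEYS];
    int n;
};

/* Find the slot for `key`, creating it if needed; NULL when full. */
static long *toy_slot(struct toy_db *db, const char *key) {
    for (int i = 0; i < db->n; i++)
        if (strcmp(db->keys[i], key) == 0) return &db->vals[i];
    if (db->n == MAX_KEYS) return NULL;
    snprintf(db->keys[db->n], sizeof(db->keys[0]), "%s", key);
    db->vals[db->n] = 0;
    return &db->vals[db->n++];
}

/* Replay one logged command ("set <key> <int>" or "incr <key>"). */
int toy_apply(struct toy_db *db, const char *line) {
    char op[8], key[32];
    long val = 0;
    int got = sscanf(line, "%7s %31s %ld", op, key, &val);
    int is_set = got == 3 && strcmp(op, "set") == 0;
    int is_incr = got >= 2 && strcmp(op, "incr") == 0;
    if (!is_set && !is_incr) return -1;
    long *slot = toy_slot(db, key);
    if (!slot) return -1;
    if (is_set) *slot = val; else *slot += 1;
    return 0;
}

/* "Rewrite": emit one set command per key from the final state. */
int toy_rewrite(const struct toy_db *db, char *out, size_t cap) {
    size_t used = 0;
    for (int i = 0; i < db->n; i++) {
        int n = snprintf(out + used, cap - used, "set %s %ld\n",
                         db->keys[i], db->vals[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Replaying the three commands on the key hello and then rewriting produces the single line `set hello 3`.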

The two rewrite parameters mentioned above mean the following:

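auto-aof-rewrite-percentage: defaults to 100 and triggers a rewrite when the AOF log has grown by the given percentage. As the code above shows, Redis uses the AOF file size after the last rewrite as the base size, or the size at startup if no rewrite has happened yet, and compares the current size against it. Once the current size exceeds the base size by the given percentage, a rewrite is triggered. For example, with a setting of 100 and a base size of 300MB, a rewrite is triggered once the file reaches 300 + 300 * 100% = 600MB. Setting it to 0 disables automatic rewrites.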

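auto-aof-rewrite-min-size: the minimum size the AOF file must reach before it is rewritten, which prevents rewriting too early. For example, the file may start at 10KB, and by percentage alone it would be rewritten at 20KB, which is too frequent. With a minimum size set, no rewrite happens even when the percentage is met; the file must both exceed this size and meet the percentage. The default minimum is 64MB, so a rewrite is triggered when the AOF file exceeds 64MB and has grown by at least 100% over the base size.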
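Both conditions can be combined into one small function that follows the check in serverCron, including its integer arithmetic. The function name and parameter list are assumptions for illustration; the growth formula is the one from the source shown earlier.

```c
#include <assert.h>

/* Hypothetical standalone version of the check in serverCron().
 * Sizes are in bytes; `perc` is auto-aof-rewrite-percentage. */
int should_rewrite_aof(long long current, long long base,
                       long long min_size, int perc) {
    if (perc == 0) return 0;               /* 0 disables auto rewrite */
    if (current <= min_size) return 0;     /* below the minimum size */
    if (base == 0) base = 1;               /* avoid dividing by zero */
    long long growth = (current * 100 / base) - 100;
    return growth >= perc;
}
```

With a base size of 300MB and the defaults, 599MB gives a growth of 99% and does not trigger a rewrite, while 600MB gives 100% and does.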

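The aof-load-truncated option controls what happens when the file turns out to be truncated, i.e. EOF is reached unexpectedly while loading, for example because of a file system problem. By default this is tolerated: the server starts normally and only logs a notice. To make Redis stop instead, set the option to no, and then try repairing the file with the redis-check-aof tool. Note that tolerating truncation does not mean tolerating corruption: if the file is corrupted, Redis exits with an error regardless of this setting.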

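aof-use-rdb-preamble controls hybrid persistence and is on by default. Redis has supported combining AOF and RDB since version 4.0, where the option was off by default; from 5.0 it is on by default. In other words, when AOF persistence is configured it is actually a hybrid mode built on top of AOF. Commands are appended the AOF way, and when the AOF file meets certain conditions a rewrite is triggered; that rewrite is where hybrid persistence happens. During a rewrite the content is saved in RDB format but still written to the AOF file. After the rewrite, subsequent writes are again appended as command text, and at the next rewrite everything is converted to RDB at the head of the file once more, and so on. Between two snapshots the lightweight AOF log keeps the data up to date, and the rewrite compresses it into a snapshot, which saves a lot of space, writes quickly, and also speeds up recovery: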

[Figure: hybrid persistence, RDB-format content at the head of the AOF file followed by appended commands]
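One consequence of this layout is that a loader can tell the two formats apart from the first bytes of the file: an RDB section starts with the REDIS magic shown earlier, while a plain AOF starts with a command in the text format. The helper below is a minimal sketch of such a check; its name is an assumption, and it is not presented as the actual loading code.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical check: does this AOF content start with an RDB preamble?
 * An RDB section begins with the "REDIS" magic, while a plain AOF begins
 * with an array marker such as "*2". */
int aof_has_rdb_preamble(const void *buf, size_t len) {
    return len >= 5 && memcmp(buf, "REDIS", 5) == 0;
}
```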

Writing the AOF involves two parts: real-time writes and the AOF rewrite.

2.1. Real-time AOF writes

When AOF is enabled, Redis first writes the content into a buffer while handling requests. The buffer is defined in the redisServer struct in server.h:

struct redisServer {
    // ...
    /* AOF persistence */
    int aof_state;                  /* AOF_(ON|OFF|WAIT_REWRITE) */
    int aof_fsync;                  /* Kind of fsync() policy */
    char *aof_filename;             /* Name of the AOF file */
    int aof_no_fsync_on_rewrite;    /* Don't fsync if a rewrite is in prog. */
    int aof_rewrite_perc;           /* Rewrite AOF if % growth is > M and... */
    off_t aof_rewrite_min_size;     /* the AOF file is at least N bytes. */
    off_t aof_rewrite_base_size;    /* AOF size on latest startup or rewrite. */
    off_t aof_current_size;         /* AOF current size. */
    off_t aof_fsync_offset;         /* AOF offset which is already synced to disk. */
    int aof_rewrite_scheduled;      /* Rewrite once BGSAVE terminates. */
    pid_t aof_child_pid;            /* PID if rewriting process */
    list *aof_rewrite_buf_blocks;   /* Hold changes during an AOF rewrite. */
    sds aof_buf;      /* AOF buffer, written before entering the event loop */
    int aof_fd;       /* File descriptor of currently selected AOF file */
    int aof_selected_db; /* Currently selected DB in AOF */
    time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
    time_t aof_last_fsync;            /* UNIX time of last fsync() */
    time_t aof_rewrite_time_last;   /* Time used by last AOF rewrite run. */
    time_t aof_rewrite_time_start;  /* Current AOF rewrite start time. */
    int aof_lastbgrewrite_status;   /* C_OK or C_ERR */
    unsigned long aof_delayed_fsync;  /* delayed AOF fsync() counter */
    int aof_rewrite_incremental_fsync;/* fsync incrementally while aof rewriting? */
    int rdb_save_incremental_fsync;   /* fsync incrementally while rdb saving? */
    int aof_last_write_status;      /* C_OK or C_ERR */
    int aof_last_write_errno;       /* Valid if aof_last_write_status is ERR */
    int aof_load_truncated;         /* Don't stop on unexpected AOF EOF. */
    int aof_use_rdb_preamble;       /* Use RDB preamble on AOF rewrites. */
    /* AOF pipes used to communicate between parent and child during rewrite. */
    int aof_pipe_write_data_to_child;
    int aof_pipe_read_data_from_parent;
    int aof_pipe_write_ack_to_parent;
    int aof_pipe_read_ack_from_child;
    int aof_pipe_write_ack_to_child;
    int aof_pipe_read_ack_from_parent;
    int aof_stop_sending_diff;     /* If true stop sending accumulated diffs
                                      to child process. */
    sds aof_child_diff;             /* AOF diff accumulator child side. */
    // ...
}

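This struct holds all the AOF-related variables for data and state. aof_buf is the write buffer, of type sds (simple dynamic string). All client commands are executed through the function void call(client *c, int flags):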

void call(client *c, int flags) {
    // ...
    /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;

        /* Check if the command operated changes in the data set. If so
         * set for replication / AOF propagation. */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        /* If the client forced AOF / replication of the command, set
         * the flags regardless of the command effects on the data set. */
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        /* However prevent AOF / replication propagation if the command
         * implementations called preventCommandPropagation() or similar,
         * or if we don't have the call() flags to do so. */
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;

        /* Call propagate() only if at least one of AOF / replication
         * propagation is needed. Note that modules commands handle replication
         * in an explicit way, so we never replicate them automatically. */
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
    // ...
}

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

As shown, call invokes propagate, which in turn calls feedAppendOnlyFile to write into the AOF buffer:

// aof.c
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setCommand && argc > 3) {
        int i;
        robj *exarg = NULL, *pxarg = NULL;
        /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
        buf = catAppendOnlyGenericCommand(buf,3,argv);
        for (i = 3; i < argc; i ++) {
            if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
            if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
        }
        serverAssert(!(exarg && pxarg));
        if (exarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                                               exarg);
        if (pxarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                                               pxarg);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == AOF_ON)
        // Append to the AOF buffer
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        // Also append to the buffer destined for the AOF rewrite child
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}

sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char buf[32];
    int len, j;
    robj *o;

    buf[0] = '*';
    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
    buf[len++] = '\r';
    buf[len++] = '\n';
    dst = sdscatlen(dst,buf,len);

    for (j = 0; j < argc; j++) {
        o = getDecodedObject(argv[j]);
        buf[0] = '$';
        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
        buf[len++] = '\r';
        buf[len++] = '\n';
        dst = sdscatlen(dst,buf,len);
        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
        dst = sdscatlen(dst,"\r\n",2);
        decrRefCount(o);
    }
    return dst;
}

sds sdscatlen(sds s, const void *t, size_t len) {
    size_t curlen = sdslen(s);

    // Grow the sds allocation if needed
    s = sdsMakeRoomFor(s,len);
    if (s == NULL) return NULL;
    memcpy(s+curlen, t, len);
    sdssetlen(s, curlen+len);
    s[curlen+len] = '\0';
    return s;
}
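
The request format that catAppendOnlyGenericCommand builds above can be reproduced in a few lines. The following is a minimal sketch, not Redis code: resp_encode is a hypothetical helper that works on NUL-terminated C strings and a caller-supplied buffer, whereas Redis uses binary-safe sds strings.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not part of Redis): encode argc arguments as one
 * multi-bulk request into out. Returns the number of bytes written, not
 * counting the trailing NUL, or -1 if the buffer is too small. */
int resp_encode(char *out, size_t cap, int argc, const char **argv) {
    size_t off = 0;

    /* Header: "*<argc>\r\n" */
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    off += (size_t)n;

    /* Each argument: "$<length>\r\n<bytes>\r\n" */
    for (int j = 0; j < argc; j++) {
        n = snprintf(out + off, cap - off, "$%zu\r\n%s\r\n",
                     strlen(argv[j]), argv[j]);
        if (n < 0 || (size_t)n >= cap - off) return -1;
        off += (size_t)n;
    }
    return (int)off;
}
```

For the arguments SET, k and v1 this produces *3\r\n$3\r\nSET\r\n$1\r\nk\r\n$2\r\nv1\r\n, the same shape of text that ends up in aof_buf.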


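feedAppendOnlyFile uses catAppendOnlyGenericCommand to serialize the command into the text that will be written, then calls sdscatlen to concatenate it onto the existing aof_buf, which completes the write into aof_buf. If an AOF rewrite child process is running at that moment, the change is also appended to the rewrite buffer through aofRewriteBufferAppend so that the child can include it in the rewritten file; that function is covered later. Next comes the part that executes the write-back policy. The entry point is again the serverCron loop: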

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    // ...
}

void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) {
        /* Check if we need to do fsync even the aof buffer is empty,
         * because previously in AOF_FSYNC_EVERYSEC mode, fsync is
         * called only when aof buffer is not empty, so if users
         * stop write commands before fsync called in one second,
         * the data in page cache cannot be flushed in time. */
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
            server.aof_fsync_offset != server.aof_current_size &&
            server.unixtime > server.aof_last_fsync &&
            !(sync_in_progress = aofFsyncInProgress())) {
            goto try_fsync;
        } else {
            return;
        }
    }

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = aofFsyncInProgress();

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall trough, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */

    latencyStartMonitor(latency);
    // Write the AOF buffer into the kernel page cache
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    latencyEndMonitor(latency);
    
    // ...

    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    
    
    // ...
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

try_fsync:
    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* redis_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_fsync_offset = server.aof_current_size;
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) {
            aof_background_fsync(server.aof_fd);
            server.aof_fsync_offset = server.aof_current_size;
        }
        server.aof_last_fsync = server.unixtime;
    }
}

ssize_t aofWrite(int fd, const char *buf, size_t len) {
    ssize_t nwritten = 0, totwritten = 0;

    while(len) {
        nwritten = write(fd, buf, len);

        if (nwritten < 0) {
            if (errno == EINTR) {
                continue;
            }
            return totwritten ? totwritten : -1;
        }

        len -= nwritten;
        buf += nwritten;
        totwritten += nwritten;
    }

    return totwritten;
}

void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}

int aofFsyncInProgress(void) {
    return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
}
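
The postponement branch of flushAppendOnlyFile above is easier to follow as a pure function. This is a hypothetical model of the everysec policy only: decide_flush and the enum are names invented here, and the real function goes on to write and fsync.

```c
/* Hypothetical model of the everysec postponement rule (not Redis code). */
enum flush_action { FLUSH_WRITE_NOW, FLUSH_POSTPONE };

/* now and *postponed_start are Unix times in seconds; *postponed_start is 0
 * when no flush is currently postponed, as with aof_flush_postponed_start. */
enum flush_action decide_flush(int force, int sync_in_progress,
                               long now, long *postponed_start) {
    if (!force && sync_in_progress) {
        if (*postponed_start == 0) {
            /* First postponement: remember when the waiting began. */
            *postponed_start = now;
            return FLUSH_POSTPONE;
        }
        if (now - *postponed_start < 2) {
            /* Waiting for less than two seconds: postpone again. */
            return FLUSH_POSTPONE;
        }
        /* Two seconds or more: fall through and write anyway. */
    }
    *postponed_start = 0;   /* the write happens, so reset the marker */
    return FLUSH_WRITE_NOW;
}
```

With a background fsync stuck in progress, calls at t=100 and t=101 postpone the write, and the call at t=102 writes regardless.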

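flushAppendOnlyFile(0) is normally invoked from beforeSleep on every pass of the event loop; the serverCron call shown above only retries a flush that was postponed. Under the everysec policy, if a background fsync is still in progress, the function records the current Unix time in server.aof_flush_postponed_start and returns without writing. It keeps postponing until two seconds have elapsed, after which it writes anyway and logs a notice. In all other cases it calls aofWrite to copy server.aof_buf into the kernel page cache and then empties aof_buf. When a later call finds aof_buf empty under everysec, but data has been written since the last fsync, it jumps straight to the try_fsync label. There, if the policy is always, redis_fsync is called synchronously; if the policy is everysec and at least one second has passed since the last fsync, aof_background_fsync hands the fsync to a background thread, which really means calling bioCreateBackgroundJob to add a job to a queue. This is the bio (background I/O) subsystem, which is implemented with threads. Redis handles all events and in-memory data structure operations on the main thread, while closing file descriptors, AOF fsync and lazy freeing of objects are delegated to bio. Redis 5.0 starts three threads for this, one per job type: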

// bio.h
/* Background job opcodes */
#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE     2 /* Deferred objects freeing. */
#define BIO_NUM_OPS       3

// server.c
void InitServerLast() {
    bioInit();
    server.initial_memory_usage = zmalloc_used_memory();
}
// bio.c
static pthread_t bio_threads[BIO_NUM_OPS];
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];
static list *bio_jobs[BIO_NUM_OPS];
/* Initialize the background system, spawning the thread. */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}

void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    /* Make the thread killable at any time, so that bioKillThreads()
     * can work reliably. */
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            // Wait on the condition variable until a job is queued
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            redis_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            /* What we free changes depending on what arguments are set:
             * arg1 -> free the object at pointer.
             * arg2 & arg3 -> free two dictionaries (a Redis DB).
             * only arg3 -> free the skiplist. */
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        pthread_cond_broadcast(&bio_step_cond[type]);
    }
}

/* Return the number of pending jobs of the specified type. */
unsigned long long bioPendingJobsOfType(int type) {
    unsigned long long val;
    pthread_mutex_lock(&bio_mutex[type]);
    val = bio_pending[type];
    pthread_mutex_unlock(&bio_mutex[type]);
    return val;
}

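First, when the Redis main process starts, InitServerLast calls bioInit, which initializes and starts all the threads, so bioProcessBackgroundJobs begins running in the background. Each thread enters an infinite loop and waits on its condition variable. When aof_background_fsync creates a job, bioCreateBackgroundJob appends a node to the job list and wakes the waiting thread with pthread_cond_signal, and the bioProcessBackgroundJobs thread then performs the actual fsync. The bio code is a classic example of combining a mutex with condition variables in concurrent code.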
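
The same mutex-plus-condition-variable pattern can be sketched with a single job type. Everything below (submit_job, worker, wait_idle, the negative-value stop sentinel) is made up for illustration; unlike bio.c, the sketch unlinks the job from the queue before processing it, and it sums integers instead of calling fsync.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical single-type job queue modelled on the bio.c pattern. */
typedef struct job { int value; struct job *next; } job;

static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t q_newjob = PTHREAD_COND_INITIALIZER; /* job queued */
static pthread_cond_t q_step = PTHREAD_COND_INITIALIZER;   /* job finished */
static job *q_head = NULL, *q_tail = NULL;
static unsigned long q_pending = 0;
static long q_sum = 0;              /* result accumulated by the worker */

/* Producer side: enqueue under the lock and wake the worker. */
void submit_job(int value) {
    job *j = malloc(sizeof(*j));
    if (!j) abort();
    j->value = value;
    j->next = NULL;
    pthread_mutex_lock(&q_mutex);
    if (q_tail) q_tail->next = j; else q_head = j;
    q_tail = j;
    q_pending++;
    pthread_cond_signal(&q_newjob);
    pthread_mutex_unlock(&q_mutex);
}

/* Worker thread: a negative value is a stop sentinel (illustration only). */
void *worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&q_mutex);
    for (;;) {
        /* The loop always starts with the lock held. */
        while (q_head == NULL) pthread_cond_wait(&q_newjob, &q_mutex);
        job *j = q_head;
        q_head = j->next;
        if (q_head == NULL) q_tail = NULL;
        pthread_mutex_unlock(&q_mutex);

        int value = j->value;       /* the slow work would happen here */
        free(j);

        pthread_mutex_lock(&q_mutex);
        q_pending--;
        if (value >= 0) q_sum += value;
        pthread_cond_broadcast(&q_step);
        if (value < 0) break;
    }
    pthread_mutex_unlock(&q_mutex);
    return NULL;
}

/* Block until every submitted job has been processed. */
void wait_idle(void) {
    pthread_mutex_lock(&q_mutex);
    while (q_pending > 0) pthread_cond_wait(&q_step, &q_mutex);
    pthread_mutex_unlock(&q_mutex);
}
```

A caller would start worker with pthread_create, call submit_job a few times, and use wait_idle where Redis uses bioPendingJobsOfType to check whether work is still outstanding.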

That covers how commands reach the AOF buffer and the disk in real time. Next is a brief look at the rewrite process.

2.2.AOF rewrite

An AOF rewrite is started from serverCron when the rewrite conditions are met; the work is done by rewriteAppendOnlyFileBackground in aof.c:

// aof.c
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    // Create the pipes used for parent/child communication
    if (aofCreatePipes() != C_OK) return C_ERR;
    openChildInfoPipe();
    start = ustime();
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

        /* Child */
        closeClildUnusedResourceAfterFork();
        redisSetProcTitle("redis-aof-rewrite");
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);

            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }

            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_AOF);
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (childpid == -1) {
            closeChildInfoPipe();
            serverLog(LL_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            aofClosePipes();
            return C_ERR;
        }
        serverLog(LL_NOTICE,
            "Background append only file rewriting started by pid %d",childpid);
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return C_OK;
    }
    return C_OK; /* unreached */
}

int aofCreatePipes(void) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    if (pipe(fds) == -1) goto error; /* parent -> children data. */
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
    if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
    /* Parent -> children data is non blocking. */
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
    // Register a read handler: aofChildPipeReadable runs when the child writes to its ack pipe
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

    server.aof_pipe_write_data_to_child = fds[1];
    server.aof_pipe_read_data_from_parent = fds[0];
    server.aof_pipe_write_ack_to_parent = fds[3];
    server.aof_pipe_read_ack_from_child = fds[2];
    server.aof_pipe_write_ack_to_child = fds[5];
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0;
    return C_OK;

error:
    serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
        strerror(errno));
    for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
    return C_ERR;
}

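The key point is that the function first calls aofCreatePipes to create the pipes it will later use to communicate with the child process; a pipe is one form of inter-process communication. Three pipes are created: one carries data from the parent to the child, and the other two carry acknowledgements in each direction.

Inside aofCreatePipes a read event is registered on the child-to-parent ack pipe, so aofChildPipeReadable is eventually called when the child asks the parent to stop sending diffs. The parent picks this up in its event loop and reacts accordingly.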
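
A stripped-down version of this parent/child pipe exchange can be tried in isolation. The sketch below is an assumption-laden simplification: pipe_handshake is a hypothetical function, it uses two blocking pipes instead of Redis's three (one of them non-blocking), and the child detects the end of the data by EOF rather than by a stop request.

```c
#define _POSIX_C_SOURCE 200809L
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical demo: the parent streams msg to a forked child over one pipe,
 * and the child confirms with a single '!' byte over a second pipe.
 * Returns 0 when the child received every byte and the ack arrived. */
int pipe_handshake(const char *msg) {
    int data[2], ack[2];
    size_t len = strlen(msg);

    if (pipe(data) == -1) return -1;            /* parent -> child data */
    if (pipe(ack) == -1) {                      /* child -> parent ack */
        close(data[0]); close(data[1]);
        return -1;
    }

    pid_t pid = fork();
    if (pid == -1) {
        close(data[0]); close(data[1]);
        close(ack[0]); close(ack[1]);
        return -1;
    }

    if (pid == 0) {
        /* Child: keep only the ends it uses, then read until EOF. */
        char buf[256];
        size_t total = 0;
        ssize_t n;
        close(data[1]);
        close(ack[0]);
        while ((n = read(data[0], buf, sizeof(buf))) > 0) total += (size_t)n;
        if (total != len) _exit(1);
        if (write(ack[1], "!", 1) != 1) _exit(1);
        _exit(0);
    }

    /* Parent: send the payload, then close the write end to signal EOF. */
    char byte = 0;
    int status = 0, ok;
    close(data[0]);
    close(ack[1]);
    ok = write(data[1], msg, len) == (ssize_t)len;
    close(data[1]);
    ok = ok && read(ack[0], &byte, 1) == 1 && byte == '!';
    close(ack[0]);
    if (waitpid(pid, &status, 0) != pid) return -1;
    return (ok && WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

Calling pipe_handshake("SET k v") returns 0 when both directions worked. The single '!' byte mirrors the ack byte Redis exchanges between the rewrite child and the parent.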

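After creating the pipes, the function calls fork to create the child process that performs the rewrite. As with RDB, the output first goes to a temporary file, temp-rewriteaof-bg-<pid>.aof, and once the rewrite completes that file is renamed over the previous one. After the child has been created, the parent only updates some state and returns. The parent does have one important ongoing job, though: it keeps sending new client writes into the pipe. As mentioned earlier, the real-time write path has one extra step: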

    if (server.aof_child_pid != -1)
        // Also append to the buffer destined for the AOF rewrite child
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

Here aofRewriteBufferAppend is called to record the newly submitted data:

#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */

void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    // Get the last node of the buffer block list
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        /* If we already got at least an allocated block, try appending
         * at least some piece into it. */
        if (block) {
            unsigned long thislen = (block->free < len) ? block->free : len;
            if (thislen) {  /* The current block is not already full. */
                memcpy(block->buf+block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }

        if (len) { /* First block to allocate, or need another block. */
            int numblocks;

            block = zmalloc(sizeof(*block));
            block->free = AOF_RW_BUF_BLOCK_SIZE;
            block->used = 0;
            listAddNodeTail(server.aof_rewrite_buf_blocks,block);

            /* Log every time we cross more 10 or 100 blocks, respectively
             * as a notice or warning. */
            numblocks = listLength(server.aof_rewrite_buf_blocks);
            if (((numblocks+1) % 10) == 0) {
                int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
                                                         LL_NOTICE;
                serverLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }

    /* Install a file event to send data to the rewrite child if there is
     * not one already. */
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}


void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    listNode *ln;
    aofrwblock *block;
    ssize_t nwritten;
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);

    while(1) {
        ln = listFirst(server.aof_rewrite_buf_blocks);
        block = ln ? ln->value : NULL;
        if (server.aof_stop_sending_diff || !block) {
            // Nothing left to send, or the child asked us to stop: remove the write event
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }
        if (block->used > 0) {
            nwritten = write(server.aof_pipe_write_data_to_child,
                             block->buf,block->used);
            if (nwritten <= 0) return;
            // Discard the bytes that were written to the pipe
            memmove(block->buf,block->buf+nwritten,block->used-nwritten);
            block->used -= nwritten;
            block->free += nwritten;
        }
        // If the block has been fully written, delete its node
        if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
    }
}

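Note that the data is still written first into a user-space buffer in the parent. The buffer is a linked list of blocks that caches the parent's new writes: the function first fills whatever space is left in the last block and, if that is not enough, allocates a new node for the remaining data. Each node holds 10 MB. Finally, if no write event is registered for the pipe to the child, one is registered with aofChildWriteDiffData as the callback; that function is what actually writes the buffered data into the pipe, from which the child reads it.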
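
The fill-the-tail-then-allocate logic is the core of aofRewriteBufferAppend. A minimal sketch of it follows, with a deliberately tiny block size so the behaviour is visible; blocklist, blocklist_append and the 8-byte BLOCK_SIZE are assumptions made for the example, not Redis definitions.

```c
#include <stdlib.h>
#include <string.h>

#define BLOCK_SIZE 8   /* tiny on purpose; Redis uses 10 MB per block */

/* Hypothetical block list modelled on the AOF rewrite buffer. */
typedef struct block {
    size_t used, free;
    struct block *next;
    char buf[BLOCK_SIZE];
} block;

typedef struct { block *head, *tail; size_t nblocks; } blocklist;

/* Append len bytes: fill the tail block first, then allocate new blocks
 * for whatever is left. Returns 0 on success, -1 if allocation fails. */
int blocklist_append(blocklist *bl, const char *s, size_t len) {
    block *b = bl->tail;
    while (len) {
        if (b) {
            size_t thislen = b->free < len ? b->free : len;
            if (thislen) {               /* the current block is not full */
                memcpy(b->buf + b->used, s, thislen);
                b->used += thislen;
                b->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }
        if (len) {                       /* first block, or another needed */
            b = malloc(sizeof(*b));
            if (!b) return -1;
            b->used = 0;
            b->free = BLOCK_SIZE;
            b->next = NULL;
            if (bl->tail) bl->tail->next = b; else bl->head = b;
            bl->tail = b;
            bl->nblocks++;
        }
    }
    return 0;
}

/* Total number of buffered bytes across all blocks. */
size_t blocklist_size(const blocklist *bl) {
    size_t total = 0;
    for (const block *b = bl->head; b; b = b->next) total += b->used;
    return total;
}

/* Release every block and reset the list. */
void blocklist_free(blocklist *bl) {
    block *b = bl->head;
    while (b) {
        block *next = b->next;
        free(b);
        b = next;
    }
    bl->head = bl->tail = NULL;
    bl->nblocks = 0;
}
```

Appending 10 bytes to an empty list creates two blocks (8 + 2 bytes); appending 7 more fills the 6 free bytes of the second block and spills 1 byte into a third.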

Back in the child process, the work starts with a call to rewriteAppendOnlyFile:

int rewriteAppendOnlyFile(char *filename) {
    rio aof;
    FILE *fp;
    char tmpfile[256];
    char byte;

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return C_ERR;
    }

    server.aof_child_diff = sdsempty();
    rioInitWithFile(&aof,fp);

    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);

    if (server.aof_use_rdb_preamble) {
        int error;
        if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
            errno = error;
            goto werr;
        }
    } else {
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    }

    /* Do an initial slow fsync here while the parent is still sending
     * data, in order to make the next final fsync faster. */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;

    /* Read again a few times to get more data from the parent.
     * We can't read forever (the server may receive data from clients
     * faster than it is able to send data to the child), so we try to read
     * some more data in a loop as soon as there is a good chance more data
     * will come. If it looks like we are wasting time, we abort (this
     * happens after 20 ms without new data). */
    int nodata = 0;
    mstime_t start = mstime();
    while(mstime()-start < 1000 && nodata < 20) {
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
        {
            nodata++;
            continue;
        }
        nodata = 0; /* Start counting from zero, we stop on N *contiguous*
                       timeouts. */
        aofReadDiffFromParent();
    }

    /* Ask the master to stop sending diffs. */
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
        goto werr;
    /* We read the ACK from the server using a 10 seconds timeout. Normally
     * it should reply ASAP, but just in case we lose its reply, we are sure
     * the child will eventually get terminated. */
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
        byte != '!') goto werr;
    serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");

    /* Read the final diff if any. */
    aofReadDiffFromParent();

    /* Write the received diff to the file. */
    serverLog(LL_NOTICE,
        "Concatenating %.2f MB of AOF diff received from parent.",
        (double) sdslen(server.aof_child_diff) / (1024*1024));
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return C_ERR;
    }
    serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    return C_ERR;
}
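
The bounded read loop in the middle of rewriteAppendOnlyFile (at most 1000 ms in total, giving up after 20 consecutive 1 ms waits with no data) can be exercised on its own. This sketch calls poll directly where Redis calls aeWait; drain_with_deadline and now_ms are hypothetical names, and the function only counts bytes instead of accumulating them.

```c
#define _POSIX_C_SOURCE 200809L
#include <poll.h>
#include <time.h>
#include <unistd.h>

/* Monotonic clock in milliseconds. */
static long long now_ms(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return ts.tv_sec * 1000LL + ts.tv_nsec / 1000000;
}

/* Hypothetical helper: read from fd for at most max_ms milliseconds,
 * stopping early after max_idle consecutive 1 ms waits without data.
 * Returns the number of bytes read. */
long drain_with_deadline(int fd, int max_ms, int max_idle) {
    char buf[4096];
    long total = 0;
    int nodata = 0;
    long long start = now_ms();

    while (now_ms() - start < max_ms && nodata < max_idle) {
        struct pollfd pfd = { .fd = fd, .events = POLLIN };
        if (poll(&pfd, 1, 1) <= 0) {     /* timeout or error: count it */
            nodata++;
            continue;
        }
        nodata = 0;                      /* only contiguous timeouts count */
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n <= 0) break;               /* EOF or read error */
        total += n;
    }
    return total;
}
```

With max_ms set to 1000 and max_idle to 20, a quiet pipe makes the function return after roughly 20 ms, while a busy one is cut off after one second.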

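At this point the child creates another temporary file, temp-rewriteaof-%d.aof, and, as in the RDB path, initializes a rio and optional incremental fsync. If hybrid persistence (aof-use-rdb-preamble) is enabled, it calls rdbSaveRio to write the data in RDB format; otherwise it calls rewriteAppendOnlyFileRio to write in AOF format. The RDB write is essentially the same as before, but the RDB preamble of an AOF is written with the RDB_SAVE_AOF_PREAMBLE flag, which makes the child also collect the parent's incremental changes while it writes. The finished file is therefore a snapshot followed by incremental changes rather than a pure snapshot. The key code is: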

// server.h
#define AOF_READ_DIFF_INTERVAL_BYTES (1024*10)

// rdb.c
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
    // ...
    for (j = 0; j < server.dbnum; j++) {
        // ...
        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;

            /* When this RDB is produced as part of an AOF rewrite, move
             * accumulated diff from parent to child while rewriting in
             * order to have a smaller final write. */
            if (flags & RDB_SAVE_AOF_PREAMBLE &&
                rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
            {
                processed = rdb->processed_bytes;
                aofReadDiffFromParent();
            }
        }
        // ...
    }
    
    // ...
}


// aof.c
ssize_t aofReadDiffFromParent(void) {
    char buf[65536]; /* Default pipe buffer size on most Linux systems. */
    ssize_t nread, total = 0;

    while ((nread =
            read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
        total += nread;
    }
    return total;
}

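When the flags include RDB_SAVE_AOF_PREAMBLE, each time the number of bytes written grows by more than AOF_READ_DIFF_INTERVAL_BYTES the child reads the parent's pending writes from the pipe into its diff buffer, aof_child_diff. AOF_READ_DIFF_INTERVAL_BYTES is defined as 10 KB, so roughly every 10 KB of existing data written is followed by one read of the parent's new data, which reduces the amount left to handle at the end.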
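
The trigger condition is a strict greater-than against the last checkpoint, so with uniform 1 KB entries the read happens on every 11th entry rather than every 10th. A small simulation makes that concrete; count_diff_polls is a hypothetical stand-in for the loop in rdbSaveRio, and READ_DIFF_INTERVAL simply copies the value of AOF_READ_DIFF_INTERVAL_BYTES.

```c
#include <stddef.h>

#define READ_DIFF_INTERVAL (1024*10)  /* same value as AOF_READ_DIFF_INTERVAL_BYTES */

/* Hypothetical simulation: write nentries entries of entry_size bytes each
 * and count how often the "read diff from parent" step would fire. */
size_t count_diff_polls(size_t nentries, size_t entry_size) {
    size_t processed_bytes = 0;   /* plays the role of rdb->processed_bytes */
    size_t processed = 0;         /* checkpoint taken at the last poll */
    size_t polls = 0;

    for (size_t i = 0; i < nentries; i++) {
        processed_bytes += entry_size;   /* stands in for saving one key */
        if (processed_bytes > processed + READ_DIFF_INTERVAL) {
            processed = processed_bytes;
            polls++;                     /* stands in for aofReadDiffFromParent */
        }
    }
    return polls;
}
```

For 100 entries of 1024 bytes the step fires 9 times; a single 20000-byte entry triggers it once on its own.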

Likewise, if hybrid persistence is not enabled, rewriteAppendOnlyFileRio is called to rewrite the AOF file:

int rewriteAppendOnlyFileRio(rio *aof) {
    // omitted...

    for (j = 0; j < server.dbnum; j++) {
        // ...
        di = dictGetSafeIterator(d);
        
        // ...

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;

            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);

            expiretime = getExpire(db,&key);

            /* Save the key and associated value */
            if (o->type == OBJ_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(aof,o) == 0) goto werr;
            } else if (o->type == OBJ_LIST) {
                if (rewriteListObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_SET) {
                if (rewriteSetObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_ZSET) {
                if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_HASH) {
                if (rewriteHashObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_STREAM) {
                if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_MODULE) {
                if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
            } else {
                serverPanic("Unknown object type");
            }
            
            // ...
            /* Read some diff from the parent process from time to time. */
            if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
                // Read new data from the pipe and append it to the aof_child_diff buffer
                processed = aof->processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL;
    }
    return C_OK;

werr:
    if (di) dictReleaseIterator(di);
    return C_ERR;
}

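Apart from the different output format, there is again a piece of code, very similar to the one in the RDB path, that reads the latest changes from the pipe.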

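Once the data has been written, control returns to rewriteAppendOnlyFile. The child keeps reading the latest changes from the parent for at most one second, stopping early after 20 consecutive 1 ms waits with no data, and then writes a ! to the parent to ask it to stop sending diffs: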

if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;

The event handler registered in the parent receives the child's write:

void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
    char byte;
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    if (read(fd,&byte,1) == 1 && byte == '!') {
        serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
        server.aof_stop_sending_diff = 1;
        if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
            /* If we can't send the ack, inform the user, but don't try again
             * since in the other side the children will use a timeout if the
             * kernel can't buffer our write, or, the children was
             * terminated. */
            serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
                strerror(errno));
        }
    }
    /* Remove the handler since this can be called only one time during a
     * rewrite. */
    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}

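The parent receives the child's request, sets aof_stop_sending_diff, replies with a ! of its own, and removes the event registration.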

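The child then continues: it calls rioWrite to write the contents of the aof_child_diff buffer to the file, flushes and fsyncs it, and finally renames it, that is, temp-rewriteaof-%d.aof becomes temp-rewriteaof-bg-<pid>.aof: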

if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
    goto werr;

/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;

/* Use RENAME to make sure the DB file is changed atomically only
 * if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
    serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
    unlink(tmpfile);
    return C_ERR;
}
serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
return C_OK;
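The same write, flush, fsync, close, rename sequence works for any file that must never be seen half-written. Below is a minimal standalone sketch of that pattern. The function names write_then_rename and write_then_rename_demo, the /tmp paths and the payload are all hypothetical and only serve the illustration.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Write len bytes to tmp_path, force them to disk, then rename the file
 * over final_path. Returns 0 on success, -1 on failure (temp file removed). */
int write_then_rename(const char *tmp_path, const char *final_path,
                      const char *data, size_t len) {
    assert(tmp_path && final_path && data);
    FILE *fp = fopen(tmp_path, "w");
    if (fp == NULL) return -1;

    if (fwrite(data, 1, len, fp) != len) goto werr;
    if (fflush(fp) == EOF) goto werr;           /* stdio buffer -> kernel */
    if (fsync(fileno(fp)) == -1) goto werr;     /* kernel cache -> disk */
    if (fclose(fp) == EOF) { fp = NULL; goto werr; }
    fp = NULL;

    /* rename() replaces final_path atomically: readers see either the
     * old file or the complete new one, never a partial write. */
    if (rename(tmp_path, final_path) == -1) goto werr;
    return 0;

werr:
    if (fp != NULL) fclose(fp);
    unlink(tmp_path);
    return -1;
}

/* Hypothetical self-check: write a payload, read it back, clean up.
 * Returns 1 if the final file holds the payload and the temp file is gone. */
int write_then_rename_demo(void) {
    char tmp[64], dst[64], buf[64] = {0};
    const char *payload = "*1\r\n$4\r\nPING\r\n";
    snprintf(tmp, sizeof(tmp), "/tmp/temp-demo-%d.aof", (int)getpid());
    snprintf(dst, sizeof(dst), "/tmp/temp-demo-bg-%d.aof", (int)getpid());

    if (write_then_rename(tmp, dst, payload, strlen(payload)) != 0) return 0;
    FILE *fp = fopen(dst, "r");
    if (fp == NULL) return 0;
    size_t n = fread(buf, 1, sizeof(buf) - 1, fp);
    fclose(fp);
    unlink(dst);
    return n == strlen(payload) && strcmp(buf, payload) == 0
        && access(tmp, F_OK) == -1;
}
```

The order matters: fflush only moves data from the stdio buffer to the kernel, fsync is what pushes it to disk, and the rename is performed last so that a crash at any earlier point leaves the previous file untouched.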

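The child has now finished and exits, remaining a zombie process until it is reaped. On its next heartbeat the parent enters serverCron again and performs the reaping, which brings us back to the code we started from: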

if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
    int exitcode = WEXITSTATUS(statloc);
    int bysignal = 0;

    if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

    if (pid == -1) {
        serverLog(LL_WARNING,"wait3() returned an error: %s. "
                  "rdb_child_pid = %d, aof_child_pid = %d",
                  strerror(errno),
                  (int) server.rdb_child_pid,
                  (int) server.aof_child_pid);
    } else if (pid == server.rdb_child_pid) {
        backgroundSaveDoneHandler(exitcode,bysignal);
        if (!bysignal && exitcode == 0) receiveChildInfo();
    } else if (pid == server.aof_child_pid) {
        backgroundRewriteDoneHandler(exitcode,bysignal);
        if (!bysignal && exitcode == 0) receiveChildInfo();
    } else {
        if (!ldbRemoveChild(pid)) {
            serverLog(LL_WARNING,
                      "Warning, detected child with unmatched pid: %ld",
                      (long)pid);
        }
    }
    updateDictResizePolicy();
    closeChildInfoPipe();
}
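The non-blocking reap can be tried outside Redis with a few lines of C. This sketch is an illustration only: it swaps wait3 for the POSIX call waitpid(-1, ..., WNOHANG), which behaves the same for this purpose, and the names reap_once and reap_demo as well as the 10 ms tick are hypothetical stand-ins for serverCron and its period.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

/* One non-blocking reap attempt, like a single timer tick.
 * Returns the pid that was reaped, 0 if no child has exited yet,
 * or -1 on error. On success fills in the exit code and signal. */
pid_t reap_once(int *exitcode, int *bysignal) {
    assert(exitcode && bysignal);
    int statloc = 0;
    pid_t pid = waitpid(-1, &statloc, WNOHANG);
    if (pid > 0) {
        *exitcode = WIFEXITED(statloc) ? WEXITSTATUS(statloc) : 0;
        *bysignal = WIFSIGNALED(statloc) ? WTERMSIG(statloc) : 0;
    }
    return pid;
}

/* Hypothetical self-check: fork a child that exits with code 7 and poll
 * until it has been reaped. Returns the exit code, or -1 on failure. */
int reap_demo(void) {
    pid_t child = fork();
    if (child == -1) return -1;
    if (child == 0) _exit(7);

    int exitcode = -1, bysignal = 0;
    struct timespec tick = { 0, 10 * 1000 * 1000 };   /* 10 ms per tick */
    for (int i = 0; i < 500; i++) {
        pid_t pid = reap_once(&exitcode, &bysignal);
        if (pid == child) return bysignal ? -1 : exitcode;
        if (pid == -1) return -1;
        nanosleep(&tick, NULL);   /* the parent is free to do other work here */
    }
    return -1;
}
```

Because of WNOHANG the call returns 0 immediately when no child has exited, so the parent never blocks. The cost is that a finished child stays a zombie until the next tick.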

For the AOF child, backgroundRewriteDoneHandler is then called to finish the rewrite and release the child's resources:

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    if (!bysignal && exitcode == 0) {
        int newfd, oldfd;
        char tmpfile[256];
        long long now = ustime();
        mstime_t latency;

        serverLog(LL_NOTICE,
            "Background AOF rewrite terminated with success");

        /* Flush the differences accumulated by the parent to the
         * rewritten AOF. */
        latencyStartMonitor(latency);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.aof_child_pid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
            serverLog(LL_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
            goto cleanup;
        }

        if (aofRewriteBufferWrite(newfd) == -1) {
            serverLog(LL_WARNING,
                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            close(newfd);
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);

        serverLog(LL_NOTICE,
            "Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));

        
        // ...
        /* Rename the temporary file. This will not unlink the target file if
         * it exists, because we reference it with "oldfd". */
        latencyStartMonitor(latency);
        // Rename operation
        if (rename(tmpfile,server.aof_filename) == -1) {
            serverLog(LL_WARNING,
                "Error trying to rename the temporary AOF file %s into %s: %s",
                tmpfile,
                server.aof_filename,
                strerror(errno));
            close(newfd);
            if (oldfd != -1) close(oldfd);
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rename",latency);

        

        // ...
    } else if (!bysignal && exitcode != 0) {
        server.aof_lastbgrewrite_status = C_ERR;

        serverLog(LL_WARNING,
            "Background AOF rewrite terminated with error");
    } else {
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * triggering an error condition. */
        if (bysignal != SIGUSR1)
            server.aof_lastbgrewrite_status = C_ERR;

        serverLog(LL_WARNING,
            "Background AOF rewrite terminated by signal %d", bysignal);
    }

cleanup:
    aofClosePipes();
    aofRewriteBufferReset();
    aofRemoveTempFile(server.aof_child_pid);
    server.aof_child_pid = -1;
    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
    server.aof_rewrite_time_start = -1;
    /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
    if (server.aof_state == AOF_WAIT_REWRITE)
        server.aof_rewrite_scheduled = 1;
}

void aofClosePipes(void) {
    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
    aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,AE_WRITABLE);
    close(server.aof_pipe_write_data_to_child);
    close(server.aof_pipe_read_data_from_parent);
    close(server.aof_pipe_write_ack_to_parent);
    close(server.aof_pipe_read_ack_from_child);
    close(server.aof_pipe_write_ack_to_child);
    close(server.aof_pipe_read_ack_from_parent);
}

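The handler first reopens the AOF file generated by the child and calls aofRewriteBufferWrite to append the data remaining in server.aof_rewrite_buf_blocks to it. It then renames that file to the official AOF file name. Finally the resources are released: the old file handle is handed to a bio background thread to be closed (in the part elided above), and the cleanup section closes the pipes, resets the rewrite buffer, and removes the temporary file. This completes the whole AOF rewrite process.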
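The comment before the rename says that the target file is not unlinked because it is still referenced through "oldfd". That is ordinary POSIX behavior and can be checked with a small standalone program. The sketch below is an illustration, not Redis code. The names put_file and oldfd_survives_rename and the /tmp paths are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Create (or truncate) path and store text in it. Returns 0 or -1. */
static int put_file(const char *path, const char *text) {
    assert(path && text);
    int fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd == -1) return -1;
    ssize_t len = (ssize_t)strlen(text);
    int ok = write(fd, text, (size_t)len) == len;
    close(fd);
    return ok ? 0 : -1;
}

/* Returns 1 if a descriptor opened before rename() still reads the old
 * content after the target path has been replaced, 0 otherwise. */
int oldfd_survives_rename(void) {
    char target[64], tmp[64], buf[16] = {0};
    snprintf(target, sizeof(target), "/tmp/demo-aof-%d", (int)getpid());
    snprintf(tmp, sizeof(tmp), "/tmp/demo-aof-new-%d", (int)getpid());

    if (put_file(target, "old") == -1 || put_file(tmp, "new") == -1) return 0;

    int oldfd = open(target, O_RDONLY);   /* keeps the old file alive */
    if (oldfd == -1) return 0;
    if (rename(tmp, target) == -1) { close(oldfd); return 0; }

    /* The path now names the new file, but oldfd still refers to the
     * old one; its storage is released only when oldfd is closed. */
    ssize_t n = read(oldfd, buf, sizeof(buf) - 1);
    close(oldfd);
    unlink(target);
    return n == 3 && strcmp(buf, "old") == 0;
}
```

Holding the descriptor across the rename defers the actual deletion of the old file to the moment of close(), which is why that close can be moved off the main thread when the old file is large.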

The overall flow is summarized in the following diagram:

[Figure: overall flow of the AOF rewrite]

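That concludes this analysis of how Redis's two persistence mechanisms, RDB and AOF, work. Thank you for reading; if you spot any mistakes, corrections are welcome.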

Please credit the source when reposting. Link to this article: https://www.uj5u.com/shujuku/500137.html
