文章目錄
- 1. 過期時間的存盤
- 原始碼分析
- 2. 資料的淘汰
- 2.1 主動洗掉
- 2.1.1 命令執行前觸發
- 2.1.2 命令執行時觸發
- 2.2 定期洗掉
1. 過期時間的存盤
在 redisDb 資料結構 一節中已經提到過,redis 資料庫中有一個專門的 expires 字典用于存盤顯式設定了過期時間的資料(如 SETEX 命令設定的資料),本節以 SETEX 命令為例,依據原始碼分析過期時間的設定程序
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
原始碼分析
-
SETEX命令的處理函式為t_string.c#setexCommand(),可以看到該函式只是個入口,其本身并沒有多少邏輯void setexCommand(client *c) { c->argv[3] = tryObjectEncoding(c->argv[3]); setGenericCommand(c,OBJ_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS,NULL,NULL); } -
t_string.c#setGenericCommand()函式在之前的文章中也提到過,此次重點關注和過期時間相關部分,可以看到關鍵的流程如下:根據
expire引數,函式中有不同的處理,如果expire大于 0,說明客戶端傳輸過來的命令顯式設定了過期時間,則首先要對其進行轉化校驗,之后呼叫db.c#setExpire()函式將 key 和 過期時間保存到 redis 資料庫的過期字典中void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { long long milliseconds = 0; /* initialized to avoid any harmness warning */ if (expire) { if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) return; if (milliseconds <= 0) { addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name); return; } if (unit == UNIT_SECONDS) milliseconds *= 1000; } ...... if (expire) setExpire(c,c->db,key,mstime()+milliseconds); ...... -
db.c#setExpire()函式邏輯非常簡練,可以分為以下幾步:- 首先呼叫
dictFind()函式從資料庫保存普通資料的字典(db->dict)中找到 key 所在的節點,據此判斷 key 是否存在 - key 存在的話,呼叫
dictAddOrFind()函式使用這個 key 在資料庫的過期字典(db->expires)中插入一個新的節點或者找到已經存在的節點,然后呼叫dictSetSignedIntegerVal()函式將過期時間設定為這個節點的 value - 最后如果當前服務端是可寫的從節點,則需要將過期資料專門記錄下來,呼叫
rememberSlaveKeyWithExpire()函式實作
/* Set an expire to the specified key. If the expire is set in the context * of an user calling a command 'c' is the client, otherwise 'c' is set * to NULL. The 'when' parameter is the absolute unix time in milliseconds * after which the key will no longer be considered valid. */ void setExpire(client *c, redisDb *db, robj *key, long long when) { dictEntry *kde, *de; /* Reuse the sds from the main dict in the expire dict */ kde = dictFind(db->dict,key->ptr); serverAssertWithInfo(NULL,key,kde != NULL); de = dictAddOrFind(db->expires,dictGetKey(kde)); dictSetSignedIntegerVal(de,when); int writable_slave = server.masterhost && server.repl_slave_ro == 0; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) rememberSlaveKeyWithExpire(db,key); } - 首先呼叫
2. 資料的淘汰
在 Redis 記憶體淘汰策略一節中,我們提到了 redis 共有 6 種資料淘汰的策略,本節將介紹 redis 是如何執行這些策略的,不過在此之前,我們首先要知道 redis 的過期資料洗掉其實有兩種觸發方式:
- 主動洗掉
發生在 redis 處理讀寫請求的程序,例如執行 get/set 等命令- 定期洗掉
發生在 redis 定時任務執行程序
2.1 主動洗掉
主動洗掉資料的動作其實也會多處觸發,首先服務端決議完客戶端傳輸過來的命令,準備執行前會檢查 redis 占用記憶體是否超過了配置值,從而判斷是否需要釋放空間,另外在命令執行的程序中也會檢查 key 是否過期了,對過期的 key 需要洗掉處理
2.1.1 命令執行前觸發
-
這部分的處理在
server.c#processCommand()函式中,不了解命令處理流程的讀者可參考Redis 6.0 原始碼閱讀筆記(1)-Redis 服務端啟動及命令執行,以下原始碼省略了與資料淘汰無關的部分,可以看到其主要邏輯如下:- 如果 redis 配置中設定了最大記憶體,并且 lua 腳本沒有超時,則需要進行下一步處理
- 呼叫
freeMemoryIfNeededAndSafe()函式進行資料淘汰處理
int processCommand(client *c) { ...... /* Handle the maxmemory directive. * * Note that we do not want to reclaim memory if we are here re-entering * the event loop since there is a busy Lua script running in timeout * condition, to avoid mixing the propagation of scripts with the * propagation of DELs due to eviction. */ if (server.maxmemory && !server.lua_timedout) { int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR; /* freeMemoryIfNeeded may flush slave output buffers. This may result * into a slave, that may be the active client, to be freed. */ if (server.current_client == NULL) return C_ERR; /* It was impossible to free enough memory, and the command the client * is trying to execute is denied during OOM conditions or the client * is in MULTI/EXEC context? Error. */ if (out_of_memory && (is_denyoom_command || (c->flags & CLIENT_MULTI && c->cmd->proc != discardCommand))) { rejectCommand(c, shared.oomerr); return C_OK; } /* Save out_of_memory result at script start, otherwise if we check OOM * untill first write within script, memory used by lua stack and * arguments might interfere. */ if (c->cmd->proc == evalCommand || c->cmd->proc == evalShaCommand) { server.lua_oom = out_of_memory; } } ...... } -
evict.c#freeMemoryIfNeededAndSafe()函式只是個入口,真正的資料淘汰處理在evict.c#freeMemoryIfNeeded()函式中,這個函式的實作代碼很長,不過可以分為以下幾個步驟:- 首先進行各項檢查,例如呼叫
getMaxmemoryState()函式檢查服務端當前占用記憶體是不是超過了最大記憶體設定,之后還要檢查 redis 服務端最大記憶體的處理策略 (server.maxmemory_policy)是不是禁止洗掉資料 - 根據服務端最大記憶體的處理策略的不同,會有不同的處理:
- 對于最近最少使用 LRU,最少使用 LFU 以及到達過期時間 TTL 這幾種對 key 有一定要求的洗掉策略,需要遍歷所有 redis 資料庫,根據最大記憶體的處理策略進一步確定洗掉 key 的范圍(所有資料(
db->dict)或者過期資料(db->expires)),然后呼叫evict.c#evictionPoolPopulate()函式從中挑選出可以洗掉的候選 key,最后確定一個最佳的bestkey - 對于隨機淘汰這種對 key 沒太多要求的洗掉策略,同樣遍歷所有 redis 資料庫,根據最大記憶體的處理策略確定洗掉 key 的范圍,然后呼叫
dict.c#dictGetRandomKey()挑選bestkey
- 對于最近最少使用 LRU,最少使用 LFU 以及到達過期時間 TTL 這幾種對 key 有一定要求的洗掉策略,需要遍歷所有 redis 資料庫,根據最大記憶體的處理策略進一步確定洗掉 key 的范圍(所有資料(
- 確定了要洗掉的
bestkey,將其洗掉即可,如果釋放的記憶體還是沒有達到要求,while 回圈繼續
int freeMemoryIfNeededAndSafe(void) { if (server.lua_timedout || server.loading) return C_OK; return freeMemoryIfNeeded(); } /* This function is periodically called to see if there is memory to free * according to the current "maxmemory" settings. In case we are over the * memory limit, the function will try to free some memory to return back * under the limit. * * The function returns C_OK if we are under the memory limit or if we * were over the limit, but the attempt to free memory was successful. * Otehrwise if we are over the memory limit, but not enough memory * was freed to return back under the limit, the function returns C_ERR. */ int freeMemoryIfNeeded(void) { int keys_freed = 0; /* By default replicas should ignore maxmemory * and just be masters exact copies. */ if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK; size_t mem_reported, mem_tofree, mem_freed; mstime_t latency, eviction_latency, lazyfree_latency; long long delta; int slaves = listLength(server.slaves); int result = C_ERR; /* When clients are paused the dataset should be static not just from the * POV of clients not being able to write, but also from the POV of * expires and evictions of keys not being performed. */ if (clientsArePaused()) return C_OK; if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK) return C_OK; mem_freed = 0; latencyStartMonitor(latency); if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) goto cant_free; /* We need to free memory, but policy forbids. */ while (mem_freed < mem_tofree) { int j, k, i; static unsigned int next_db = 0; sds bestkey = NULL; int bestdbid; redisDb *db; dict *dict; dictEntry *de; if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) || server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { struct evictionPoolEntry *pool = EvictionPoolLRU; while(bestkey == NULL) { unsigned long total_keys = 0, keys; /* We don't want to make local-db choices when expiring keys, * so to start populate the eviction pool sampling keys from * every DB. */ for (i = 0; i < server.dbnum; i++) { db = server.db+i; dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ? db->dict : db->expires; if ((keys = dictSize(dict)) != 0) { evictionPoolPopulate(i, dict, db->dict, pool); total_keys += keys; } } if (!total_keys) break; /* No keys to evict. */ /* Go backward from best to worst element to evict. */ for (k = EVPOOL_SIZE-1; k >= 0; k--) { if (pool[k].key == NULL) continue; bestdbid = pool[k].dbid; if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { de = dictFind(server.db[pool[k].dbid].dict, pool[k].key); } else { de = dictFind(server.db[pool[k].dbid].expires, pool[k].key); } /* Remove the entry from the pool. */ if (pool[k].key != pool[k].cached) sdsfree(pool[k].key); pool[k].key = NULL; pool[k].idle = 0; /* If the key exists, is our pick. Otherwise it is * a ghost and we need to try the next element. */ if (de) { bestkey = dictGetKey(de); break; } else { /* Ghost... Iterate again. */ } } } } /* volatile-random and allkeys-random policy */ else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM) { /* When evicting a random key, we try to evict a key for * each DB, so we use the static 'next_db' variable to * incrementally visit all DBs. */ for (i = 0; i < server.dbnum; i++) { j = (++next_db) % server.dbnum; db = server.db+j; dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ? db->dict : db->expires; if (dictSize(dict) != 0) { de = dictGetRandomKey(dict); bestkey = dictGetKey(de); bestdbid = j; break; } } } /* Finally remove the selected key. */ if (bestkey) { db = server.db+bestdbid; robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); propagateExpire(db,keyobj,server.lazyfree_lazy_eviction); /* We compute the amount of memory freed by db*Delete() alone. * It is possible that actually the memory needed to propagate * the DEL in AOF and replication link is greater than the one * we are freeing removing the key, but we can't account for * that otherwise we would never exit the loop. * * AOF and Output buffer memory will be freed eventually so * we only care about memory used by the key space. */ delta = (long long) zmalloc_used_memory(); latencyStartMonitor(eviction_latency); if (server.lazyfree_lazy_eviction) dbAsyncDelete(db,keyobj); else dbSyncDelete(db,keyobj); signalModifiedKey(NULL,db,keyobj); latencyEndMonitor(eviction_latency); latencyAddSampleIfNeeded("eviction-del",eviction_latency); delta -= (long long) zmalloc_used_memory(); mem_freed += delta; server.stat_evictedkeys++; notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", keyobj, db->id); decrRefCount(keyobj); keys_freed++; /* When the memory to free starts to be big enough, we may * start spending so much time here that is impossible to * deliver data to the slaves fast enough, so we force the * transmission here inside the loop. */ if (slaves) flushSlavesOutputBuffers(); /* Normally our stop condition is the ability to release * a fixed, pre-computed amount of memory. However when we * are deleting objects in another thread, it's better to * check, from time to time, if we already reached our target * memory, since the "mem_freed" amount is computed only * across the dbAsyncDelete() call, while the thread can * release the memory all the time. */ if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) { if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) { /* Let's satisfy our stop condition. */ mem_freed = mem_tofree; } } } else { goto cant_free; /* nothing to free... */ } } result = C_OK; cant_free: /* We are here if we are not able to reclaim memory. There is only one * last thing we can try: check if the lazyfree thread has jobs in queue * and wait... */ if (result != C_OK) { latencyStartMonitor(lazyfree_latency); while(bioPendingJobsOfType(BIO_LAZY_FREE)) { if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) { result = C_OK; break; } usleep(1000); } latencyEndMonitor(lazyfree_latency); latencyAddSampleIfNeeded("eviction-lazyfree",lazyfree_latency); } latencyEndMonitor(latency); latencyAddSampleIfNeeded("eviction-cycle",latency); return result; } - 首先進行各項檢查,例如呼叫
2.1.2 命令執行時觸發
-
以本文第一節中的
SETEX命令為例,t_string.c#setGenericCommand()函式中最終將資料保存進 redis 資料庫的函式為genericSetKey()void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { ...... genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1); ...... } -
t_string.c#genericSetKey()函式的實作很簡練,不再贅述,此處重點關注lookupKeyWrite()函式void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) { if (lookupKeyWrite(db,key) == NULL) { dbAdd(db,key,val); } else { dbOverwrite(db,key,val); } incrRefCount(val); if (!keepttl) removeExpire(db,key); if (signal) signalModifiedKey(c,db,key); } -
db.c#lookupKeyWrite()函式只是個入口,可以看到它最侄訓呼叫到expireIfNeeded()函式,而這個函式就是 redis 命令執行程序中洗掉過期資料的關鍵robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) { expireIfNeeded(db,key); return lookupKey(db,key,flags); } robj *lookupKeyWrite(redisDb *db, robj *key) { return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE); } -
db.c#expireIfNeeded()函式邏輯較為簡單,主要做了如下幾個動作:- 呼叫函式
keyIsExpired()判斷 key 是否過期 - 向slave節點傳播執行過期 key 的動作并發送事件通知
- 洗掉過期 key
int expireIfNeeded(redisDb *db, robj *key) { if (!keyIsExpired(db,key)) return 0; /* If we are running in the context of a slave, instead of * evicting the expired key from the database, we return ASAP: * the slave key expiration is controlled by the master that will * send us synthesized DEL operations for expired keys. * * Still we try to return the right information to the caller, * that is, 0 if we think the key should be still valid, 1 if * we think the key is expired at this time. */ if (server.masterhost != NULL) return 1; /* Delete the key */ server.stat_expiredkeys++; propagateExpire(db,key,server.lazyfree_lazy_expire); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",key,db->id); int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key); if (retval) signalModifiedKey(NULL,db,key); return retval; } - 呼叫函式
2.2 定期洗掉
-
定期洗掉是通過 redis 定時任務實作的,而定時任務入口為
server.c#serverCron()函式,這個函式實作代碼較多,以下省略不相關的部分,只關注定期洗掉資料的部分,關鍵函式為server.c#databasesCron()int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { ...... /* Handle background operations on Redis databases. */ databasesCron(); /* Start a scheduled AOF rewrite if this was requested by the user while * a BGSAVE was in progress. */ if (!hasActiveChildProcess() && server.aof_rewrite_scheduled) { rewriteAppendOnlyFileBackground(); } /* Check if a background saving or AOF rewrite in progress terminated. */ if (hasActiveChildProcess() || ldbPendingChildren()) { checkChildrenDone(); } else { /* If there is not a background saving/rewrite in progress check if * we have to save/rewrite now. */ for (j = 0; j < server.saveparamslen; j++) { struct saveparam *sp = server.saveparams+j; /* Save if we reached the given amount of changes, * the given amount of seconds, and if the latest bgsave was * successful or if, in case of an error, at least * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */ if (server.dirty >= sp->changes && server.unixtime-server.lastsave > sp->seconds && (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY || server.lastbgsave_status == C_OK)) { serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, (int)sp->seconds); rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); rdbSaveBackground(server.rdb_filename,rsiptr); break; } } /* Trigger an AOF rewrite if needed. */ if (server.aof_state == AOF_ON && !hasActiveChildProcess() && server.aof_rewrite_perc && server.aof_current_size > server.aof_rewrite_min_size) { long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1; long long growth = (server.aof_current_size*100/base) - 100; if (growth >= server.aof_rewrite_perc) { serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); rewriteAppendOnlyFileBackground(); } } } /* AOF postponed flush: Try at every cron cycle if the slow fsync * completed. */ if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); /* AOF write errors: in this case we have a buffer to flush as well and * clear the AOF error in case of success to make the DB writable again, * however to try every second is enough in case of 'hz' is set to * an higher frequency. */ run_with_period(1000) { if (server.aof_last_write_status == C_ERR) flushAppendOnlyFile(0); } ...... } -
server.c#databasesCron()函式會對資料庫執行洗掉過期鍵、調整大小以及漸進式 rehash 等動作,本節主要關注洗掉過期鍵的操作,這部分由expire.c#activeExpireCycle()函式實作void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ if (server.active_expire_enabled) { if (iAmMaster()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { expireSlaveKeys(); } } /* Defrag keys gradually. */ activeDefragCycle(); /* Perform hash tables rehashing if needed, but only if there are no * other processes saving the DB on disk. Otherwise rehashing is bad * as will cause a lot of copy-on-write of memory pages. */ if (!hasActiveChildProcess()) { /* We use global counters so if we stop the computation at a given * DB we'll be able to start from the successive in the next * cron loop iteration. */ static unsigned int resize_db = 0; static unsigned int rehash_db = 0; int dbs_per_call = CRON_DBS_PER_CALL; int j; /* Don't test more DBs than we have. */ if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; /* Resize */ for (j = 0; j < dbs_per_call; j++) { tryResizeHashTables(resize_db % server.dbnum); resize_db++; } /* Rehash */ if (server.activerehashing) { for (j = 0; j < dbs_per_call; j++) { int work_done = incrementallyRehash(rehash_db); if (work_done) { /* If the function did some work, stop here, we'll do * more at the next cron loop. */ break; } else { /* If this db didn't need rehash, we'll try the next one. */ rehash_db++; rehash_db %= server.dbnum; } } } } } -
expire.c#activeExpireCycle()函式實作代碼很長,其中需要注意的步驟如下:- 遍歷指定個數的db(如16)進行洗掉過期資料的操作
- 針對每個 db 每輪遍歷不超過指定數量(如20)的節點,隨機獲取有過期時間的節點資料,呼叫
activeExpireCycleTryExpire()函式嘗試洗掉資料 - 每個db 的遍歷的輪數累積到16次的時候,會判斷使用的時間是否超過定時任務執行時間的 25%(
timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100),超過就停止洗掉資料程序 - 最后如果已經洗掉的過期資料與隨機選中的待過期資料的比值超過了配置值,也停止洗掉資料
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */ void activeExpireCycle(int type) { ...... for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { /* Expired and checked in a single loop. */ unsigned long expired, sampled; redisDb *db = server.db+(current_db % server.dbnum); /* Increment the DB now so we are sure if we run out of time * in the current DB we'll restart from the next. This allows to * distribute the time evenly across DBs. */ current_db++; /* Continue to expire if at the end of the cycle there are still * a big percentage of keys to expire, compared to the number of keys * we scanned. The percentage, stored in config_cycle_acceptable_stale * is not fixed, but depends on the Redis configured "expire effort". */ do { unsigned long num, slots; long long now, ttl_sum; int ttl_samples; iteration++; /* If there is nothing to expire try next DB ASAP. */ if ((num = dictSize(db->expires)) == 0) { db->avg_ttl = 0; break; } slots = dictSlots(db->expires); now = mstime(); /* When there are less than 1% filled slots, sampling the key * space is expensive, so stop here waiting for better times... * The dictionary will be resized asap. */ if (num && slots > DICT_HT_INITIAL_SIZE && (num*100/slots < 1)) break; /* The main collection cycle. Sample random keys among keys * with an expire set, checking for expired ones. */ expired = 0; sampled = 0; ttl_sum = 0; ttl_samples = 0; if (num > config_keys_per_loop) num = config_keys_per_loop; /* Here we access the low level representation of the hash table * for speed concerns: this makes this code coupled with dict.c, * but it hardly changed in ten years. * * Note that certain places of the hash table may be empty, * so we want also a stop condition about the number of * buckets that we scanned. However scanning for free buckets * is very fast: we are in the cache line scanning a sequential * array of NULL pointers, so we can scan a lot more buckets * than keys in the same time. */ long max_buckets = num*20; long checked_buckets = 0; while (sampled < num && checked_buckets < max_buckets) { for (int table = 0; table < 2; table++) { if (table == 1 && !dictIsRehashing(db->expires)) break; unsigned long idx = db->expires_cursor; idx &= db->expires->ht[table].sizemask; dictEntry *de = db->expires->ht[table].table[idx]; long long ttl; /* Scan the current bucket of the current table. */ checked_buckets++; while(de) { /* Get the next entry now since this entry may get * deleted. */ dictEntry *e = de; de = de->next; ttl = dictGetSignedIntegerVal(e)-now; if (activeExpireCycleTryExpire(db,e,now)) expired++; if (ttl > 0) { /* We want the average TTL of keys yet * not expired. */ ttl_sum += ttl; ttl_samples++; } sampled++; } } db->expires_cursor++; } total_expired += expired; total_sampled += sampled; /* Update the average TTL stats for this database. */ if (ttl_samples) { long long avg_ttl = ttl_sum/ttl_samples; /* Do a simple running average with a few samples. * We just use the current estimate with a weight of 2% * and the previous estimate with a weight of 98%. */ if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50); } /* We can't block forever here even if there are many keys to * expire. So after a given amount of milliseconds return to the * caller waiting for the other active expire cycle. */ if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */ elapsed = ustime()-start; if (elapsed > timelimit) { timelimit_exit = 1; server.stat_expired_time_cap_reached_count++; break; } } /* We don't repeat the cycle for the current database if there are * an acceptable amount of stale keys (logically expired but yet * not reclaimed). */ } while (sampled == 0 || (expired*100/sampled) > config_cycle_acceptable_stale); } ...... } -
expire.c#activeExpireCycleTryExpire()是真正嘗試洗掉過期資料的處理函式,以下原始碼簡單明了,不再贅述int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { long long t = dictGetSignedIntegerVal(de); if (now > t) { sds key = dictGetKey(de); robj *keyobj = createStringObject(key,sdslen(key)); propagateExpire(db,keyobj,server.lazyfree_lazy_expire); if (server.lazyfree_lazy_expire) dbAsyncDelete(db,keyobj); else dbSyncDelete(db,keyobj); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",keyobj,db->id); trackingInvalidateKey(NULL,keyobj); decrRefCount(keyobj); server.stat_expiredkeys++; return 1; } else { return 0; } }
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/192568.html
標籤:其他
上一篇:上位機開發基于MFC使用到LisControlt控制元件的一些使用方法
下一篇:Mysql性能調優(二)
