這一次主要講下redis中服務器這個結構體相關代碼,主要從是代碼層面進行講解
redis服務器
redis服務器結構體主要代碼在redis.h/redisServer,下面給出該結構體原始碼,可以看到原始碼中對該結構體定義很長,這一節我們一點點分析,當然有些地方可能我也理解不到位hhh
// redis服務器實體
struct redisServer {
char *configfile; /* 組態檔的絕對路徑 */
int hz; /* serverCron() 每秒呼叫的次數 */
redisDb *db; /* 資料庫陣列,里面存放的是該服務器所有的資料庫 */
dict *commands; /* 命令表(受到 rename 配置選項的作用) */
dict *orig_commands; /* 命令表(無 rename 配置選項的作用) */
aeEventLoop *el; /* 事件狀態 */
unsigned lruclock:REDIS_LRU_BITS; /* 最近一次使用時鐘 */
int shutdown_asap; /* 關閉服務器的標識 */
int activerehashing; /* 在執行 serverCron() 時進行漸進式 rehash */
char *requirepass; /* 是否設定了密碼 */
char *pidfile; /* PID 檔案路徑 */
int arch_bits; /* 架構型別32or64 */
int cronloops; /* serverCron() 函式的運行次數計數器 */
char runid[REDIS_RUN_ID_SIZE+1]; /* 本服務器的 RUN ID ID在每秒都會變化 */
int sentinel_mode; /* 服務器是否運行在 SENTINEL 模式 */
int port; /* TCP 監聽埠 */
int tcp_backlog; /* TCP連接中已完成佇列(完成三次握手之后)的長度 */
char *bindaddr[REDIS_BINDADDR_MAX]; /* 系結地址 */
int bindaddr_count; /* bindaddr地址數量 */
char *unixsocket; /* UNIX socket 路徑 */
mode_t unixsocketperm; /* UNIX socket permission */
int ipfd[REDIS_BINDADDR_MAX]; /* TCP套接字描述符 */
int ipfd_count; /* ipfd中使用的套接字數量 */
int sofd; /* Unix套接字描述符 */
int cfd[REDIS_BINDADDR_MAX];/* 集群總線監聽套接字 */
int cfd_count; /* cfd使用到的套接字數量 */
list *clients; /* 鏈表,保存了所有客戶端狀態結構 */
list *clients_to_close; /* 鏈表,保存了所有待關閉的客戶端 */
list *slaves, *monitors; /* 鏈表,保存了所有從服務器,以及所有監視器 */
redisClient *current_client; /* C服務器的當前客戶端,僅用于崩潰報告 */
int clients_paused; /* 客服端是否被paused */
mstime_t clients_pause_end_time; /* 執行undo clients_paused的時間 */
char neterr[ANET_ERR_LEN]; /* anet.c網路錯誤緩沖區 */
dict *migrate_cached_sockets;/* MIGRATE緩沖套接字 */
int loading; /* 服務器是否正在被載入 */
off_t loading_total_bytes; /* 正在載入的資料的大小 */
off_t loading_loaded_bytes; /* 已載入資料的大小 */
time_t loading_start_time; /* 開始進行載入的時間 */
off_t loading_process_events_interval_bytes;
// 常用命令的快捷連接
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
*rpopCommand;
time_t stat_starttime; /* 服務器啟動時間 */
long long stat_numcommands; /* 已處理命令的數量 */
long long stat_numconnections; /* 服務器接到的連接請求數量 */
long long stat_expiredkeys; /* 已過期的鍵數量 */
long long stat_evictedkeys; /* 因為回收記憶體而被釋放的過期鍵的數量 */
long long stat_keyspace_hits; /* 成功查找鍵的次數 */
long long stat_keyspace_misses; /* 查找鍵失敗的次數 */
size_t stat_peak_memory; /* 已使用記憶體峰值 */
long long stat_fork_time; /* 最后一次執行 fork() 時消耗的時間 */
long long stat_rejected_conn; /* 服務器因為客戶端數量過多而拒絕客戶端連接的次數 */
long long stat_sync_full; /* 執行 full sync 的次數 */
long long stat_sync_partial_ok; /* PSYNC 成功執行的次數 */
long long stat_sync_partial_err;/* PSYNC 執行失敗的次數 */
list *slowlog; /* 保存了所有慢查詢日志的鏈表 */
long long slowlog_entry_id; /* SLOWLOG當前條目ID */
long long slowlog_log_slower_than; /* 服務器配置 slowlog-log-slower-than 選項的值(SLOWLOG時間限制) */
unsigned long slowlog_max_len; /* 服務器配置 slowlog-max-len 選項的值(SLOWLOG記錄的最大專案數) */
size_t resident_set_size; /* serverCron()中rss采樣次數. */
long long ops_sec_last_sample_time; /* 最后一次進行抽樣的時間 */
long long ops_sec_last_sample_ops; /* 最后一次抽樣時,服務器已執行命令的數量 */
long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES]; /* 抽樣結果 */
int ops_sec_idx; /* 陣列索引,用于保存抽樣結果,并在需要時回繞到 0 */
int verbosity; /* 日志等級 Redis總共支持四個級別:debug、verbose、notice、warning,默認為notice */
int maxidletime; /* 客戶端超時最大時間 */
int tcpkeepalive; /* 是否開啟SO_KEEPALIVE選項 */
int active_expire_enabled; /* 測驗時候可以禁用 */
size_t client_max_querybuf_len; /* 客戶端查詢緩沖區長度限制 */
int dbnum; /* 服務器初始化應該創建多少個服務器 config中databases 16可以設定該選項 */
int daemonize; /* 如果作為守護行程運行,則為True */
// 客戶端輸出緩沖區大小限制
// 陣列的元素有 REDIS_CLIENT_LIMIT_NUM_CLASSES 個
// 每個代表一類客戶端:普通、從服務器、pubsub,諸如此類
clientBufferLimitsConfig client_obuf_limits[REDIS_CLIENT_LIMIT_NUM_CLASSES];
int aof_state; /* AOF 狀態(開啟/關閉/可寫) */
int aof_fsync; /* 所使用的 fsync 策略(每個寫入/每秒/從不) */
char *aof_filename; /* AOF檔案名字 */
int aof_no_fsync_on_rewrite; /* 如果重寫是在prog中,請不要fsync */
int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */
off_t aof_rewrite_base_size; /* 最后一次執行 BGREWRITEAOF 時, AOF 檔案的大小 */
off_t aof_current_size; /* AOF 檔案的當前位元組大小 */
int aof_rewrite_scheduled; /* BGSAVE終止后重寫 */
pid_t aof_child_pid; /* 負責進行 AOF 重寫的子行程 ID */
list *aof_rewrite_buf_blocks; /* AOF 重寫快取鏈表,鏈接著多個快取塊 */
sds aof_buf; /* AOF 緩沖區 */
int aof_fd; /* 當前所選AOF檔案的檔案描述符 */
int aof_selected_db; /* 當前在AOF中選擇的資料庫 */
time_t aof_flush_postponed_start; /*推遲AOF flush的UNIX時間 */
time_t aof_last_fsync; /* 最后一直執行 fsync 的時間 */
time_t aof_rewrite_time_last; /* 最后一次AOF重寫運行所用的時間 */
time_t aof_rewrite_time_start; /* 當前AOF重寫開始時間 */
int aof_lastbgrewrite_status; /* 最后一次執行 BGREWRITEAOF 的結果REDIS_OK或REDIS_ERR */
unsigned long aof_delayed_fsync; /* 記錄 AOF 的 write 操作被推遲了多少次 */
int aof_rewrite_incremental_fsync;/* 指示是否需要每寫入一定量的資料,就主動執行一次 fsync() */
int aof_last_write_status; /* REDIS_OK or REDIS_ERR */
int aof_last_write_errno; /* 如果aof_last_write_status是ERR,則有效 */
long long dirty; /* 自從上次 SAVE 執行以來,資料庫被修改的次數 */
long long dirty_before_bgsave; /* BGSAVE 執行前的資料庫被修改次數 */
pid_t rdb_child_pid; /* 負責執行 BGSAVE 的子行程的 ID,沒在執行 BGSAVE 時,設為 -1 */
struct saveparam *saveparams; /* 為RDB保存點陣列 */
int saveparamslen; /* saveparams長度 */
char *rdb_filename; /* RDB檔案的名稱 */
int rdb_compression; /* 是否在RDB中使用壓縮 */
int rdb_checksum; /* 是否使用RDB校驗和 */
time_t lastsave; /* 最后一次完成 SAVE 的時間 */
time_t lastbgsave_try; /* 最后一次嘗試執行 BGSAVE 的時間 */
time_t rdb_save_time_last; /* 最近一次 BGSAVE 執行耗費的時間 */
time_t rdb_save_time_start; /* 資料庫最近一次開始執行 BGSAVE 的時間 */
int lastbgsave_status; /* 最后一次執行 SAVE 的狀態REDIS_OK or REDIS_ERR */
int stop_writes_on_bgsave_err; /* 如果不能BGSAVE,不允許寫入 */
/* Propagation of commands in AOF / replication */
redisOpArray also_propagate; /* Additional command to propagate. */
char *logfile; /* 日志檔案的路徑 */
int syslog_enabled; /* 是否啟用了syslog */
char *syslog_ident; /* 指定syslog的標示符,如果上面的syslog-enabled no,則這個選項無效 */
int syslog_facility; /* 指定syslog facility,必須是USER或者LOCAL0到LOCAL7 */
int slaveseldb; /* Last SELECTed DB in replication output */
long long master_repl_offset; /* 全域復制偏移量(一個累計值) */
int repl_ping_slave_period; /* Master每N秒ping一次slave */
// backlog 本身
char *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog回圈緩沖區大小 */
long long repl_backlog_histlen; /* backlog 中資料的長度 */
long long repl_backlog_idx; /* backlog 的當前索引 */
long long repl_backlog_off; /* backlog 中可以被還原的第一個位元組的偏移量 */
time_t repl_backlog_time_limit; /* backlog 的過期時間 */
time_t repl_no_slaves_since; /* 距離上一次有從服務器的時間 */
int repl_min_slaves_to_write; /* 是否開啟最小數量從服務器寫入功能 */
int repl_min_slaves_max_lag; /* 定義最小數量從服務器的最大延遲值 */
int repl_good_slaves_count; /* 延遲良好的從服務器的數量 lag <= max_lag. */
char *masterauth; /* 主服務器的驗證密碼 */
char *masterhost; /* 主服務器的地址 */
int masterport; /* 主服務器的埠 */
int repl_timeout; /* 主機空閑N秒后超時 */
redisClient *master; /* 主服務器所對應的客戶端 */
redisClient *cached_master; /* 被快取的主服務器,PSYNC 時使用 */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* 復制的狀態(服務器是從服務器時使用) */
off_t repl_transfer_size; /* 在同步期間從主機讀取的RDB的大小 */
off_t repl_transfer_read; /* 在同步期間從主設備讀取的RDB位元組數 */
// 最近一次執行 fsync 時的偏移量
// 用于 sync_file_range 函式
off_t repl_transfer_last_fsync_off; /* 上次fsync-ed時偏移 */
int repl_transfer_s; /* 主服務器的套接字 */
int repl_transfer_fd; /* 保存 RDB 檔案的臨時檔案的描述符 */
char *repl_transfer_tmpfile; /* 保存 RDB 檔案的臨時檔案名字 */
time_t repl_transfer_lastio; /* 最近一次讀入 RDB 內容的時間 */
int repl_serve_stale_data; /* Serve stale data when link is down? */
int repl_slave_ro; /* 從服務器是否只讀 */
time_t repl_down_since; /* 連接斷開的時長 */
int repl_disable_tcp_nodelay; /* 是否要在 SYNC 之后關閉 NODELAY */
int slave_priority; /* 從服務器優先級 */
char repl_master_runid[REDIS_RUN_ID_SIZE+1]; /*本服務器(從服務器)當前主服務器的 RUN ID */
long long repl_master_initial_offset; /* Master PSYNC offset. */
/* ---------下面一些屬性有些很難用到,對此我也沒仔細看 */
/* Replication script cache. */
// 復制腳本快取
// 字典
dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
// FIFO 佇列
list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */
// 快取的大小
int repl_scriptcache_size; /* Max number of elements. */
/* Synchronous replication. */
list *clients_waiting_acks; /* Clients waiting in WAIT command. */
int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */
int maxclients; /* 最大并發客戶端數 */
unsigned long long maxmemory; /* 要使用的最大記憶體位元組數 */
int maxmemory_policy; /* Policy for key eviction */
int maxmemory_samples; /* Pricision of random sampling */
unsigned int bpop_blocked_clients; /* 串列阻止的客戶端數量 */
list *unblocked_clients; /* 在下一個回圈之前解鎖的客戶端串列 */
list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
int sort_alpha;
int sort_bypattern;
int sort_store;
/* Zip structure config, see redis.conf for more information */
size_t hash_max_ziplist_entries;
size_t hash_max_ziplist_value;
size_t list_max_ziplist_entries;
size_t list_max_ziplist_value;
size_t set_max_intset_entries;
size_t zset_max_ziplist_entries;
size_t zset_max_ziplist_value;
size_t hll_sparse_max_bytes;
time_t unixtime; /* Unix time sampled every cron cycle. */
long long mstime; /* Like 'unixtime' but with milliseconds resolution. */
/* Pubsub */
// 字典,鍵為頻道,值為鏈表
// 鏈表中保存了所有訂閱某個頻道的客戶端
// 新客戶端總是被添加到鏈表的表尾
dict *pubsub_channels; /* Map channels to list of subscribed clients */
// 這個鏈表記錄了客戶端訂閱的所有模式的名字
list *pubsub_patterns; /* A list of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of REDIS_NOTIFY... flags. */
/* Cluster */
int cluster_enabled; /* 群集是否已啟用 */
mstime_t cluster_node_timeout; /* 集群節點超時時間. */
char *cluster_configfile; /* 集群自動生成的組態檔名 */
struct clusterState *cluster; /* 集群的狀態*/
int cluster_migration_barrier; /* Cluster replicas migration barrier. */
/* Scripting */
// Lua 環境
lua_State *lua; /* The Lua interpreter. We use just one for all clients */
// 復制執行 Lua 腳本中的 Redis 命令的偽客戶端
redisClient *lua_client; /* The "fake client" to query Redis from Lua */
// 當前正在執行 EVAL 命令的客戶端,如果沒有就是 NULL
redisClient *lua_caller; /* The client running EVAL right now, or NULL */
// 一個字典,值為 Lua 腳本,鍵為腳本的 SHA1 校驗和
dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
// Lua 腳本的執行時限
mstime_t lua_time_limit; /* Script timeout in milliseconds */
// 腳本開始執行的時間
mstime_t lua_time_start; /* Start time of script, milliseconds time */
// 腳本是否執行過寫命令
int lua_write_dirty; /* True if a write command was called during the
execution of the current script. */
// 腳本是否執行過帶有隨機性質的命令
int lua_random_dirty; /* True if a random command was called during the
execution of the current script. */
// 腳本是否超時
int lua_timedout; /* True if we reached the time limit for script
execution. */
// 是否要殺死腳本
int lua_kill; /* Kill the script if true. */
/* Assert & bug reporting */
char *assert_failed;
char *assert_file;
int assert_line;
int bug_report_start; /* True if bug report header was already logged. */
int watchdog_period; /* Software watchdog period in ms. 0 = off */
};
下面重點講下redis服務器啟動的流程,主要包括以下幾個步驟,不懂的同學可以看下redis.c/main函式,就可以大致了解其程序
- 檢查服務器是否以
Sentinel模式啟動 - 初始化全域服務器配置
initServerConfig() - 如果是
Sentinel模式,則初始化相關配置initSentinelConfig、initSentinel - 加載組態檔
loadServerConfig() - 將服務器行程設定為守護行程
daemonize - 初始化服務器
initServer - 如果服務器行程為守護行程,則創建PID檔案
createPidFile - 為服務器行程設定名字
redisSetProcTitle - 列印logo
redisAsciiArt - 加載資料庫
loadDataFromDisk:- AOF 持久化已打開,則使用
loadAppendOnlyFile(), - 否則使用加載RDB檔案
rdbLoad()
- AOF 持久化已打開,則使用
- 運行事件處理器,一直到服務器關閉為止
aeMain
下面對上面幾個函式依次進行講解
Sentinel模式
Sentinel模式就是哨兵模式,下面給出該模式的一個例子

其中server1是主服務器,其余server2,3,4為從服務器,在生產環境中,不免會有意外原因導致redis服務器掛掉,如果此時掛掉的是一個master節點,主節點宕機,主從復制將不能繼續進行,寫資料將會阻塞,而哨兵的存在主要是為了切換掉宕機的master,然后從master下面的slave節點中選舉一個作為新的master,并且把舊的master的slave全部轉移到新的master上面,繼續原有的主從復制, 哨兵本身是一個獨立的行程,本身也是有單點問題的,所以哨兵也有自身的集群,用來保證哨兵本身的容錯機制,
可以將redis中sentinel想成一個特殊的redis服務器,但是他不會像redis普通服務器那樣去加載rdb或者aof檔案,在initSentinel函式中,會創建一個sentinel結構體 sentinelState,代碼如下
/* Sentinel 的狀態結構 */
struct sentinelState {
// 當前紀元
uint64_t current_epoch;
// 保存了所有被這個 sentinel 監視的主服務器
// 字典的鍵是主服務器的名字
// 字典的值則是一個指向 sentinelRedisInstance 結構的指標
dict *masters;
// 是否進入了 TILT 模式?
int tilt;
// 目前正在執行的腳本的數量
int running_scripts;
// 進入 TILT 模式的時間
mstime_t tilt_start_time;
// 最后一次執行時間處理器的時間
mstime_t previous_time;
// 一個 FIFO 佇列,包含了所有需要執行的用戶腳本
list *scripts_queue;
} sentinel;
// 以 Sentinel 模式初始化服務器
void initSentinel(void) {
int j;
// 清空 Redis 服務器的命令表(該表用于普通模式)
dictEmpty(server.commands,NULL);
// 將 SENTINEL 模式所用的命令添加進命令表
for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
int retval;
struct redisCommand *cmd = sentinelcmds+j;
retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
redisAssert(retval == DICT_OK);
}
/* 初始化 Sentinel 的狀態 */
// 初始化紀元
sentinel.current_epoch = 0;
// 初始化保存主服務器資訊的字典
sentinel.masters = dictCreate(&instancesDictType,NULL);
// 初始化 TILT 模式的相關選項
sentinel.tilt = 0;
sentinel.tilt_start_time = 0;
sentinel.previous_time = mstime();
// 初始化腳本相關選項
sentinel.running_scripts = 0;
sentinel.scripts_queue = listCreate();
}
其中有一個master字典,這里面記錄了記錄了所有被 Sentinel 監視的主服務器的相關資訊, 其中:
- 字典的鍵是被監視主服務器的名字,
- 而字典的值則是被監視主服務器對應的
sentinel.c/sentinelRedisInstance結構,
每個 sentinelRedisInstance 結構代表一個被 Sentinel 監視的 Redis 服務器實體(instance), 這個實體可以是主服務器、從服務器、或者另外一個 Sentinel ,下面給出這個結構體的代碼
// Sentinel 會為每個被監視的 Redis 實體創建相應的 sentinelRedisInstance 實體
// (被監視的實體可以是主服務器、從服務器、或者其他 Sentinel )
typedef struct sentinelRedisInstance {
// 標識值,記錄了實體的型別,以及該實體的當前狀態
// 當為SRI_MASTER為主服務器,當為SRI_SLAVE為從服務器,當為SRI_SENTINEL為sentinel服務器
int flags;
// 實體的名字
// 主服務器的名字由用戶在組態檔中設定
// 從服務器以及 Sentinel 的名字由 Sentinel 自動設定
// 格式為 ip:port ,例如 "127.0.0.1:26379"
char *name;
// 實體的運行 ID
char *runid;
// 配置紀元,用于實作故障轉移
uint64_t config_epoch;
// 實體的地址
sentinelAddr *addr;
// 用于發送命令的異步連接
redisAsyncContext *cc;
// 用于執行 SUBSCRIBE 命令、接收頻道資訊的異步連接
// 僅在實體為主服務器時使用
redisAsyncContext *pc;
// 已發送但尚未回復的命令數量
int pending_commands;
// cc 連接的創建時間
mstime_t cc_conn_time;
// pc 連接的創建時間
mstime_t pc_conn_time;
// 最后一次從這個實體接收資訊的時間
mstime_t pc_last_activity;
// 實體最后一次回傳正確的 PING 命令回復的時間
mstime_t last_avail_time;
// 實體最后一次發送 PING 命令的時間
mstime_t last_ping_time;
// 實體最后一次回傳 PING 命令的時間,無論內容正確與否
mstime_t last_pong_time;
// 最后一次向頻道發送問候資訊的時間
// 只在當前實體為 sentinel 時使用
mstime_t last_pub_time;
// 最后一次接收到這個 sentinel 發來的問候資訊的時間
// 只在當前實體為 sentinel 時使用
mstime_t last_hello_time;
// 最后一次回復 SENTINEL is-master-down-by-addr 命令的時間
// 只在當前實體為 sentinel 時使用
mstime_t last_master_down_reply_time;
// 實體被判斷為 SDOWN 狀態的時間
mstime_t s_down_since_time;
// 實體被判斷為 ODOWN 狀態的時間
mstime_t o_down_since_time;
// SENTINEL down-after-milliseconds 選項所設定的值
// 實體無回應多少毫秒之后才會被判斷為主觀下線(subjectively down)
mstime_t down_after_period;
// 從實體獲取 INFO 命令的回復的時間
mstime_t info_refresh;
// 實體的角色
int role_reported;
// 角色的更新時間
mstime_t role_reported_time;
// 最后一次從服務器的主服務器地址變更的時間
mstime_t slave_conf_change_time;
/* 主服務器實體特有的屬性 */
// 其他同樣監控這個主服務器的所有 sentinel
dict *sentinels;
// 如果這個實體代表的是一個主服務器
// 那么這個字典保存著主服務器屬下的從服務器
// 字典的鍵是從服務器的名字,字典的值是從服務器對應的 sentinelRedisInstance 結構
dict *slaves;
// SENTINEL monitor <master-name> <IP> <port> <quorum> 選項中的 quorum 引數
// 判斷這個實體為客觀下線(objectively down)所需的支持投票數量
int quorum;
// SENTINEL parallel-syncs <master-name> <number> 選項的值
// 在執行故障轉移操作時,可以同時對新的主服務器進行同步的從服務器數量
int parallel_syncs;
// 連接主服務器和從服務器所需的密碼
char *auth_pass;
/* 從服務器實體特有的屬性*/
// 主從服務器連接斷開的時間
mstime_t master_link_down_time;
// 從服務器優先級
int slave_priority;
// 執行故障轉移操作時,從服務器發送 SLAVEOF <new-master> 命令的時間
mstime_t slave_reconf_sent_time;
// 主服務器的實體(在本實體為從服務器時使用)
struct sentinelRedisInstance *master;
// INFO 命令的回復中記錄的主服務器 IP
char *slave_master_host;
// INFO 命令的回復中記錄的主服務器埠號
int slave_master_port;
// INFO 命令的回復中記錄的主從服務器連接狀態
int slave_master_link_status;
// 從服務器的復制偏移量
unsigned long long slave_repl_offset;
/* 故障轉移相關屬性*/
// 如果這是一個主服務器實體,那么 leader 將是負責進行故障轉移的 Sentinel 的運行 ID ,
// 如果這是一個 Sentinel 實體,那么 leader 就是被選舉出來的領頭 Sentinel ,
// 這個域只在 Sentinel 實體的 flags 屬性的 SRI_MASTER_DOWN 標志處于打開狀態時才有效,
char *leader;
// 領頭的紀元
uint64_t leader_epoch;
// 當前執行中的故障轉移的紀元
uint64_t failover_epoch;
// 故障轉移操作的當前狀態
int failover_state;
// 狀態改變的時間
mstime_t failover_state_change_time;
// 最后一次進行故障遷移的時間
mstime_t failover_start_time;
// SENTINEL failover-timeout <master-name> <ms> 選項的值
// 重繪故障遷移狀態的最大時限
mstime_t failover_timeout;
mstime_t failover_delay_logged;
// 指向被提升為新主服務器的從服務器的指標
struct sentinelRedisInstance *promoted_slave;
// 一個檔案路徑,保存著 WARNING 級別的事件發生時執行的,
// 用于通知管理員的腳本的地址
char *notification_script;
// 一個檔案路徑,保存著故障轉移執行之前、之后、或者被中止時,
// 需要執行的腳本的地址
char *client_reconfig_script;
} sentinelRedisInstance;
假如此時啟動sentinel時候,組態檔如下
#####################
# master1 configure #
#####################
sentinel monitor master1 127.0.0.1 6379 2
sentinel down-after-milliseconds master1 30000
sentinel parallel-syncs master1 1
sentinel failover-timeout master1 900000
#####################
# master2 configure #
#####################
sentinel monitor master2 127.0.0.1 12345 5
sentinel down-after-milliseconds master2 50000
sentinel parallel-syncs master2 5
sentinel failover-timeout master2 450000
則會為2個服務器創建如下結構體


sentinel結構體中maste字典內容如下

當一個redis服務器以sentinel模式啟動,則它會自動去替換一些普通模式服務器的代碼,比如普通redis服務器使用redis.h/REDIS_SERVERPORT作為埠,但是sentinel模式下會以sentinel.c/REDIS_SENTINEL_PORT作為埠,同時普通redis服務器的支持的命令在redis.c/redisCommandTable中,但是sentinel模式下支持的命令在sentinel.c/sentinelcmds,其中代碼較少,下面給出代碼
// 服務器在 sentinel 模式下可執行的命令
struct redisCommand sentinelcmds[] = {
{"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
{"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
{"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
{"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
{"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
{"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
{"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}
};
sentinel主要就是為了應用于主服務器下線導致集群不可用情況,因此最重要的就是如何檢測和如何防范,下面通過主觀下線和客觀下線兩種方式進行說明
- 主觀下線
默認情況下,每個sentinel會每秒鐘向其他所有主服務器、從服務器、sentinels發送ping訊息,回傳結果分為有效回傳(+PONG、-LOADING、-MASTERDOWN)三者之一或無效回傳(上述三種其他回復或者指定時間內沒有回復),若出現無效回傳情況,則會將sentinelRedisInstance屬性中的flag欄位打開SRI_S_DOWN標志
- 客觀下線
當一個sentinel對一臺服務器設定為主觀下線后,還需要判斷是否客觀下線,它會向其他監視該服務器的sentinels進行詢問,當接收到足夠數量(設定的quorum引數)的sentinels說該服務器也下線,則表明該服務器客觀下線,客觀下線會打開SRI_O_DOWN標志
當一個主服務器被判定為客觀下線后,監視這個下線服務器的全部sentinels會進行協商,選舉出一個lead sentinel,這個lead sentinel會對下線服務器進行故障轉移,包括三個步驟
1、在已下線主服務器的從服務器中選一個主服務器,然后向其發送SLAVEOF no one命令,設定為主服務器
2、讓已下線主服務器下面的從服務器用剛剛選舉的主服務器作為主服務器
3、將已下線的主服務器認剛剛選舉的主服務器作為自己的主服務器,當這個下線服務器再次上線時,就會真的設定為自己的主服務器
初始化全域服務器配置
redis.c/initServerConfig()
void initServerConfig() {
int j;
// 設定服務器的運行 ID
getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
// 設定默認組態檔路徑
server.configfile = NULL;
// 設定默認服務器頻率
server.hz = REDIS_DEFAULT_HZ;
// 為運行 ID 加上結尾字符
server.runid[REDIS_RUN_ID_SIZE] = '\0';
// 設定服務器的運行架構
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
// 設定默認服務器埠號
server.port = REDIS_SERVERPORT;
server.tcp_backlog = REDIS_TCP_BACKLOG;
server.bindaddr_count = 0;
server.unixsocket = NULL;
server.unixsocketperm = REDIS_DEFAULT_UNIX_SOCKET_PERM;
server.ipfd_count = 0;
server.sofd = -1;
server.dbnum = REDIS_DEFAULT_DBNUM;
server.verbosity = REDIS_DEFAULT_VERBOSITY;
server.maxidletime = REDIS_MAXIDLETIME;
server.tcpkeepalive = REDIS_DEFAULT_TCP_KEEPALIVE;
server.active_expire_enabled = 1;
server.client_max_querybuf_len = REDIS_MAX_QUERYBUF_LEN;
server.saveparams = NULL;
server.loading = 0;
server.logfile = zstrdup(REDIS_DEFAULT_LOGFILE);
server.syslog_enabled = REDIS_DEFAULT_SYSLOG_ENABLED;
server.syslog_ident = zstrdup(REDIS_DEFAULT_SYSLOG_IDENT);
server.syslog_facility = LOG_LOCAL0;
server.daemonize = REDIS_DEFAULT_DAEMONIZE;
server.aof_state = REDIS_AOF_OFF;
server.aof_fsync = REDIS_DEFAULT_AOF_FSYNC;
server.aof_no_fsync_on_rewrite = REDIS_DEFAULT_AOF_NO_FSYNC_ON_REWRITE;
server.aof_rewrite_perc = REDIS_AOF_REWRITE_PERC;
server.aof_rewrite_min_size = REDIS_AOF_REWRITE_MIN_SIZE;
server.aof_rewrite_base_size = 0;
server.aof_rewrite_scheduled = 0;
server.aof_last_fsync = time(NULL);
server.aof_rewrite_time_last = -1;
server.aof_rewrite_time_start = -1;
server.aof_lastbgrewrite_status = REDIS_OK;
server.aof_delayed_fsync = 0;
server.aof_fd = -1;
server.aof_selected_db = -1; /* 保證不選中任意資料庫 */
server.aof_flush_postponed_start = 0;
server.aof_rewrite_incremental_fsync = REDIS_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
server.pidfile = zstrdup(REDIS_DEFAULT_PID_FILE);
server.rdb_filename = zstrdup(REDIS_DEFAULT_RDB_FILENAME);
server.aof_filename = zstrdup(REDIS_DEFAULT_AOF_FILENAME);
server.requirepass = NULL;
server.rdb_compression = REDIS_DEFAULT_RDB_COMPRESSION;
server.rdb_checksum = REDIS_DEFAULT_RDB_CHECKSUM;
server.stop_writes_on_bgsave_err = REDIS_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR;
server.activerehashing = REDIS_DEFAULT_ACTIVE_REHASHING;
server.notify_keyspace_events = 0;
server.maxclients = REDIS_MAX_CLIENTS;
server.bpop_blocked_clients = 0;
server.maxmemory = REDIS_DEFAULT_MAXMEMORY;
server.maxmemory_policy = REDIS_DEFAULT_MAXMEMORY_POLICY;
server.maxmemory_samples = REDIS_DEFAULT_MAXMEMORY_SAMPLES;
server.hash_max_ziplist_entries = REDIS_HASH_MAX_ZIPLIST_ENTRIES;
server.hash_max_ziplist_value = https://www.cnblogs.com/shilinkun/archive/2022/04/20/REDIS_HASH_MAX_ZIPLIST_VALUE;
server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
server.zset_max_ziplist_entries = REDIS_ZSET_MAX_ZIPLIST_ENTRIES;
server.zset_max_ziplist_value = REDIS_ZSET_MAX_ZIPLIST_VALUE;
server.hll_sparse_max_bytes = REDIS_DEFAULT_HLL_SPARSE_MAX_BYTES;
server.shutdown_asap = 0;
server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
server.repl_timeout = REDIS_REPL_TIMEOUT;
server.repl_min_slaves_to_write = REDIS_DEFAULT_MIN_SLAVES_TO_WRITE;
server.repl_min_slaves_max_lag = REDIS_DEFAULT_MIN_SLAVES_MAX_LAG;
server.cluster_enabled = 0;
server.cluster_node_timeout = REDIS_CLUSTER_DEFAULT_NODE_TIMEOUT;
server.cluster_migration_barrier = REDIS_CLUSTER_DEFAULT_MIGRATION_BARRIER;
server.cluster_configfile = zstrdup(REDIS_DEFAULT_CLUSTER_CONFIG_FILE);
server.lua_caller = NULL;
server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
server.lua_client = NULL;
server.lua_timedout = 0;
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
server.loading_process_events_interval_bytes = (1024*1024*2);
// 初始化 LRU 時間
server.lruclock = getLRUClock();
// 初始化并設定保存條件
resetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
// 初始化和復制相關的狀態
server.masterauth = NULL;
server.masterhost = NULL;
server.masterport = 6379;
server.master = NULL;
server.cached_master = NULL;
server.repl_master_initial_offset = -1;
server.repl_state = REDIS_REPL_NONE;
server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA;
server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY;
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY;
server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
server.master_repl_offset = 0;
// 初始化 PSYNC 命令所使用的 backlog
server.repl_backlog = NULL;
server.repl_backlog_size = REDIS_DEFAULT_REPL_BACKLOG_SIZE;
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
server.repl_backlog_off = 0;
server.repl_backlog_time_limit = REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT;
server.repl_no_slaves_since = time(NULL);
// 設定客戶端的輸出緩沖區限制
for (j = 0; j < REDIS_CLIENT_LIMIT_NUM_CLASSES; j++)
server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
// 初始化浮點常量
R_Zero = 0.0;
R_PosInf = 1.0/R_Zero;
R_NegInf = -1.0/R_Zero;
R_Nan = R_Zero/R_Zero;
// 初始化命令表
// 在這里初始化是因為接下來讀取 .conf 檔案時可能會用到這些命令
server.commands = dictCreate(&commandTableDictType,NULL);
server.orig_commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop");
// 初始化慢查詢日志
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
server.slowlog_max_len = REDIS_SLOWLOG_MAX_LEN;
// 初始化除錯項
server.assert_failed = "<no assertion failed>";
server.assert_file = "<no file>";
server.assert_line = 0;
server.bug_report_start = 0;
server.watchdog_period = 0;
}
主要包括以下幾個方面
- 網路監聽相關,如系結地址,TCP埠等
- 虛擬記憶體相關,如swap檔案、page大小等
- 保存機制,多長時間內有多少次更新才進行保存
- 復制相關,如是否是slave,master地址、埠
- Hash相關設定
- 初始化命令表
加載組態檔
上面加載的可以想象成是一個默認組態檔,若 初始化時候,指定了組態檔,則會將其中一些欄位進行修改config.c/loadServerConfig
void loadServerConfig(char *filename, char *options) {
sds config = sdsempty();
char buf[REDIS_CONFIGLINE_MAX+1];
// 載入檔案內容
if (filename) {
FILE *fp;
if (filename[0] == '-' && filename[1] == '\0') {
fp = stdin;
} else {
if ((fp = fopen(filename,"r")) == NULL) {
redisLog(REDIS_WARNING,
"Fatal error, can't open config file '%s'", filename);
exit(1);
}
}
while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL)
config = sdscat(config,buf);
if (fp != stdin) fclose(fp);
}
// 追加 options 字串到內容的末尾
if (options) {
config = sdscat(config,"\n");
config = sdscat(config,options);
}
// 根據字串內容,設定服務器配置
loadServerConfigFromString(config);
sdsfree(config);
}
設定為守護行程
代碼如下
void daemonize(void) {
int fd;
if (fork() != 0) exit(0); /* 父行程退出 */
setsid(); /* 創建新會話 */
/* 將輸出定位到/dev/null */
if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO) close(fd);
}
}
初始化服務器initServer
代碼如下
void initServer() {
int j;
// 設定信號處理函式
// 因為是守護行程,所以沒有控制終端,屏蔽SIGHUP
signal(SIGHUP, SIG_IGN);
// SIGPIPE是寫管道發現讀行程終止時產生的信號,redis是服務器,會遇到各種client,所以需要忽略
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();
// 設定 syslog
if (server.syslog_enabled) {
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
server.syslog_facility);
}
// 初始化并創建資料結構
server.current_client = NULL;
server.clients = listCreate();
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.slaveseldb = -1;
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.clients_paused = 0;
// 創建共享物件
createSharedObjects();
adjustOpenFilesLimit();
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
// 打開 TCP 監聽埠,用于等待客戶端的命令請求
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
exit(1);
// 打開 UNIX 本地埠
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
server.unixsocketperm, server.tcp_backlog);
if (server.sofd == ANET_ERR) {
redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
exit(1);
}
anetNonBlock(NULL,server.sofd);
}
/* Abort if there are no listening sockets at all. */
if (server.ipfd_count == 0 && server.sofd < 0) {
redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
exit(1);
}
// 創建并初始化資料庫結構
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&setDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].eviction_pool = evictionPoolAlloc();
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
// 創建 PUBSUB 相關結構
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate();
listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
server.cronloops = 0;
server.rdb_child_pid = -1;
server.aof_child_pid = -1;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
server.stat_starttime = time(NULL);
server.stat_peak_memory = 0;
server.resident_set_size = 0;
server.lastbgsave_status = REDIS_OK;
server.aof_last_write_status = REDIS_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;
updateCachedTime();
/* Create the serverCron() time event, that's our main way to process
* background operations. */
// 為 serverCron() 創建時間事件
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
redisPanic("Can't create the serverCron time event.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
// 為 TCP 連接關聯連接應答(accept)處理器
// 用于接受并應答客戶端的 connect() 呼叫
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
// 為本地套接字關聯應答處理器
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
/* Open the AOF file if needed. */
// 如果 AOF 持久化功能已經打開,那么打開或創建一個 AOF 檔案
if (server.aof_state == REDIS_AOF_ON) {
server.aof_fd = open(server.aof_filename,
O_WRONLY|O_APPEND|O_CREAT,0644);
if (server.aof_fd == -1) {
redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
strerror(errno));
exit(1);
}
}
// 對于 32 位實體來說,默認將最大可用記憶體限制在 3 GB
if (server.arch_bits == 32 && server.maxmemory == 0) {
redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
}
// 如果服務器以 cluster 模式打開,那么初始化 cluster
if (server.cluster_enabled) clusterInit();
// 初始化復制功能有關的腳本快取
replicationScriptCacheInit();
// 初始化腳本系統
scriptingInit();
// 初始化慢查詢功能
slowlogInit();
// 初始化 BIO 系統
bioInit();
}
上面大多數注釋已經對代碼進行講解,下面對slowlogInit進行單獨講解
/*
* 初始化服務器慢查詢功能,
*
* 這個函式只應該在服務器啟動時執行一次,
*/
void slowlogInit(void) {
// 保存日志的鏈表,FIFO 順序
server.slowlog = listCreate();
// 日志數量計數器
server.slowlog_entry_id = 0;
// 日志鏈表的釋構函式
listSetFreeMethod(server.slowlog,slowlogFreeEntry);
}
/*
* 慢查詢日志
*/
typedef struct slowlogEntry {
// 命令與命令引數
robj **argv;
// 命令與命令引數的數量
int argc;
// 唯一識別符號
long long id;
// 執行命令消耗的時間,以微秒為單位
// 注釋里說的 nanoseconds 是錯誤的
long long duration;
// 命令執行時的時間,格式為 UNIX 時間戳
time_t time;
} slowlogEntry;
其中還有一個函式bioInit,redis的BIO系統在redis3.0版本主要做兩件事情:AOF持久化和關閉檔案,可以將BIO系統想象成下面:創建一個佇列,然后創建一些執行緒,來了一個任務就往佇列里面添加任務,執行緒去任務佇列里面取任務出來執行
因為在redis3.0中只需要做兩件事情,所以任務的結構體代碼如下
/*
* 表示后臺任務的資料結構
*
* 這個結構只由 API 使用,不會被暴露給外部,
*/
struct bio_job {
// 任務創建時的時間
time_t time;
/*
* 任務的引數,引數多于三個時,可以傳遞陣列或者結構 arg1一般是檔案描述符
*/
void *arg1, *arg2, *arg3;
};
- 任務初始化
首先是相關靜態變數的初始化
#define REDIS_BIO_NUM_OPS 2 // 2個任務
// 作業執行緒,斥互和條件變數
static pthread_t bio_threads[REDIS_BIO_NUM_OPS];
static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS];
static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS];
// 存放作業的佇列
static list *bio_jobs[REDIS_BIO_NUM_OPS];
// 初始化變數
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_condvar[j],NULL);
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}
// 創建執行緒
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
// 這里的函式引數是arg = j,也就是每個執行緒傳入一個編號j,0代表關閉檔案,1代表aof初始化
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
// bioProcessBackgroundJobs函式就是后臺執行任務的函式
void *bioProcessBackgroundJobs(void *arg) {
...
if (type == REDIS_BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == REDIS_BIO_AOF_FSYNC) {
aof_fsync((long)job->arg1);
} else {
redisPanic("Wrong job type in bioProcessBackgroundJobs().");
}
...
}
事件處理器回圈aeMain
這個回圈主要就是做兩件事情,beforeSleep和aeProcessEvents
// 運行事件處理器,一直到服務器關閉為止
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
// 服務器關閉,停止事件回圈
aeDeleteEventLoop(server.el);
/*
* 設定處理事件前需要被執行的函式
*/
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
eventLoop->beforesleep = beforesleep;
}
/*
* 事件處理器的主回圈
*/
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 如果有需要在事件處理前執行的函式,那么運行它
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 開始處理事件 其實就是一個事件調度函式,包括處理時間事件和檔案事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
/*
* 洗掉事件處理器
*/
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
aeApiFree(eventLoop);
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
下面單獨對這兩個函式進行講解
beforeSleep
首先先看代碼
// 每次處理事件之前執行
void beforeSleep(struct aeEventLoop *eventLoop) {
REDIS_NOTUSED(eventLoop);
// 執行一次快速的主動過期檢查
if (server.active_expire_enabled && server.masterhost == NULL)
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
/* 如果在之前的事件回圈迭代中至少有一個客戶端阻塞,則向所有slave發送ACK請求 */
if (server.get_ack_from_slaves) {
robj *argv[3];
argv[0] = createStringObject("REPLCONF",8);
argv[1] = createStringObject("GETACK",6);
argv[2] = createStringObject("*",1); /* Not used argument. */
replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[2]);
server.get_ack_from_slaves = 0;
}
/* 解除阻塞等待同步復制的所有客戶端 */
if (listLength(server.clients_waiting_acks))
processClientsWaitingReplicas();
/* 嘗試為剛剛解除阻塞的客戶端處理掛起的命令 */
if (listLength(server.unblocked_clients))
processUnblockedClients();
// 將 AOF 緩沖區的內容寫入到 AOF 檔案
// void flushAppendOnlyFile(int force) force引數表明是否強制重繪,當為0時候,若后臺有fsync在執行,則延遲
flushAppendOnlyFile(0);
// 在進入下個事件回圈前,執行一些集群收尾作業
if (server.cluster_enabled) clusterBeforeSleep();
}
aeProcessEvents
redis中的事件主要分為兩種事件:檔案事件(和其他客戶端連接產生的事件)和時間事件(定時時間產生的事件)
redis處理時間事件的函式會在服務器運行期間,每隔一段事件運行,處理時間事件,每個事件以鏈表形式掛在一起,每次處理時候,都是遍歷該鏈表
/* Process time events
*
* 處理所有已到達的時間事件
*/
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
// 通過重置事件的運行時間,
// 防止因時間穿插(skew)而造成的事件處理混亂
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
// 更新最后一次處理時間事件的時間
eventLoop->lastTime = now;
// 遍歷鏈表
// 執行那些已經到達的事件
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
// 跳過無效事件
if (te->id > maxId) {
te = te->next;
continue;
}
// 獲取當前時間
aeGetTime(&now_sec, &now_ms);
// 如果當前時間等于或等于事件的執行時間,那么說明事件已到達,執行這個事件
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
// 執行事件處理器,并獲取回傳值
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
/* After an event is processed our time event list may
* no longer be the same, so we restart from head.
* Still we make sure to don't process events registered
* by event handlers itself in order to don't loop forever.
* To do so we saved the max ID we want to handle.
*
* FUTURE OPTIMIZATIONS:
* Note that this is NOT great algorithmically. Redis uses
* a single time event so it's not a problem but the right
* way to do this is to add the new elements on head, and
* to flag deleted elements in a special way for later
* deletion (putting references to the nodes to delete into
* another linked list). */
// 記錄是否有需要回圈執行這個事件時間
if (retval != AE_NOMORE) {
// 是的, retval 毫秒之后繼續執行這個時間事件
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
// 不,將這個事件洗掉
aeDeleteTimeEvent(eventLoop, id);
}
// 因為執行事件之后,事件串列可能已經被改變了
// 因此需要將 te 放回表頭,繼續開始執行事件
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}
下面的代碼就是redis的事件調度函式
/*
* 事件調度函式
* 處理所有已到達的時間事件,以及所有已就緒的檔案事件,
* 如果不傳入特殊 flags 的話,那么函式睡眠直到檔案事件就緒,
* 或者下個時間事件到達(如果有的話),
*
* 如果 flags 為 0 ,那么函式不作動作,直接回傳,
* 如果 flags 包含 AE_ALL_EVENTS ,所有型別的事件都會被處理,
* 如果 flags 包含 AE_FILE_EVENTS ,那么處理檔案事件,
* 如果 flags 包含 AE_TIME_EVENTS ,那么處理時間事件,
* 如果 flags 包含 AE_DONT_WAIT , 那么函式在處理完所有不許阻塞的事件之后,即刻回傳,
* 函式的回傳值為已處理事件的數量
*/
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// 獲取最近的時間事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// 如果時間事件存在的話
// 那么根據最近可執行時間事件和現在時間的時間差來決定檔案事件的阻塞時間
long now_sec, now_ms;
// 計算距今最近的時間事件還要多久才能達到
// 并將該時間距保存在 tv 結構中
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
// 時間差小于 0 ,說明事件已經可以執行了,將秒和毫秒設為 0 (不阻塞)
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
// 執行到這一步,說明沒有時間事件
// 那么根據 AE_DONT_WAIT 是否設定來決定是否阻塞,以及阻塞的時間長度
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
// 設定檔案事件不阻塞
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
// 檔案事件可以阻塞直到有事件到達為止
tvp = NULL; /* wait forever */
}
}
// 處理檔案事件,阻塞時間由 tvp 決定
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 從已就緒陣列中獲取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// 讀事件
if (fe->mask & mask & AE_READABLE) {
// rfired 確保讀/寫事件只能執行其中一個
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 寫事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
// 執行時間事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed;
}
/*
* 獲取可執行事件
*/
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// 等待時間
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
// 有至少一個事件就緒?
if (retval > 0) {
int j;
// 為已就緒事件設定相應的模式
// 并加入到 eventLoop 的 fired 陣列中
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
// 回傳已就緒事件個數
return numevents;
}
由上面代碼可知,因為檔案事件是隨機出現的,如果等待并處理完一次檔案事件之后,仍未有任何時間事件到達,那么服務器將再次等待并處理檔案事件,隨著檔案事件的不斷執行,時間會逐漸向時間事件所設定的到達時間逼近,并最終來到到達時間,這時服務器就可以開始處理到達的時間事件了,
自己的網址:www.shicoder.top
歡迎加群聊天 452380935
本文由博客一文多發平臺 OpenWrite 發布!
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/459607.html
標籤:其他
