
文章目錄
- 從 ngx_master_process_cycle 說起
- ngx_start_worker_processes
- ngx_spawn_process
- ngx_worker_process_cycle
- ngx_worker_process_init
從 ngx_master_process_cycle 說起
簡單做個偽代碼,看一下流程哈:
void ngx_master_process_cycle(ngx_cycle_t *cycle) {
···
// 啟動各個worker行程
ngx_start_worker_processes(cycle, ccf->worker_processes, NGX_PROCESS_RESPAWN);
···
for (;;) {
···
if (···) {
// 這里主要是向各個行程發送命令
ngx_signal_worker_processes(cycle, ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
}
}
}
怎么通信先別管,再看一下是怎么啟動作業行程的唄:
ngx_start_worker_processes
先了解一下 ngx_channel_t,有點重要哈:
master行程每次發送給worker行程的指令用如下的一個結構來完成封裝
typedef struct {
// 傳遞的 TCP 訊息中的命令
ngx_uint_t command;
// 行程 ID,一般是發送命令方的行程 ID
ngx_pid_t pid;
// 表示發送命令方在 ngx_processes 行程陣列間的序號
ngx_int_t slot;
// 通信的套接字句柄
ngx_fd_t fd;
}ngx_channel_t;
Nginx 針對 command 成員定義了如下命令:
// 打開頻道,使用頻道這種方式通信前必須發送的命令
#define NGX_CMD_OPEN_CHANNEL 1
// 關閉已經打開的頻道,實際上也就是關閉套接字
#define NGX_CMD_CLOSE_CHANNEL 2
// 要求接收方正常地退出行程
#define NGX_CMD_QUIT 3
// 要求接收方強制地結束行程
#define NGX_CMD_TERMINATE 4
// 要求接收方重新打開行程已經打開過的檔案
#define NGX_CMD_REOPEN 5
static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) {
···
ngx_channel_t ch;
···
ch.command = NGX_CMD_OPEN_CHANNEL; //當前是新建了一個行程
for (i = 0; i < n; i++) {
//spawn,生成一個子行程
// ngx_worker_process_cycle:該子行程所進行的事件回圈,這里先不管它什么回圈
// worker行程在一個無限for回圈中,不斷的檢查相應的事件模型中是否存在對應的事件,
// 然后將accept事件和read、write事件分開放入兩個佇列中,最后在事件回圈中不斷的處理事件
ngx_spawn_process(cycle, ngx_worker_process_cycle, (void *) (intptr_t) i, "worker process", type);
// 下面的這段代碼的主要作用是將新建行程這個事件通知到其他的行程,
// 其就會向ngx_processes陣列的每個行程的channel[0]上寫入當前廣播的事件,也即這里的ch,
// 因為子行程之間也需要通信
ch.pid = ngx_processes[ngx_process_slot].pid;
ch.slot = ngx_process_slot; //該新建行程所存放的陣列位置
ch.fd = ngx_processes[ngx_process_slot].channel[0];
// 將當前創建了子行程的事件廣播給其余的行程
ngx_pass_open_channel(cycle, &ch);
}
}
我們來看它怎么個生成法:
ngx_spawn_process
這里啊,需要關注一下:
typedef struct {
...
// socketpair 創建的套接字對
ngx_socket_t channel[2];
}ngx_processes_t;
ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) {
···
// 在ngx_processes陣列中存盤了當前創建的所有行程,而ngx_last_process理解為end,
//只不過ngx_processes中記錄的行程有可能有部分已經失效了,
//當前回圈就是從頭開始查找是否有某個行程已經失效了,
//如果已經失效了,則復用該行程位置,否則直接使用ngx_last_process所指向的位置
for (s = 0; s < ngx_last_process; s++) {
if (ngx_processes[s].pid == -1) {
break;
}
}
// 這里說明所創建的行程數達到了最大限度
if (s == NGX_MAX_PROCESSES) {
···
return NGX_INVALID_PID;
}
// NGX_PROCESS_DETACHED標志表示當前fork出來的行程與原來的父行程沒有任何關系,比如進行nginx升級時,
// 新生成的master行程就與原先的master行程沒有關系
if (respawn != NGX_PROCESS_DETACHED) {
//這里為什么采用sockpair?
/* 這里的socketpair()方法的主要作用是生成一對套接字流,用于主行程和子行程的通信,
這一對套接字會存盤在ngx_processes[s].channel中,本質上這個欄位是一個長度為2的整型陣列,
在主行程和子行程 進行通信的之前,主行程會關閉其中一個,而子行程會關閉另一個,
然后相互之間往未關閉的另一個檔案描述符中寫入或讀取資料即可實作通信,
AF_UNIX表示當前使用的是UNIX檔案形式的socket地址族SOCK_STREAM指定了當前套接字建立的通信方式是管道流,
并且這個管道流是雙向的,即管道雙方都可以進行讀寫操作第三個引數protocol必須為0,
*/
if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) {
···
return NGX_INVALID_PID;
}
···
// 將ngx_processes[s].channel[0]設定為非阻塞模式
if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
···
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// 將ngx_processes[s].channel[1]設定為非阻塞模式
if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
···
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
on = 1;
// 將ngx_processes[s].channel[0]套接字管道設定為異步模式
if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
···
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// 當前還處于主行程中,這里的ngx_pid指向了主行程的行程id,當前方法的作用主要是將
// ngx_processes[s].channel[0]的操作權限設定給主行程,也就是說主行程通過向
// ngx_processes[s].channel[0]寫入和讀取資料來與子行程進行通信
if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
···
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// FD_CLOEXEC表示當前指定的套接字管道在子行程中可以使用,但是在execl()執行的程式中不可使用
if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
···
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// FD_CLOEXEC表示當前指定的套接字管道在子行程中可以使用,但是在execl()執行的程式中不可使用
if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
···
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// ngx_processes[s].channel[1]是用于給子行程監聽相關事件使用的,當父行程向
// ngx_processes[s].channel[0]發布事件之后,
// ngx_processes[s].channel[1]中就會接收到對應的事件,從而進行相應的處理
ngx_channel = ngx_processes[s].channel[1];
} else {
// 如果是NGX_PROCESS_DETACHED模式,則表示當前是另外新起的一個master行程,因而將其管道值都置為-1
ngx_processes[s].channel[0] = -1;
ngx_processes[s].channel[1] = -1;
}
ngx_process_slot = s;
// fork()產生一個新的行程
pid = fork();
switch (pid) {
case -1:
// fork出錯
···
return NGX_INVALID_PID;
case 0:
// 子行程執行的分支,這里的proc()方法是外部傳進來的,也就是說,當前方法只是創建一個新的行程,
// 具體的行程處理邏輯,將交由外部代碼塊進行定義ngx_getpid()方法獲取的就是當前新創建的子行程的行程id
ngx_pid = ngx_getpid();
proc(cycle, data);
break;
default:
// 父行程會走到這里
break;
}
···
// 父行程會走到這里,當前的pid是fork()之后父行程得到的新創建的子行程的pid
ngx_processes[s].pid = pid;
ngx_processes[s].exited = 0;
···
// 設定當前行程的各個屬性,并且存盤到ngx_processes陣列中的對應位置
ngx_processes[s].proc = proc;
ngx_processes[s].data = data;
ngx_processes[s].name = name;
ngx_processes[s].exiting = 0;
···
if (s == ngx_last_process) {
ngx_last_process++;
}
return pid;
}
現在看到,master行程使用sockpair開了兩個套接字,其中,第0號位用于master行程向worker行程發送資訊,并設定異步,制定1號為子行程可用,接下來我們看看子行程被創建出來執行什么作業:
ngx_worker_process_cycle
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data) {
···
//初始化,開啟個監聽啥的,先等等,后面展開
ngx_worker_process_init(cycle, worker);
···
for (;;) {
//如果收到退出信號
···
// 這里通過檢查相應的事件模型中是否存在對應的事件,然后將其放入佇列中進行處理,
// 這里是worker行程處理事件的核心方法
ngx_process_events_and_timers(cycle);
// 如果當前nginx已經終止,則退出當前行程
if (ngx_terminate) {
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
ngx_worker_process_exit(cycle);
}
if (ngx_quit) {
···
}
···
}
}
這一段是沒看到我們想看的哈,看來還需要在深入一個函式去看看:
ngx_worker_process_init
/**
* 這里主要是對當前行程進行初始化,為其設定優先級和打開的檔案限制等引數,
* 最后會為當前行程添加一個監聽channel[1]的連接,以不斷讀取master行程的訊息,從而進行相應的處理
*/
static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker) {
···
// 設定當前行程的優先級
if (worker >= 0 && ccf->priority != 0) {
if (setpriority(PRIO_PROCESS, 0, ccf->priority) == -1) {
···
}
}
// 設定當前行程能夠打開的檔案句柄數
// 簡而言之就是設定核心檔案能夠使用的最大大小
···
// 需要注意的是,對于cache manager和cache loader行程,這里的worker傳入的是-1,
// 表示這兩個行程不需要設定親核性
if (worker >= 0) {
// 獲取當前worker的CPU親核性
cpu_affinity = ngx_get_cpu_affinity(worker);
if (cpu_affinity) {
// 設定worker的親核性
ngx_setaffinity(cpu_affinity, cycle->log); //就是系結個CPU,后面專門出一篇寫這個,這個我也是要收入囊中的
}
}
···
// 初始化空的set指令集合
sigemptyset(&set);
// ◆ SIG_BLOCK:將 set 引數指向信號集中的信號加入到信號掩碼中,
// ◆ SIG_UNBLOCK:將 set 引數指向的信號集中的信號從信號掩碼中洗掉,
// ◆ SIG_SETMASK:將 set 引數指向信號集設定為信號掩碼,
// 這里就是直接初始化要阻塞的信號集,默認為空集
if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) {
···
}
···
// 呼叫各個模塊的init_process()方法進行行程模塊的初始化,與本篇關系不大,后面講模塊了再說
···
// 這里主要是關閉當前行程中各個模塊的channel[0]管道句柄
for (n = 0; n < ngx_last_process; n++) {
···
//關閉父行程的channel[1]
if (close(ngx_processes[n].channel[1]) == -1) { //這是個全域的processes,好吧
···
}
}
// 關閉當前行程的channel[0]管道句柄
if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
···
}
···
// ngx_channel指向的是當前行程的channel[1]句柄,也即監聽master行程發送訊息的句柄,
// 當前方法中,首先會為當前的句柄創建一個connection物件,并且將其封裝為一個事件,
//然后將該事件添加到對應的事件模型佇列中以監聽當前句柄的事件,事件的處理邏輯則主要有這里的ngx_channel_handler()方法進行,
//這里的ngx_channel_handler的主要處理邏輯是:根據當前收到的訊息設定當前行程的一些標志位,或者更新某些快取資料,
//如此,在當前進行的事件回圈中,通過不斷檢查這些標志位,從而實作在事件行程中處理真正的邏輯,
//因而這里的ngx_channel_handler的處理效率是非常高的
if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
ngx_channel_handler) == NGX_ERROR) {
exit(2);
}
}
這里worker行程的初始化程序主要做了三件事:
為worker行程設定優先級和提升打開檔案的權限;
設定worker行程的親核性;
關閉當前行程與master行程通信的管道陣列中的channel[0],然后監聽channel[1],以處理master行程的訊息;
當收到master行程發過來的命令后,就呼叫ngx_channel_handler處理,
至此,master-worker 之間的通信就講完了,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/305160.html
標籤:其他
上一篇:IP地址詳解
下一篇:計算機網路(謝希仁 第七版) 第三章(資料鏈路層)-- 3.2 點對點協議PPP(PPP協議的特點 & PPP協議的幀格式 & PPP協議的作業狀態)
