
文章目錄
- 前言
- 概念闡述部分
- 什么是select?
- 什么是epoll?
- epoll的設計思路
- 阻塞I/O與非阻塞I/O
- 阻塞式檔案I/O
- 非阻塞式檔案I/O
- 多路復用I/O
- 邊緣觸發 VS 水平觸發
- epoll API
- 頭檔案
- 創建句柄
- epoll控制函式
- epoll訊息讀取
- 代碼示例
- 整體拔高:高效的并發方式
- 半同步/半異步模式
- 半同步/半反應堆模式(half-sync/half-reactive模式)
- 高效的半同步/半異步模式
- epoll原始碼學習
- 資料結構
- eventpoll
- epitem
- eppoll_entry
- 函式介面
- epoll_create()
- epoll_ctl()
- epoll_wait()
前言
不知不覺,就到大三了,
不知不覺,就要開始找暑期實習了,
溫故而知新嘛,(資料結構復習兩天發現不對,我還是更喜歡這個,)
所以就來了,
本篇系之前的“epoll,求知者離我近點”的拓展,加上了對epoll原始碼的學習,
當然,之前那篇也不會刪掉,不少朋友收藏了,
概念闡述部分
什么是select?
有的朋友可能對select也不是很了解啊,我這里稍微科普一下:網路連接,服務器也是通過檔案描述符來管理這些連接上來的客戶端,既然是供連接的服務器,那就免不了要接收來自客戶端的訊息,那么多臺客戶端,訊息那么的多,要是漏了一條兩條重要訊息,那也不要用TCP了,那怎么辦?
前輩們就是有辦法,輪詢,輪詢每個客戶端檔案描述符,查看他們是否帶著訊息,如果帶著,那就處理一下;如果沒帶著,那就一邊等著去,這就是select,輪詢,頗有點領導下基層的那種感覺哈,
但是這個select的輪詢吶,會有個問題,明眼人一下就能想到,那即是耗費資源啊,耗費什么資源,時間吶,慢吶(其實也挺快了,不過相對epoll來說就是慢),
再認真想一下,還浪費什么資源,系統資源,有的客戶端吶,占著那啥玩意兒不干那啥事兒,這種客戶端吶,還不少,這也怪不得人家,哪兒有客戶端時時刻刻在發訊息,要是有,那就要小心是不是惡意攻擊了,那把這么一堆偶爾動一下的客戶端的檔案描述符一直攥手里,累不累?能一次攥多少個?就像一個老板,一直想著下去巡視,那他可以去當車間組長了哈哈哈,
所以,select的默認上限一般是1024(FD_SETSIZE),當然我們可以手動去改,但是人家給個1024自然有人家的道理,改太大的話系統在這一塊的負載就大了,
那句話怎么說的來著,你每次對系統的索取,其實都早已明碼標價!哈哈哈,,,
所以,我們選用epoll模型,
什么是epoll?
epoll介面是為解決Linux內核處理大量檔案描述符而提出的方案,該介面屬于Linux下多路I/O復用介面中select/poll的增強,其經常應用于Linux下高并發服務型程式,特別是在大量并發連接中只有少部分連接處于活躍下的情況 (通常是這種情況),在該情況下能顯著的提高程式的CPU利用率,
前面說,select就像親自下基層視察的老板,那么epoll這個老板就要顯得精明的多了,他可不親自下基層,他找了個美女秘書,他只要盯著他的秘書看就行了,呸,他只需要聽取他的秘書的匯報就行了,匯報啥呢?基層有任何訊息,跟秘書說,秘書匯總之后一次性交給老板來處理,這樣老板的時間不就大大的提高了嘛,
如果你學過設計模式,這就是典型的“命令模式”,非常符合“依賴倒置原則”,這是一個非常美妙的模式,這個原則也是我最喜歡的一個原則,將高層實作與低層實作解耦合,從而可以各自開發,只要介面一致便可,這個介面,就是秘書,
扯遠了,如果對“設計模式”有興趣,可以找我的專欄,
好,言歸正傳哈哈哈,
epoll的設計思路
- (1)epoll在Linux內核中構建了一個檔案系統,該檔案系統采用紅黑樹來構建,紅黑樹在增加和洗掉上面的效率極高,因此是epoll高效的原因之一,有興趣可以百度紅黑樹了解,但在這里你只需知道其演算法效率超高即可,
- (2)epoll提供了兩種觸發模式,水平觸發(LT)和邊沿觸發(ET),當然,涉及到I/O操作也必然會有阻塞和非阻塞兩種方案,目前效率相對較高的是 epoll+ET+非阻塞I/O 模型,在具體情況下應該合理選用當前情形中最優的搭配方案,
- (3)epoll所支持的FD上限是最大可以打開檔案的數目,這個數字一般遠大于1024,舉個例子,在1GB記憶體的機器上大約是10萬左右,具體數目可以下面陳述句查看,一般來說這個數目和系統記憶體關系很大,
系統最大打開檔案描述符數
cat /proc/sys/fs/file-max
行程最大打開檔案描述符數
ulimit -n
修改這個配置:
sudo vi /etc/security/limits.conf
寫入以下配置,soft軟限制,hard硬限制
* soft nofile 65536
* hard nofile 100000
那個65536,改一下,不礙事,
阻塞I/O與非阻塞I/O
為了方便理解后面的內容,我們先看幾張圖,關于阻塞與非阻塞I/O的,
阻塞式檔案I/O

非阻塞式檔案I/O

多路復用I/O

好,有了上面這幾張圖墊著,咱來看看邊緣觸發和水平觸發,
邊緣觸發 VS 水平觸發
EPOLL 事件有兩種模型:
Edge Triggered (ET) 邊緣觸發 只有新資料到來,才觸發,不管快取區中是否還有資料,
Level Triggered (LT) 水平觸發 只要有資料都會觸發,不管資料是哪里的,
(這樣表述會不會好理解一些)
LT(level triggered) 是 預設 的作業方式 ,并且同時支持 block 和 no-block socket. 在這種做法中,內核告訴你一個檔案描述符是否就緒了,然后你可以對這個就緒的 fd 進行 IO 操作,如果你不作任何操作,內核還是會繼續通知你的,所以,這種模式編程出錯誤可能性要小一點,傳統的 select/poll 都是這種模型的代表.
ET(edge-triggered) 是高速作業方式 ,只支持 no-block socket ,在這種模式下,當描述符從未就緒變為就緒時,內核通過 epoll 告訴你,然后它會假設你知道檔案描述符已經就緒,并且不會再為那個檔案描述符發送更多的就緒通知,直到你做了某些操作導致那個檔案描述符不再為就緒狀態了 ( 比如,你在發送,接識訓者接收請求,或者發送接收的資料少于一定量時導致了一個 EWOULDBLOCK 錯誤),但是請注意,如果一直不對這個 fd 作 IO 操作 ( 從而導致它再次變成未就緒 ) ,內核不會發送更多的通知 (only once), 不過在 TCP 協議中, ET 模式的加速效用仍需要更多的 benchmark 確認,
epoll 作業在 ET 模式的時候,必須使用非阻塞套介面,以避免由于一個檔案句柄的阻塞讀 / 阻塞寫操作把處理多個檔案描述符的任務餓死,最好以下面的方式呼叫 ET 模式的 epoll 介面,在后面會介紹避免可能的缺陷,
- 基于非阻塞檔案句柄
- 只有當 read(2) 或者 write(2) 回傳 EAGAIN 時才需要掛起,等待,但這并不是說每次 read() 時都需要回圈讀,直到讀到產生一個 EAGAIN 才認為此次事件處理完成,當 read() 回傳的讀到的資料長度小于請求的資料長度時,就可以確定此時緩沖中已沒有資料了,也就可以認為此事讀事件已處理完成,
epoll API
epoll提供的API,我所用過的其實不多,無非就那么幾個,
所以我就只能聊聊我說用過的,
頭檔案
#include<sys/epoll.h>
創建句柄
int epoll_create(int size);
創建一個epoll句柄,引數size用于告訴內核監聽的檔案描述符個數,跟記憶體大小有關,
回傳epoll 檔案描述符
epoll控制函式
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event ); // 成功,返0,失敗返-1
控制某個epoll監控的檔案描述符上的事件:注冊,修改,洗掉
引數釋義:
epfd:為epoll的句柄
op:表示動作,用3個宏來表示
··· EPOLL_CTL_ADD(注冊新的 fd 到epfd)
··· EPOLL_CTL_DEL(從 epfd 中洗掉一個 fd)
··· EPOLL_CTL_MOD(修改已經注冊的 fd 監聽事件)
event:告訴內核需要監聽的事件
typedef union epoll_data
{
void* ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t; /* 保存觸發事件的某個檔案描述符相關的資料 */
struct epoll_event
{
__uint32_t events; /* epoll event */
epoll_data_t data; /* User data variable */
};
/* epoll_event.events:
EPOLLIN 表示對應的檔案描述符可以讀
EPOLLOUT 表示對應的檔案描述符可以寫
EPOLLPRI 表示對應的檔案描述符有緊急的資料可讀
EPOLLERR 表示對應的檔案描述符發生錯誤
EPOLLHUP 表示對應的檔案描述符被掛斷
EPOLLET 表示對應的檔案描述符有事件發生
*/
epoll訊息讀取
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
等待所監控檔案描述符上有事件的產生
引數釋義:
events:用來從內核得到事件的集合
maxevent:用于告訴內核這個event有多大,這個maxevent不能大于創建句柄時的size
timeout:超時時間
··· -1:阻塞
··· 0:立即回傳
···>0:指定微秒
成功回傳有多少個檔案描述符準備就緒,時間到回傳0,出錯回傳-1.
代碼示例
/*server.c*/
#include<stdio.h>
#include<string.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#include<errno.h>
#define MAXLINE 80
#define SERV_PORT 8000
#include OPEN_MAX 1024
int main(void)
{
struct sockaddr_in servaddr,cliaddr;
socklen_t cliaddr_len;
int i,j,maxi,listenfd,connfd,sockfd;
int nready,efd,res,client[OPEN_MAX];
ssize_t n;
char buf[MAXLINE];
char str[INET_ADDRSTRLEN];
struct epoll_event tep,ep[OPEN_MAX];
listenfd = socket(AF_INET,SOCK_STREAM,0);
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htonl(SERV_PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr));
listen(listenfd,20);
maxfd = listenfd; //初始化
maxi = -1; //client[]的下標
for(i = 0 ; i < OPEN_MAX ; i++ )
client[i] = -1; //用-1初始化client
//套路開始
efd = epoll_create(OPEN_MAX); //創建句柄
if(efd == -1)
perrno("epoll_create");
tep.events = EPOLLIN; //設定讀事件
tep.data.fd = listenfd; //套接socket檔案描述符
res = epoll_ctl(efd,EPOLL_CTL_ADD,listened,&tep); //將listenfd加入監聽檔案表,監聽listenfd的讀取內容
if(res == -1)
perrno("epoll_ctl");
while(1)
{
nready = epoll_wait(efd,ep,OPEN_MAX,-1); //阻塞監聽
if(nready == -1)
perrno("epoll_wait error:");
for(i == 0; i<nready; i++)
{
if(!ep[i].event & EPOLLIN)
continue;
if(ep[i].data.fd == listenfd)
{
//開始接收資料了
printf("Accepting connections··· \n"); //寫完一定要來檢查一下這個換行,一不小心就忘記了
cliaddr_len = sizeof(cliaddr); //這得實時更新
connfd = accept(listenfd,(struct sockaddr *)&cliaddr,&cliaddr_len); //接收連接
printf("Read from %s at port %d \n",inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)),ntohs(cliaddr.sin_port));
/*將客戶端的地址讀取到str里面然后列印*/ /*將埠號轉換成整形數輸出*/
for(j = 0;j < OPEN_MAX; j++)
{
if(client[j] < 0)
{
client[j] = connfd; //保存accept回傳的檔案描述符到client【】里
break;
}
if( j == OPEN_MAX )
{
printf("Too many clients \n",stderr);
exit(-1);
}
if(j > maxi)
maxi = j;
}
tep.events = EPOLLIN;
tep.data.fd = connfd;
res = epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep); //將connfd加入監聽檔案表,監聽connfd的讀取內容
if(res == -1)
perrno("epoll_ctl");
else
{
sockfd = ep[i].data.fd;
n = read(sockfd,buf,MAXLINE);
if(n == 0)
{
for(j = 0 ; j <= maxi ; j++)
{
if(client[i] == sockfd)
{
client[j] = -1;
break;
}
}
res = epoll_ctl(efd,EPOLL_CTL_ADD,sockfd,&tep); //將sockfd加入監聽檔案表,監聽sockfd的讀取內容
if(res == -1)
perrno("epoll_ctl")
close(sockfd);
printf("Client[%d] closed connetion \n",j);
}
else
{
for(j = 0; j<n; j++)
buf[j] = toupper(buf[j]);
write(sockfd,buf,n);
}
}
}
}
}
close(listenfd);
close(efd)
return 0;;
}
整體拔高:高效的并發方式
并發編程的目的是讓程式”同時”執行多個任務,如果程式是計算密集型的,并發編程并沒有什么優勢,反而由于任務的切換使效率降低,但如果程式是I/O密集型的,那就不同了,
并發模式是指I/O處理單元和多個邏輯單元之間協調完成任務的方法,服務器主要有兩種并發編程模式:半同步/半異步(half-sync/half-async)模式和領導者/追隨者(Leader/Followers)模式,
這里講一個“半同步/半異步”,
下面的內容需要有一定的基礎了,小白可以收藏一下以后變強了再看,
半同步/半異步模式
在半同步/半異步模式中,同步執行緒用于處理客戶邏輯,異步執行緒用于處理I/O事件,異步執行緒監聽到客戶請求之后就將其封裝成請求物件并插入到請求佇列中,請求佇列將通知某個作業在同步模式的作業執行緒來讀取并處理該請求物件,

半同步/半反應堆模式(half-sync/half-reactive模式)
半同步/半反應堆模式是半同步/半異步模式的一種變體,
其結構如下圖:

在上圖中,異步執行緒只有一個,由主執行緒充當,負責監聽socket上的事件,如果監聽socket上有新的連接請求到來,主執行緒就接受新的連接socket,然后往epoll內核事件表中注冊該socket上的讀寫事件,如果連接socket上有讀寫事件發生,即有新的客戶請求到來或有資料要發送至客戶端,主執行緒就將該連接socket插入到請求佇列中,所有作業執行緒都睡眠在請求佇列上,當有任務到來時,他們通過競爭來獲取任務的接管權,
由于主執行緒插入請求佇列中的任務是就緒的連接socket,所以該半同步/半反應堆模式所采用的事件處理模式是Reactor模式,即作業執行緒要自己從socket上讀寫資料,當然,半同步/半反應堆模式也可以用模擬的Proactor事件處理模式,即由主執行緒來完成資料的讀寫操作,此時主執行緒將應用程式資料、任務型別等資訊封裝為一個任務物件,然后將其插入到請求佇列,
半同步/半反應堆模式的缺點:
主執行緒和作業執行緒共享請求佇列,因而請求佇列是臨界資源,所以對請求佇列操作的時候需要加鎖保護,
每個作業執行緒在同一時間只能處理一個客戶請求,如果客戶數量增多,則請求佇列中堆積任務太多,客戶端的回應會越來越慢,如果增多作業執行緒的話,則執行緒的切花也將消耗大量的CPU時間,
高效的半同步/半異步模式
在半同步/半反應堆模式中,每個作業執行緒同時只能處理一個客戶請求,如果并發量大的話,客戶端回應會很慢,如果每個作業執行緒都能同時處理多個客戶鏈接,則就能改善這種情況,所以就有了高效的半同步/半異步模式,
其結構如圖:
主執行緒只管監聽socket,當有新的連接socket到來時,主執行緒就接受連接并回傳新的連接socket給某個作業執行緒,此后該新連接socket上的任何I/O操作都由被選中的作業執行緒來處理,直到客戶端關閉連接,當作業執行緒檢測到有新的連接socket到來時,就把該新的連接socket的讀寫事件注冊到自己的epoll內核事件表中,
主執行緒和作業執行緒都維持自己的事件回圈,他們各自獨立的監聽不同事件,因此在這種高效的半同步/半異步模式中,每個執行緒都作業在異步模式中,所以它并非嚴格意義上的半同步/半異步模式,
epoll原始碼學習

資料結構
eventpoll
// epoll的核心實作對應于一個epoll描述符
struct eventpoll {
spinlock_t lock;
struct mutex mtx;
wait_queue_head_t wq; // sys_epoll_wait() 等待在這里
// f_op->poll() 使用的, 被其他事件通知機制利用的wait_address
wait_queue_head_t poll_wait;
//已就緒的需要檢查的epitem 串列
struct list_head rdllist;
//保存所有加入到當前epoll的檔案對應的epitem
struct rb_root rbr;
// 當正在向用戶空間復制資料時, 產生的可用檔案
struct epitem *ovflist;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
struct file *file;
//優化回圈檢查,避免回圈檢查中重復的遍歷
int visited;
struct list_head visited_list_link;
}
epitem
// 對應于一個加入到epoll的檔案
struct epitem {
// 掛載到eventpoll 的紅黑樹節點
struct rb_node rbn;
// 掛載到eventpoll.rdllist 的節點
struct list_head rdllink;
// 連接到ovflist 的指標
struct epitem *next;
/* 檔案描述符資訊fd + file, 紅黑樹的key */
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait;
// 當前檔案的等待佇列(eppoll_entry)串列
// 同一個檔案上可能會監視多種事件,
// 這些事件可能屬于不同的wait_queue中
// (取決于對應檔案型別的實作),
// 所以需要使用鏈表
struct list_head pwqlist;
// 當前epitem 的所有者
struct eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* epoll_ctl 傳入的用戶資料 */
struct epoll_event event;
};
eppoll_entry
// 與一個檔案上的一個wait_queue_head 相關聯,因為同一檔案可能有多個等待的事件,
//這些事件可能使用不同的等待佇列
struct eppoll_entry {
// List struct epitem.pwqlist
struct list_head llink;
// 所有者
struct epitem *base;
// 添加到wait_queue 中的節點
wait_queue_t wait;
// 檔案wait_queue 頭
wait_queue_head_t *whead;
};
函式介面
epoll_create()
//先進行判斷size是否>=0,若是則直接呼叫epoll_create1
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}
SYSCALL_DEFINE1是一個宏,用于定義有一個引數的系統呼叫函式,上述宏展開后即成為: int sys_epoll_create(int size),這就是epoll_create系統呼叫的入口,至于為何要用宏而不是直接宣告,主要是因為系統呼叫的引數個數、傳參方式都有嚴格限制,最多六個引數,
/* 這才是真正的epoll_create啊~~ */
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error;
struct eventpoll *ep = NULL;//主描述符
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
/* 對于epoll來講, 目前唯一有效的flag就是CLOEXEC */
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
/* 分配一個struct eventpoll */
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
/* 這里是創建一個匿名fd,
epollfd本身并不存在一個真正的檔案與之對應, 所以內核需要創建一個
"虛擬"的檔案, 并為之分配真正的struct file結構, 而且有真正的fd.
這里2個引數比較關鍵:
eventpoll_fops, fops就是file operations, 就是當你對這個檔案(這里是虛擬的)進行操作(比如讀)時,
fops里面的函式指標指向真正的操作實作, 類似C++里面虛函式和子類的概念.
epoll只實作了poll和release(就是close)操作, 其它檔案系統操作都有VFS全權處理了.
ep, ep就是struct epollevent, 它會作為一個私有資料保存在struct file的private指標里面.
*/
error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (error < 0)
ep_free(ep);
return error;
}
// epoll 檔案系統的相關實作
// epoll 檔案系統初始化, 在系統啟動時會呼叫
static int __init eventpoll_init(void)
{
struct sysinfo si;
si_meminfo(&si);
// 限制可添加到epoll的最多的描述符數量
max_user_watches = (((si.totalram - si.totalhigh) / 25) << PAGE_SHIFT) /
EP_ITEM_COST;
BUG_ON(max_user_watches < 0);
// 初始化遞回檢查佇列
ep_nested_calls_init(&poll_loop_ncalls);
ep_nested_calls_init(&poll_safewake_ncalls);
ep_nested_calls_init(&poll_readywalk_ncalls);
// epoll 使用的slab分配器分別用來分配epitem和eppoll_entry
epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);
pwq_cache = kmem_cache_create("eventpoll_pwq",
sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);
return 0;
}
epoll_ctl()
//創建好epollfd后, 接下來添加fd
//epoll_ctl的引數:epfd 表示epollfd;op 有ADD,MOD,DEL,
//fd 是需要監聽的描述符,event 我們感興趣的events
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int did_lock_epmutex = 0;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
error = -EFAULT;
//錯誤處理以及從用戶空間將epoll_event結構copy到內核空間.
if (ep_op_has_event(op) &&
// 復制用戶空間資料到內核
copy_from_user(&epds, event, sizeof(struct epoll_event))) {
goto error_return;
}
// 取得 epfd 對應的檔案
error = -EBADF;
file = fget(epfd);
if (!file) {
goto error_return;
}
// 取得目標檔案
tfile = fget(fd);
if (!tfile) {
goto error_fput;
}
// 目標檔案必須提供 poll 操作
error = -EPERM;
if (!tfile->f_op || !tfile->f_op->poll) {
goto error_tgt_fput;
}
// 添加自身或epfd 不是epoll 句柄
error = -EINVAL;
if (file == tfile || !is_file_epoll(file)) {
goto error_tgt_fput;
}
// 取得內部結構eventpoll
ep = file->private_data;
// EPOLL_CTL_MOD 不需要加全域鎖 epmutex
if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) {
mutex_lock(&epmutex);
did_lock_epmutex = 1;
}
if (op == EPOLL_CTL_ADD) {
if (is_file_epoll(tfile)) {
error = -ELOOP;
// 目標檔案也是epoll 檢測是否有回圈包含的問題
if (ep_loop_check(ep, tfile) != 0) {
goto error_tgt_fput;
}
} else
{
// 將目標檔案添加到 epoll 全域的tfile_check_list 中
list_add(&tfile->f_tfile_llink, &tfile_check_list);
}
}
mutex_lock_nested(&ep->mtx, 0);
// 以tfile 和fd 為key 在rbtree 中查找檔案對應的epitem
epi = ep_find(ep, tfile, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
// 沒找到, 添加額外添加ERR HUP 事件
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tfile, fd);
} else {
error = -EEXIST;
}
// 清空檔案檢查串列
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi) {
error = ep_remove(ep, epi);
} else {
error = -ENOENT;
}
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else {
error = -ENOENT;
}
break;
}
mutex_unlock(&ep->mtx);
error_tgt_fput:
if (did_lock_epmutex) {
mutex_unlock(&epmutex);
}
fput(tfile);
error_fput:
fput(file);
error_return:
return error;
}
//ep_insert()在epoll_ctl()中被呼叫, 完成往epollfd里面添加一個監聽fd的作業
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
/*
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};
*/
// 增加監視檔案數
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches)) {
return -ENOSPC;
}
// 分配初始化 epi
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) {
return -ENOMEM;
}
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
// 初始化紅黑樹中的key
ep_set_ffd(&epi->ffd, tfile, fd);
// 直接復制用戶結構
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
// 初始化臨時的 epq
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 設定事件掩碼
epq.pt._key = event->events;
// 內部會呼叫ep_ptable_queue_proc, 在檔案對應的wait queue head 上
// 注冊回呼函式, 并回傳當前檔案的狀態
revents = tfile->f_op->poll(tfile, &epq.pt);
// 檢查錯誤
error = -ENOMEM;
if (epi->nwait < 0) { // f_op->poll 程序出錯
goto error_unregister;
}
// 添加當前的epitem 到檔案的f_ep_links 鏈表
spin_lock(&tfile->f_lock);
list_add_tail(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
// 插入epi 到rbtree
ep_rbtree_insert(ep, epi);
/* now check if we've created too many backpaths */
error = -EINVAL;
if (reverse_path_check()) {
goto error_remove_epi;
}
spin_lock_irqsave(&ep->lock, flags);
/* 檔案已經就緒插入到就緒鏈表rdllist */
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
if (waitqueue_active(&ep->wq))
// 通知sys_epoll_wait , 呼叫回呼函式喚醒sys_epoll_wait 行程
{
wake_up_locked(&ep->wq);
}
// 先不通知呼叫eventpoll_poll 的行程
if (waitqueue_active(&ep->poll_wait)) {
pwake++;
}
}
spin_unlock_irqrestore(&ep->lock, flags);
atomic_long_inc(&ep->user->epoll_watches);
if (pwake)
// 安全通知呼叫eventpoll_poll 的行程
{
ep_poll_safewake(&ep->poll_wait);
}
return 0;
error_remove_epi:
spin_lock(&tfile->f_lock);
// 洗掉檔案上的 epi
if (ep_is_linked(&epi->fllink)) {
list_del_init(&epi->fllink);
}
spin_unlock(&tfile->f_lock);
// 從紅黑樹中洗掉
rb_erase(&epi->rbn, &ep->rbr);
error_unregister:
// 從檔案的wait_queue 中洗掉, 釋放epitem 關聯的所有eppoll_entry
ep_unregister_pollwait(ep, epi);
spin_lock_irqsave(&ep->lock, flags);
if (ep_is_linked(&epi->rdllink)) {
list_del_init(&epi->rdllink);
}
spin_unlock_irqrestore(&ep->lock, flags);
// 釋放epi
kmem_cache_free(epi_cache, epi);
return error;
}
static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)
{
int pollflags;
struct eventpoll *ep = file->private_data;
// 插入到wait_queue
poll_wait(file, &ep->poll_wait, wait);
// 掃描就緒的檔案串列, 呼叫每個檔案上的poll 檢測是否真的就緒,
// 然后復制到用戶空間
// 檔案串列中有可能有epoll檔案, 呼叫poll的時候有可能會產生遞回,
// 呼叫所以用ep_call_nested 包裝一下, 防止死回圈和過深的呼叫
pollflags = ep_call_nested(&poll_readywalk_ncalls, EP_MAX_NESTS,
ep_poll_readyevents_proc, ep, ep, current);
// static struct nested_calls poll_readywalk_ncalls;
return pollflags != -1 ? pollflags : 0;
}
// 通用的poll_wait 函式, 檔案的f_ops->poll 通常會呼叫此函式
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address) {
// 呼叫_qproc 在wait_address 上添加節點和回呼函式
// 呼叫 poll_table_struct 上的函式指標向wait_address添加節點, 并設定節點的func
// (如果是select或poll 則是 __pollwait, 如果是 epoll 則是 ep_ptable_queue_proc),
p->_qproc(filp, wait_address, p);
}
}
/*
* 該函式在呼叫f_op->poll()時會被呼叫.
* 也就是epoll主動poll某個fd時, 用來將epitem與指定的fd關聯起來的.
* 關聯的辦法就是使用等待佇列(waitqueue)
*/
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
/* 初始化等待佇列, 指定ep_poll_callback為喚醒時的回呼函式,
* 當我們監聽的fd發生狀態改變時, 也就是佇列頭被喚醒時,
* 指定的回呼函式將會被呼叫. */
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
/* 將剛分配的等待佇列成員加入到頭中, 頭是由fd持有的 */
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
/* nwait記錄了當前epitem加入到了多少個等待佇列中,
* 我認為這個值最大也只會是1... */
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}
//回呼函式, 當我們監聽的fd發生狀態改變時, 它會被呼叫.
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
//從等待佇列獲取epitem.需要知道哪個行程掛載到這個設備
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;//獲取
spin_lock_irqsave(&ep->lock, flags);
if (!(epi->event.events & ~EP_PRIVATE_BITS))
goto out_unlock;
/* 沒有我們關心的event... */
if (key && !((unsigned long) key & epi->event.events))
goto out_unlock;
/*
* 這里看起來可能有點費解, 其實干的事情比較簡單:
* 如果該callback被呼叫的同時, epoll_wait()已經回傳了,
* 也就是說, 此刻應用程式有可能已經在回圈獲取events,
* 這種情況下, 內核將此刻發生event的epitem用一個單獨的鏈表
* 鏈起來, 不發給應用程式, 也不丟棄, 而是在下一次epoll_wait
* 時回傳給用戶.
*/
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi;
}
goto out_unlock;
}
/* 將當前的epitem放入ready list */
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
/* 喚醒epoll_wait... */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
/* 如果epollfd也在被poll, 那就喚醒佇列里面的所有成員. */
if (waitqueue_active(&ep->poll_wait))
pwake++;
out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 1;
}
epoll_wait()
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct file *file;
struct eventpoll *ep;
/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* Verify that the area passed by the user is writeable */
/* 這個地方有必要說明一下:
* 內核對應用程式采取的策略是"絕對不信任",
* 所以內核跟應用程式之間的資料互動大都是copy, 不允許(也時候也是不能...)指標參考.
* epoll_wait()需要內核回傳資料給用戶空間, 記憶體由用戶程式提供,
* 所以內核會用一些手段來驗證這一段記憶體空間是不是有效的.
*/
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
error = -EFAULT;
goto error_return;
}
/* Get the "struct file *" for the eventpoll file */
error = -EBADF;
/* 獲取epollfd的struct file, epollfd也是檔案嘛 */
file = fget(epfd);
if (!file)
goto error_return;
error = -EINVAL;
/* 檢查一下它是不是一個真正的epollfd... */
if (!is_file_epoll(file))
goto error_fput;
/* 獲取eventpoll結構 */
ep = file->private_data;
/* 等待事件到來~~ */
error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fput(file);
error_return:
return error;
}
/* 這個函式真正將執行epoll_wait的行程帶入睡眠狀態... */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;//等待佇列
/* 計算睡覺時間, 毫秒要轉換為HZ */
jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
spin_lock_irqsave(&ep->lock, flags);
res = 0;
/* 如果ready list不為空, 就不睡了, 直接干活... */
if (list_empty(&ep->rdllist)) {
/* OK, 初始化一個等待佇列, 準備直接把自己掛起,
* 注意current是一個宏, 代表當前行程 */
init_waitqueue_entry(&wait, current);//初始化等待佇列,wait表示當前行程
__add_wait_queue_exclusive(&ep->wq, &wait);//掛載到ep結構的等待佇列
for (;;) {
/* 將當前行程設定位睡眠, 但是可以被信號喚醒的狀態,
* 注意這個設定是"將來時", 我們此刻還沒睡! */
set_current_state(TASK_INTERRUPTIBLE);
/* 如果這個時候, ready list里面有成員了,
* 或者睡眠時間已經過了, 就直接不睡了... */
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
/* 如果有信號產生, 也起床... */
if (signal_pending(current)) {
res = -EINTR;
break;
}
/* 啥事都沒有,解鎖, 睡覺... */
spin_unlock_irqrestore(&ep->lock, flags);
/* jtimeout這個時間后, 會被喚醒,
* ep_poll_callback()如果此時被呼叫,
* 那么我們就會直接被喚醒, 不用等時間了...
* 再次強調一下ep_poll_callback()的呼叫時機是由被監聽的fd
* 的具體實作, 比如socket或者某個設備驅動來決定的,
* 因為等待佇列頭是他們持有的, epoll和當前行程
* 只是單純的等待...
**/
jtimeout = schedule_timeout(jtimeout);//睡覺
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
/* OK 我們醒來了... */
set_current_state(TASK_RUNNING);
}
/* Is it worth to try to dig for events ? */
eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
spin_unlock_irqrestore(&ep->lock, flags);
/* 如果一切正常, 有event發生, 就開始準備資料copy給用戶空間了... */
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
}
//呼叫p_scan_ready_list()
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
//由ep_send_events()呼叫本函式
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
mutex_lock(&ep->mtx);
spin_lock_irqsave(&ep->lock, flags);
/* 這一步要注意, 首先, 所有監聽到events的epitem都鏈到rdllist上了,
* 但是這一步之后, 所有的epitem都轉移到了txlist上, 而rdllist被清空了,
* 要注意哦, rdllist已經被清空了! */
list_splice_init(&ep->rdllist, &txlist);
/* ovflist, 在ep_poll_callback()里面我解釋過, 此時此刻我們不希望
* 有新的event加入到ready list中了, 保存后下次再處理... */
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags);
/* 在這個回呼函式里面處理每個epitem
* sproc 就是 ep_send_events_proc, 下面會注釋到. */
error = (*sproc)(ep, &txlist, priv);
spin_lock_irqsave(&ep->lock, flags);
/* 現在我們來處理ovflist, 這些epitem都是我們在傳遞資料給用戶空間時
* 監聽到了事件. */
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
/* 將這些直接放入readylist */
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
}
ep->ovflist = EP_UNACTIVE_PTR;
/* 上一次沒有處理完的epitem, 重新插入到ready list */
list_splice(&txlist, &ep->rdllist);
/* ready list不為空, 直接喚醒... */
if (!list_empty(&ep->rdllist)) {
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
mutex_unlock(&ep->mtx);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return error;
}


轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/258056.html
標籤:其他
