epoll相關視頻決議:
支撐億級io的底層基石 epoll實戰揭秘
linux多執行緒之epoll原理剖析與reactor原理及應用
epoll的網路模型,從redis、memcached到nginx,一起搞定
epoll 是Linux平臺下的一種特有的多路復用IO實作方式,與傳統的 select 相比,epoll 在性能上有很大的提升,
epoll 的創建
要使用 epoll 首先需要呼叫 epoll_create() 函式創建一個 epoll 的句柄,epoll_create() 函式定義如下:
int epoll_create(int size);
引數 size 是由于歷史原因遺留下來的,現在不起作用,當用戶呼叫 epoll_create() 函式時,會進入到內核空間,并且呼叫 sys_epoll_create() 內核函式來創建 epoll 句柄,sys_epoll_create() 函式代碼如下:
asmlinkage long sys_epoll_create(int size)
{
int error, fd = -1;
struct eventpoll *ep;
error = -EINVAL;
if (size <= 0 || (error = ep_alloc(&ep)) < 0) {
fd = error;
goto error_return;
}
fd = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep);
if (fd < 0)
ep_free(ep);
error_return:
return fd;
}
sys_epoll_create() 主要做兩件事情:
- 呼叫 ep_alloc() 函式創建并初始化一個 eventpoll 物件,
- 呼叫 anon_inode_getfd() 函式把 eventpoll 物件映射到一個檔案句柄,并回傳這個檔案句柄,
我們先來看看 eventpoll 這個物件,eventpoll 物件用于管理 epoll 監聽的檔案串列,其定義如下:
struct eventpoll {
...
wait_queue_head_t wq;
...
struct list_head rdllist;
struct rb_root rbr;
...
};
先來說明一下 eventpoll 物件各個成員的作用:
- wq: 等待佇列,當呼叫 epoll_wait(fd) 時會把行程添加到 eventpoll 物件的 wq 等待佇列中,
- rdllist: 保存已經就緒的檔案串列,
- rbr: 使用紅黑樹來管理所有被監聽的檔案,
下圖展示了 eventpoll 物件與被監聽的檔案關系:

由于被監聽的檔案是通過 epitem 物件來管理的,所以上圖中的節點都是以 epitem 物件的形式存在的,為什么要使用紅黑樹來管理被監聽的檔案呢?這是為了能夠通過檔案句柄快速查找到其對應的 epitem 物件,紅黑樹是一種平衡二叉樹,如果對其不了解可以查閱相關的檔案,
向 epoll 添加檔案句柄
前面介紹了怎么創建 epoll,接下來介紹一下怎么向 epoll 添加要監聽的檔案,
通過呼叫 epoll_ctl() 函式可以向 epoll 添加要監聽的檔案,其原型如下:
long epoll_ctl(int epfd, int op, int fd,struct epoll_event *event);
下面說明一下各個引數的作用:
1、epfd: 通過呼叫 epoll_create() 函式回傳的檔案句柄,
2、op: 要進行的操作,有3個選項:
- EPOLL_CTL_ADD:表示要進行添加操作,
- EPOLL_CTL_DEL:表示要進行洗掉操作,
- EPOLL_CTL_MOD:表示要進行修改操作
3、fd: 要監聽的檔案句柄,
4、event: 告訴內核需要監聽什么事,其定義如下:
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events 可以是以下幾個宏的集合:
- EPOLLIN :表示對應的檔案句柄可以讀(包括對端SOCKET正常關閉);
- EPOLLOUT:表示對應的檔案句柄可以寫;
- EPOLLPRI:表示對應的檔案句柄有緊急的資料可讀;
- EPOLLERR:表示對應的檔案句柄發生錯誤;
- EPOLLHUP:表示對應的檔案句柄被掛斷;
- EPOLLET:將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對于水平觸發(Level Triggered)來說的,
- EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之后,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL佇列里,
【文章福利】需要C/C++ Linux服務器架構師學習資料加群812855908(資料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等)

data 用來保存用戶自定義資料,
epoll_ctl() 函式會呼叫 sys_epoll_ctl() 內核函式,sys_epoll_ctl() 內核函式的實作如下:
asmlinkage long sys_epoll_ctl(int epfd, int op,
int fd, struct epoll_event __user *event)
{
...
file = fget(epfd);
tfile = fget(fd);
...
ep = file->private_data;
mutex_lock(&ep->mtx);
epi = ep_find(ep, tfile, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tfile, fd);
} else
error = -EEXIST;
break;
...
}
mutex_unlock(&ep->mtx);
...
return error;
}
sys_epoll_ctl() 函式會根據傳入不同 op 的值來進行不同操作,比如傳入 EPOLL_CTL_ADD 表示要進行添加操作,那么就呼叫 ep_insert() 函式來進行添加操作,
我們繼續來分析添加操作 ep_insert() 函式的實作:
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
...
error = -ENOMEM;
// 申請一個 epitem 物件
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
goto error_return;
// 初始化 epitem 物件
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
epq.epi = epi;
// 等價于: epq.pt->qproc = ep_ptable_queue_proc
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 呼叫被監聽檔案的 poll 介面.
// 這個介面又各自檔案系統實作, 如socket的話, 那么這個介面就是 tcp_poll().
revents = tfile->f_op->poll(tfile, &epq.pt);
...
ep_rbtree_insert(ep, epi); // 把 epitem 物件添加到epoll的紅黑樹中進行管理
spin_lock_irqsave(&ep->lock, flags);
// 如果被監聽的檔案已經可以進行對應的讀寫操作
// 那么就把檔案添加到epoll的就緒佇列 rdllink 中, 并且喚醒呼叫 epoll_wait() 的行程.
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
...
return 0;
...
}
被監聽的檔案是通過 epitem 物件進行管理的,也就是說被監聽的檔案會被封裝成 epitem 物件,然后會被添加到 eventpoll 物件的紅黑樹中進行管理(如上述代碼中的 ep_rbtree_insert(ep, epi)),
tfile->f_op->poll(tfile, &epq.pt) 這行代碼的作用是呼叫被監聽檔案的 poll() 介面,如果被監聽的檔案是一個socket句柄,那么就會呼叫 tcp_poll(),我們來看看 tcp_poll() 做了什么操作:
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
struct sock *sk = sock->sk;
...
poll_wait(file, sk->sk_sleep, wait);
...
return mask;
}
每個 socket 物件都有個等待佇列(waitqueue, 關于等待佇列可以參考文章: 等待佇列原理與實作),用于存放等待 socket 狀態更改的行程,
從上述代碼可以知道,tcp_poll() 呼叫了 poll_wait() 函式,而 poll_wait() 最侄訓呼叫 ep_ptable_queue_proc() 函式,ep_ptable_queue_proc() 函式實作如下:
static void ep_ptable_queue_proc(struct file *file,
wait_queue_head_t *whead, poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
epi->nwait = -1;
}
}
ep_ptable_queue_proc() 函式主要作業是把當前 epitem 物件添加到 socket 物件的等待佇列中,并且設定喚醒函式為 ep_poll_callback(),也就是說,當socket狀態發生變化時,會觸發呼叫 ep_poll_callback() 函式,ep_poll_callback() 函式實作如下:
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
...
// 把就緒的檔案添加到就緒佇列中
list_add_tail(&epi->rdllink, &ep->rdllist);
is_linked:
// 喚醒呼叫 epoll_wait() 而被阻塞的行程
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
...
return 1;
}
ep_poll_callback() 函式的主要作業是把就緒的檔案添加到 eventepoll 物件的就緒佇列中,然后喚醒呼叫 epoll_wait() 被阻塞的行程,
等待被監聽的檔案狀態發生改變
把被監聽的檔案句柄添加到epoll后,就可以通過呼叫 epoll_wait() 等待被監聽的檔案狀態發生改變,
epoll_wait() 呼叫會阻塞當前行程,當被監聽的檔案狀態發生改變時,epoll_wait() 呼叫便會回傳,
epoll_wait() 系統呼叫的原型如下:
long epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
各個引數的意義:
- epfd: 呼叫 epoll_create() 函式創建的epoll句柄,
- events: 用來存放就緒檔案串列,
- maxevents: events 陣列的大小,
- timeout: 設定等待的超時時間,
epoll_wait() 函式會呼叫 sys_epoll_wait() 內核函式,而 sys_epoll_wait() 函式最侄訓呼叫 ep_poll() 函式,我們來看看 ep_poll() 函式的實作:
static int ep_poll(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents, long timeout)
{
...
// 如果就緒檔案串列為空
if (list_empty(&ep->rdllist)) {
// 把當前行程添加到epoll的等待佇列中
init_waitqueue_entry(&wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
__add_wait_queue(&ep->wq, &wait);
for (;;) {
set_current_state(TASK_INTERRUPTIBLE); // 把當前行程設定為睡眠狀態
if (!list_empty(&ep->rdllist) || !jtimeout) // 如果有就緒檔案或者超時, 退出回圈
break;
if (signal_pending(current)) { // 接收到信號也要退出
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
jtimeout = schedule_timeout(jtimeout); // 讓出CPU, 切換到其他行程進行執行
spin_lock_irqsave(&ep->lock, flags);
}
// 有3種情況會執行到這里:
// 1. 被監聽的檔案集合中有就緒的檔案
// 2. 設定了超時時間并且超時了
// 3. 接收到信號
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
/* 是否有就緒的檔案? */
eavail = !list_empty(&ep->rdllist);
spin_unlock_irqrestore(&ep->lock, flags);
if (!res && eavail
&& !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
}
ep_poll() 函式主要做以下幾件事:
1、判斷被監聽的檔案集合中是否有就緒的檔案,如果有就回傳,
2、如果沒有就把當前行程添加到epoll的等待佇列中,并且進入睡眠,
3、行程會一直睡眠直到有以下幾種情況發生:
- 被監聽的檔案集合中有就緒的檔案
- 設定了超時時間并且超時了
- 接收到信號
4、如果有就緒的檔案,那么就呼叫 ep_send_events() 函式把就緒檔案復制到 events 引數中,
5、回傳就緒檔案的個數,
最后,我們通過一張圖來總結epoll的原理:

下面通過文字來描述一下這個程序:
- 通過呼叫 epoll_create() 函式創建并初始化一個 eventpoll 物件,
- 通過呼叫 epoll_ctl() 函式把被監聽的檔案句柄 (如socket句柄) 封裝成 epitem 物件并且添加到
eventpoll 物件的紅黑樹中進行管理, - 通過呼叫 epoll_wait() 函式等待被監聽的檔案狀態發生改變,
- 當被監聽的檔案狀態發生改變時(如socket接收到資料),會把檔案句柄對應 epitem 物件添加到 eventpoll 物件的就緒佇列 rdllist 中,并且把就緒佇列的檔案串列復制到 epoll_wait() 函式的 events 引數中,
- 喚醒呼叫 epoll_wait() 函式被阻塞(睡眠)的行程,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/254097.html
標籤:其他
