2.使用I/O復用技術和執行緒池
網路中有很多用戶會嘗試去connect()這個WebServer上正在listen的這個port,而監聽到的這些連接會排隊等待被accept(),由于用戶連接請求是隨機到達的異步事件,每當監聽socket(listenfd)listen到新的客戶連接并且放入監聽佇列,我們都需要告訴Web服務器有連接來了,accept這個連接,并分配一個邏輯單元來處理這個用戶請求,而且,我們在處理這個請求的同時,還需要繼續監聽其他客戶的請求并分配另一邏輯單元來處理新的用戶請求(即并發,同時處理多個事件,后面會使用執行緒池實作并發),
?
這里,服務器通過epoll這種I/O復用技術來實作對監聽socket(listenfd)和連接socket(客戶請求)的同時監聽,I/O復用雖然可以同時監聽多個檔案描述符,但是它本身是阻塞的,并且當有多個檔案描述符同時就緒的時候,如果不采取額外措施,程式則只能按順序處理其中就緒的每一個檔案描述符,所以為提高效率,我們將在這部分通過執行緒池來實作并發(多執行緒并發),為每個就緒的檔案描述符分配一個邏輯單元(執行緒)來處理,
代碼塊
//對檔案描述符設定非阻塞
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
/* 將fd上的EPOLLIN和EPOLLET事件注冊到epollfd指示的epoll內核事件中 */
void addfd(int epollfd, int fd, bool one_shot) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
/* 針對connfd,開啟EPOLLONESHOT,因為我們希望每個socket在任意時刻都只被一個執行緒處理 */
if(one_shot)
event.events |= EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
/* 創建一個額外的檔案描述符來唯一標識內核中的epoll事件表 */
int epollfd = epoll_create(5);
/* 用于存盤epoll事件表中就緒事件的event陣列 */
epoll_event events[MAX_EVENT_NUMBER];
/* 主執行緒往epoll內核事件表中注冊監聽socket事件,當listen到新的客戶連接時,listenfd變為就緒事件 */
addfd(epollfd, listenfd, false);
/* 主執行緒呼叫epoll_wait等待一組檔案描述符上的事件,并將當前所有就緒的epoll_event復制到events陣列中 */
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
/* 然后我們遍歷這一陣列以處理這些已經就緒的事件 */
for(int i = 0; i < number; ++i) {
int sockfd = events[i].data.fd; // 事件表中就緒的socket檔案描述符
if(sockfd == listenfd) { // 當listen到新的用戶連接,listenfd上則產生就緒事件
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
/* ET模式 */
while(1) {
/* accept()回傳一個新的socket檔案描述符用于send()和recv() */
int connfd = accept(listenfd, (struct sockaddr *) &client_address, &client_addrlength);
/* 并將connfd注冊到內核事件表中,users是 http_conn 型別的陣列 */
users[connfd].init(connfd, client_address);
/*
...
*/
}
}
else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
// 如有例外,則直接關閉客戶連接,并洗掉該用戶的timer
/*
...
*/
}
else if(events[i].events & EPOLLIN) {
/* 當這一sockfd上有可讀事件時,epoll_wait通知主執行緒,*/
if(users[sockfd].read()) { /* 主執行緒從這一sockfd回圈讀取資料, 直到沒有更多資料可讀 */
pool->append(users + sockfd); /* 然后將讀取到的資料封裝成一個請求物件并插入請求佇列 */
/*
...
*/
}
else
/*
...
*/
}
else if(events[i].events & EPOLLOUT) {
/* 當這一sockfd上有可寫事件時,epoll_wait通知主執行緒,主執行緒往socket上寫入服務器處理客戶請求的結果 */
if(users[sockfd].write()) {
/*
...
*/
}
else
/*
...
*/
}
}
accept函式
服務器通過 accept() 函式來接收客戶端請求,
函式原型如下:
int accept(int sock, struct sockaddr *addr, socklen_t *addrlen)
sock 為服務器端套接字,addr 為 sockaddr_in 結構體變數,addrlen 為引數 addr 的長度,可由 sizeof() 求得,
?
accept() 回傳一個新的套接字來和客戶端通信,addr 保存了客戶端的IP地址和埠號,后面和客戶端通信時,要使用這個新生成的套接字,
I/O復用
I/O復用使得程式能同時監聽多個檔案描述符,通常,網路程式在以下情況需要使用I/O復用技術:
- 客戶端程式要同時處理多個socket,
- 客戶端程式要同時處理用戶輸入和網路連接,
- TCP服務器要同時處理監聽socket和連接socket,這是I/O復用使用最多的場合,
- 服務器要同時處理TCP請求和UDP請求,
- 服務器要同時監聽多個埠,或處理多種服務,
注意:I/O復用本身是阻塞的,
?
Linux下實作I/O復用的系統呼叫主要有select、poll、epoll,
select
int select ( int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout )
1)nfds引數指定被監聽的檔案描述符的總數,通常設定為被監聽的所有檔案描述符中的最大值加1,因為檔案描述符是從0開始計數的,
2)readfds、writefds、exceptfds引數分別指向可讀、可寫和例外等事件對應的檔案描述符集合,
3)timeout引數用來設定select函式的超時時間,
回傳值:成功時回傳就緒檔案描述符的總數,超時回傳0,出錯回傳-1并設定errno,
poll
int poll ( struct pollfd * fds, nfds_t nfds, int timeout )
1)fds引數是一個pollfd結構型別的陣列,它指定所有我們感興趣的檔案描述符上發生的可讀、可寫、例外等事件,
2)nfds引數指定被監聽事件集合fds的大小,
3)timeout引數指定poll超時值,單位是毫秒,當timeout值為-1時,poll呼叫將永遠阻塞,直到某個事件發生;當timeout值為0時,poll呼叫將立即回傳,
poll回傳值和select一樣,
epoll
epoll是Linux特有的I/O復用函式,epoll使用一組函式(共三個函式)完成任務,把用戶關心的檔案描述符上的事件放在內核里的一個事件表中,無需像select和poll那樣每次呼叫都要重復傳入檔案描述符或事件集,但epoll需要一個額外的檔案描述符,來唯一標識內核中的這個事件表,這個檔案描述符使用epoll_create函式來創建:
int epoll_create ( int sieze )
size引數只是告訴內核這個epoll物件會處理的事件大致數目,而不是能夠處理事件的最大數目,即size引數沒有任何作用,
回傳值:成功:epoll 專用的檔案描述符;失敗:-1,
注意:使用完epoll后,必須呼叫close()關閉,否則可能導致fd被耗盡,
?
操作epoll的內核事件表的函式:
int epoll_ctl( int epfd, int op, int fd, struct epoll_event *event )
epfd參數即epoll句柄(使用epoll_create函式回傳的檔案描述符),op引數表示動作,用三個宏來表示:
EPOLL_CTL_ADD:注冊新的fd到epfd中;
EPOLL_CTL_MOD:修改已經注冊的fd的監聽事件;
EPOLL_CTL_DEL:從epfd中洗掉一個fd;
fd引數指需要監聽的fd,event引數告訴內核需要監聽什么事,struct epoll_event結構如下:
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
?
event可以是以下幾個宏的集合:
EPOLLIN :表示對應的檔案描述符可以讀(包括對端SOCKET正常關閉);
EPOLLOUT:表示對應的檔案描述符可以寫;
EPOLLPRI:表示對應的檔案描述符有緊急的資料可讀(這里應該表示有帶外資料到來);
EPOLLERR:表示對應的檔案描述符發生錯誤;
EPOLLHUP:表示對應的檔案描述符被掛斷;
EPOLLET: 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對于水平觸發(Level Triggered)來說的,
EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之后,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL佇列里
例如將event設定為 讀 和 ET模式 事件的集合:ev.events = EPOLLIN | EPOLLET;
回傳值:epoll_ctl成功時回傳0,失敗回傳-1并設定errno,
注意:它不同于 select() 是在監聽事件時告訴內核要監聽什么型別的事件,而是在這里先注冊要監聽的事件型別,
?
epoll_wait函式,等侍注冊在epfd上的socket fd的事件的發生,其原型如下:
int epoll_wait( int epfd, struct epoll_event * events, int maxevents, int timeout )
1)epfd是 epoll的描述符,
2)events則是分配好的 epoll_event結構體陣列,epoll將會把發生的事件復制到 events陣列中(events不可以是空指標,內核只負責把資料復制到這個 events陣列中,不會去幫助我們在用戶態中分配記憶體,內核這種做法效率很高),
3)maxevents表示本次可以回傳的最大事件數目,通常 maxevents引數與預分配的events陣列的大小是相等的,
4)timeout表示在沒有檢測到事件發生時最多等待的時間(單位為毫秒),如果 timeout為0,則表示 epoll_wait在 rdllist鏈表中為空,立刻回傳,不會等待,
回傳值:該函式成功時回傳就緒的檔案描述符的個數,失敗時回傳-1并設定errno,
注意:如果有事件的發生則會將發生的socket fd和事件型別放入到events陣列中,并將注冊在epfd上的socket fd的事件型別給清空,如果下一個回圈還要關注這個socket fd的話,則需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)來重新設定socket fd的事件型別,這時不用EPOLL_CTL_ADD,因為socket fd并未清空,只是事件型別清空,
?
epoll對檔案描述符的操作有兩種模式:LT(Level Trigger,電平觸發)模式和 ET(Edge Trigger,邊沿觸發)模式,
LT模式是默認的作業模式,這種模式下epoll相當于一個效率較高的poll,當epoll_wait檢測到其上有事件發生并將此事件通知應用程式后,應用程式可以不立即處理該事件,這樣,當應用程式下一次呼叫epoll_wait時,epoll_wait還會再次向應用程式通知此事件,直到此事件被處理,
ET模式下,當epoll_wait檢測到其上有事件發生并將此事件通知應用程式后,應用程式必須立即處理該事件,因為后續的epoll_wait不再向應用程式通知這一事件,ET模式在很大程度上降低了同一個epoll事件被重復觸發的次數,因此效率要比LT模式高,需要往epoll內核事件表中注冊一個檔案描述符上的EPOLLET事件epoll才能變為ET模式,
注意:每個使用ET模式的檔案描述符都應該是非阻塞的,如果是阻塞的,那么讀或寫操作會因為沒有后續的事件而一直處于阻塞狀態,
?
即使我們使用ET模式,一個socket上的某個事件還是可能被觸發多次,這在并發的程式中會導致多個執行緒(或行程)同時操作一個socket的情況出現,例如一個執行緒在讀取完某個socket上的資料后開始處理這些資料,而在處理這些資料的程序中該socket又有新資料可讀(EPOLLIN 再次被觸發),此時程式會喚醒另一個執行緒來讀取這些新的資料,這并不是我們期望的,這會使程式的健壯性大大降低而編程的復雜度大大增加,我們期望的是一個socket連接在任意時刻都只被一個執行緒處理,這就可以使用epoll的 EPOLLONESHOT 事件實作,
對于注冊了 EPOLLONESHOT 事件的檔案描述符,作業系統最多觸發其上注冊的一個可讀、可寫、或者例外事件,且只觸發一次,這樣,當一個執行緒在處理某個socket時,其他執行緒是不可能有機會操作該socket的,但反過來,注冊了 EPOLLONESHOT 事件的socket一旦被某個執行緒處理完畢,該執行緒就應該立即重置這個socket上的 EPOLLONESHOT 事件,以確保這個socket下一次可讀時,其 EPOLLIN 事件能被觸發,讓其他作業執行緒有機會繼續處理這個socket,
三組I/O復用函式的比較
| 系統呼叫 | 事件集合 | 應用程式索引就緒檔案描述符的時間復雜度 | 最大支持檔案描述符數 | 作業模式 | 內核實作和作業效率 |
|---|---|---|---|---|---|
| select | 用戶通過3個引數分別傳入感興趣的可讀、可寫及例外等事件,內核通過對這些引數的在線修改來反饋其中的就緒事件,這使得用戶每次呼叫select都要重置這3個引數 | O(n) | 一般有最大限制 | LT | 采用輪詢的方式來檢測就緒事件,演算法復雜度為O(n) |
| poll | 統一處理所有事件型別,因此只需一個事件集引數,用戶通過pollfd.events傳入感興趣的事,內核通過修改pollfd.revents反饋其中就緒的事件 | O(n) | 65535 | LT | 采用輪詢的方式來檢測就緒事件,演算法復雜度為O(n) |
| epoll | 內核通過一個事件表直接管理用戶感興趣的所有事件,因此每次呼叫epoll_wait時,無需反復傳入用戶感興趣的事件,epoll_wait的引數events僅用來反饋就緒的事件 | O(1) | 65535 | LT 或 ET | 采用回呼方式來檢測就緒事件,演算法復雜度為O(1) |
綜上,當監測的fd數量較小,且各個fd都很活躍的情況下,建議使用select和poll;當監聽的fd數量較多,且單位時間區域分fd活躍的情況下,使用epoll會明顯提升性能,
?
多執行緒編程
創建執行緒和結束執行緒
執行緒相關常用的API如下(在Linux系統上都定義在pthread.h頭檔案中):
- pthread_create
用于創建一個執行緒,定義如下:
int pthread_create (pthread_t* thread, const pthread_attr_t* attr, void* (start_routine)( void ), void* arg)
1)thread引數是新執行緒的識別符號,其他執行緒相關函式通過它來參考新執行緒,其是一個整形型別,在Linux上幾乎所有的資源識別符號都是一個整型數,比如socket,
2)attr引數用于設定新執行緒的屬性,給它傳 NULL 值時表示使用默認執行緒屬性,
3)start_routine和arg引數分別指定執行緒將運行的函式及其引數,如果引數不止一個,需要將引數寫到一個結構體中,再將該結構體的地址作為引數傳入,
回傳值:成功時回傳0,失敗時回傳錯誤碼,
注意:
- 執行緒數量受資源限制是有限的,執行緒總數不能超過內核引數所定義的值,
- 傳入start_routine引數的函式要求為靜態函式,
要在靜態函式中使用類的動態成員有兩種方法:
- 通過類的靜態物件來呼叫
- 將類的物件作為引數傳遞給該靜態函式
- pthread_exit
執行緒函式在結束時最好呼叫此函式,以確保安全、干凈地退出,因為默認屬性的執行緒執行結束后并不會立即釋放占用的資源,直到整個行程執行結束,所有執行緒的資源以及整個行程占用的資源才會被作業系統回收,其函式原型如下:
void pthread_exit ( void* retval )
此函式通過 retval 引數向執行緒的回收者傳遞其退出資訊,如果執行緒不需要回傳任何資料,將 retval 引數置為 NULL 即可,
它執行完后不會回傳到呼叫者,而且用于不會失敗,
- pthread_join
一個行程中的所有執行緒都可以呼叫此函式來回收其他執行緒(前提是目標執行緒是可回收的),即等待其他執行緒結束,其定義如下:
int pthread_join( pthread_t thread, void retval );
thread引數是目標執行緒的識別符號,retval則是目標執行緒回傳的退出資訊,該函式會一直阻塞**,直到被回收的執行緒結束為止,
回傳值:成功時回傳0,失敗則回傳錯誤碼,
可能的錯誤碼如下:
(1) EDEADLK:可能引起死鎖,例如兩個執行緒互相join等待對方
(2) EINVAL:目標執行緒不可回收,或者有其他執行緒正在join等待本執行緒
(3) ESRCH:執行緒不存在
- pthread_cancel
可用此函式向另一個執行緒發送“終止執行”的信號(后續稱“Cancel”信號),從而令目標執行緒結束執行,函式原型如下:
int pthread_cancel(pthread_t pthread)
引數為目標執行緒的識別符號,
回傳值:成功回傳0,失敗則回傳錯誤碼,
注意: 函式的功能僅僅是向目標執行緒發送 Cancel 信號,至于目標執行緒是否處理該信號以及何時結束執行,由目標執行緒決定,
?
接收到取消信號的目標執行緒可以決定是否允許被取消以及如何取消,這分別由以下兩個函式完成(成功時都回傳 0):
int pthread_setcancelstate(int state, int *oldstate)
int pthread_setcanceltype(int type, int *oldtype)
這兩個引數的第一個引數分別用于設定執行緒的取消狀態(是否允許取消)和取消型別(如何取消),第二個引數則分別記錄執行緒原來的取消狀態和取消型別,state引數有兩個可選值:
- PTHREAD_CANCEL_ENABLE:允許執行緒被取消,是執行緒創建時的默認狀態
- PTHREAD_CANCEL_DISABLE:禁止執行緒被取消,這種情況下的執行緒收到取消請求,則它會將請求掛機,直到該執行緒允許被取消,
type引數也有兩個可選值:
- PTHREAD_CANCEL_DEFERRED:執行緒隨時都可以被取消,它將使得收到取消請求的目標執行緒立即采取行動,
- PTHREAD_CANCEL_ASYNCHRONOUS:允許目標執行緒推遲行動,直到它呼叫了下面幾個所謂的取消點函式中的一個,pthread_join、pthread_testcancel、pthread_cond_wait、pthread_cond_timedwait、sem_wait、sigwait、read、wait等,不過為了安全,最好在可能被取消的代碼中呼叫 pthread_testcancel 函式以設定取消點,
執行緒結束執行的方式共有 3 種,分別是:
- 執行緒將指定函式體中的代碼執行完后自行結束,
- 執行緒執行程序中,遇到 pthread_exit() 函式結束執行,
- 執行緒執行程序中,被同一行程中的其它執行緒(包括主執行緒)強制終止,
第一種很容易理解,第二種和第三種方式我們將分別舉例給大家演示用法,
pthread_exit() 函式的用法:
#include <stdio.h>
#include <pthread.h>
//執行緒要執行的函式,arg 用來接收執行緒傳遞過來的資料
void *ThreadFun(void *arg)
{
//終止執行緒的執行,將“https://www.cnblogs.com/zyzhi”回傳
pthread_exit("https://www.cnblogs.com/zyzhi"); //回傳的字串存盤在常量區,并非當前執行緒的私有資源
printf("*****************");//此陳述句不會被執行緒執行
}
int main()
{
int res;
//創建一個空指標
void * thread_result;
//定義一個表示執行緒的變數
pthread_t myThread;
res = pthread_create(&myThread, NULL, ThreadFun, NULL);
if (res != 0) {
printf("執行緒創建失敗");
return 0;
}
//等待 myThread 執行緒執行完成,并用 thread_result 指標接收該執行緒的回傳值
res = pthread_join(myThread, &thread_result);
if (res != 0) {
printf("等待執行緒失敗");
}
printf("%s", (char*)thread_result);
//輸出結果為 https://www.cnblogs.com/zyzhi
return 0;
}
第三種方法是指一個執行緒可以借助 pthread_cancel() 函式向另一個執行緒發送“終止執行”的信號,從而令目標執行緒結束執行,對于接收 Cancel 信號后結束執行的目標執行緒,等同于該執行緒自己執行如下陳述句:
pthread_exit(PTHREAD_CANCELED);
PTHREAD_CANCELED是一種宏(定義在<pthread.h>頭檔案中)
pthread_cancel() 函式的用法:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h> // sleep() 函式
//執行緒執行的函式
void * thread_Fun(void * arg) {
printf("新建執行緒開始執行\n");
sleep(10);
}
int main()
{
pthread_t myThread;
void * mess;
int value;
int res;
//創建 myThread 執行緒
res = pthread_create(&myThread, NULL, thread_Fun, NULL);
if (res != 0) {
printf("執行緒創建失敗\n");
return 0;
}
sleep(1);
//向 myThread 執行緒發送 Cancel 信號
res = pthread_cancel(myThread);
if (res != 0) {
printf("終止 myThread 執行緒失敗\n");
return 0;
}
//獲取已終止執行緒的回傳值
res = pthread_join(myThread, &mess);
if (res != 0) {
printf("等待執行緒失敗\n");
return 0;
}
//如果執行緒被強制終止,其回傳值為 PTHREAD_CANCELED
if (mess == PTHREAD_CANCELED) {
printf("myThread 執行緒被強制終止\n");
}
else {
printf("error\n");
}
return 0;
}
/*
最后輸出:
新建執行緒開始執行
myThread 執行緒被強制終止
*/
執行緒分離
執行緒分為兩種狀態:可結合態分離態
- 可結合態(執行緒默認狀態)
在此狀態下的執行緒能夠被其他執行緒回收資源或殺死,在被其他執行緒回收前,其占有的存盤器資源不會釋放,
- 分離態
這種狀態下的執行緒不能被其他執行緒回識訓殺死,它的存盤器資源在它終止時由系統自動釋放,
可以使用執行緒分離函式將執行緒變為分離態:
int pthread_detach( pthread_t thread)
回傳值:成功時回傳0,失敗回傳-1
POSIX 信號量
多執行緒程式必須考慮同步問題,pthread_join 可以看作一種簡單的執行緒同步方式,但它無法高效地實作復雜的同步需求,比如控制對共享資源的獨占式訪問,所以我們需要學習 3 種專門用于執行緒同步的機制:POSIX信號量、互斥量、條件變數,
常用的 POSIX 信號量函式有以下 5 個,都定義在 semaphore.h 中:
- sem_init
int sem_init( sem_t *sem, int pshared, unsigned int value )
用于初始化一個未命名的信號量
引數:
1)sem:要初始化的信號量
2)pshared:指定信號量的型別,如果為 0,表示這個信號量是當前行程的區域信號量,否則該信號量就可以在多個行程間共享
3)value:指定信號量的初始值
注意:初始化一個已經被初始化的信號量將導致不可預期的結果
- sem_destroy
int sem_destroy( sem_t *sem )
用于銷毀一個信號量
注意:銷毀一個正在被其他執行緒等待的信號量將導致不可預期的結果
- sem_wait
int sem_wait( sem_t *sem )
以原子操作的方式將信號量的值 -1
如果信號量的值為 0,則 sem_wait 將被阻塞直到信號量有非 0 值
- sem_trywait
int sem_trywait( sem_t *sem )
以原子操作的方式將信號量的值 -1,它會立即回傳(相當于 sem_wait 的非阻塞版本)
信號量為 0 時會回傳 -1 并設定 errno 為 EAGAIN
- sem_post
int sem_post( sem_t *sem )
以原子操作的方式將信號量的值 +1
當信號量的值 > 0 時,其他正在呼叫 sem_wait 等待信號量的執行緒將被喚醒
這5個函式成功時回傳 0,失敗則回傳-1并設定errno,
互斥量
互斥量(互斥鎖)可以保護關鍵代碼,以確保其獨占式的訪問,
POSIX互斥鎖的相關函式主要有如下 5個,都定義在 pthread.h 中:
- pthread_mutex_init
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr)
用于初始化互斥鎖,
這些函式的mutex引數都指向要操作的目標互斥鎖,mutexattr引數指定互斥鎖的屬性,為NULL時表示使用默認屬性,
還可以使用如下方式來初始化一個互斥鎖:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
- pthread_mutex_lock
int pthread_mutex_lock(pthread_mutex_t *mutex)
以原子方式給一個互斥鎖加鎖,如果目標互斥鎖已經被鎖上,則將阻塞,直到該互斥鎖的占有者將其解鎖,
- pthread_mutex_trylock
int pthread_mutex_trylock(pthread_mutex_t *mutex)
與 pthread_mutex_lock 類似(相當于 pthread_mutex_lock 的非阻塞版),始終立即回傳,當目標鎖已經被加鎖時,將回傳錯誤碼EBUSY,
- pthread_mutex_unlock
int pthread_mutex_unlock(pthread_mutex_t *mutex)
以原子方式給一個互斥鎖解鎖,如果此時有其他執行緒正在等待這個互斥鎖,則這些執行緒中的某一個將獲得它,
- pthread_mutex_destroy
int pthread_mutex_destroy(pthread_mutex_t *mutex)
用于銷毀互斥鎖,以釋放其占用的內核資源,銷毀一個已經加鎖的互斥鎖將導致不可預期的后果,
上面這些函式成功時回傳 0,失敗則回傳錯誤碼,
條件變數
如果說互斥鎖是用于同步執行緒對共享資料的訪問的話,那么條件變數則是用于在執行緒之間同步共享資料的值,假設一個行程中包含多個執行緒,這些執行緒共享變數 x,我們希望某個(或某些)執行緒等待 “x==10” 條件成立后再執行后續的代碼,就可以使用條件變數來實作,
條件變數提供了一種通知機制:當某個共享資料達到某個值的時候,喚醒等待這個共享資料的執行緒,
為了避免多執行緒之間發生“搶奪資源”的問題,條件變數在使用程序中必須和一個互斥鎖搭配使用,
條件變數用 pthread_cond_t 型別的變數表示,條件變數的相關函式主要有以下幾個,都定義在 pthread.h 中:
- 初始化條件變數
int pthread_cond_init(pthread_cond_t * cond, const pthread_condattr_t * attr)
?
引數 cond 用于指明要初始化的條件變數;引數 attr 用于自定義條件變數的屬性,通常我們將它賦值為 NULL,表示以系統默認的屬性完成初始化操作,
?
當使用默認屬性去初始化時,也可以用如下方法完成初始化:
pthread_cond_t myCond = PTHREAD_COND_INITIALIZER
- 阻塞當前執行緒,等待條件成立
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex)
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mutex, const struct timespec* abstime)
?
cond 引數表示已初始化好的條件變數;mutex 引數表示與條件變數配合使用的互斥鎖;abstime 引數表示阻塞執行緒的時間,
注意:abstime 引數指的是絕對時間,如果要阻塞執行緒 5 秒鐘,就需要用獲得的當前系統的時間去加上 5 秒,最終得到的時間才是傳遞的實參值,
?
呼叫兩個函式之前,我們必須先創建好一個互斥鎖并完成 加鎖 操作,然后才能作為實參傳遞給 mutex 引數,兩個函式會完成以下兩項作業:
- 阻塞執行緒,直至接收到“條件成立”的信號
- 當執行緒被添加到等待佇列上時,將互斥鎖 解鎖
注意:當函式接收到“條件成立”的信號后,它并不會立即結束對執行緒的阻塞,而是先完成對互斥鎖的“加鎖”操作,然后才解除阻塞,
?
兩個函式的區別在于:
- pthread_cond_wait() 函式可以永久阻塞執行緒,直到條件變數成立的那一刻
- pthread_cond_timedwait() 函式只能在 abstime 引數指定的時間內阻塞執行緒,超出時限后,該函式將重新對互斥鎖執行“加鎖”操作,并解除對執行緒的阻塞,函式的回傳值為 ETIMEDOUT,
- 解除執行緒的阻塞狀態
int pthread_cond_signal(pthread_cond_t* cond)
int pthread_cond_broadcast(pthread_cond_t* cond)
cond 引數表示初始化好的條件變數
兩個函式都能解除執行緒的“被阻塞”狀態,區別在于:
- pthread_cond_signal() 函式至少解除一個執行緒的“被阻塞”狀態,如果等待佇列中包含多個執行緒,優先解除哪個執行緒將由作業系統的執行緒調度程式決定
- pthread_cond_broadcast() 函式可以解除等待佇列中所有執行緒的“被阻塞”狀態,
由于互斥鎖的存在,解除阻塞后的執行緒也不一定能立即執行,當互斥鎖處于“加鎖”狀態時,解除阻塞狀態的所有執行緒會組成等待互斥鎖資源的佇列,等待互斥鎖“解鎖”,
- 銷毀條件變數
int pthread_cond_destroy(pthread_cond_t *cond)
cond 引數表示要銷毀的條件變數
注意:銷毀后的條件變數還可以呼叫 pthread_cond_init() 函式重新初始化后使用,
以上函式成功時都回傳0,失敗則回傳錯誤碼,
執行緒同步機制包裝成類
為了充分復用代碼,將上面的 3 種執行緒同步機制分別封裝成 3 個類,實作在 locker.h 頭檔案中,
#ifndef LOCKER_H
#define LOCKER_H
#include <exception>
#include <pthread.h>
#include <semaphore.h>
class sem
{
public:
sem()
{
if (sem_init(&m_sem, 0, 0) != 0)
{
throw std::exception();
}
}
sem(int num)
{
if (sem_init(&m_sem, 0, num) != 0)
{
throw std::exception();
}
}
~sem()
{
sem_destroy(&m_sem);
}
bool wait()
{
return sem_wait(&m_sem) == 0;
}
bool post()
{
return sem_post(&m_sem) == 0;
}
private:
sem_t m_sem;
};
class locker
{
public:
locker()
{
if (pthread_mutex_init(&m_mutex, NULL) != 0)
{
throw std::exception();
}
}
~locker()
{
pthread_mutex_destroy(&m_mutex);
}
bool lock()
{
return pthread_mutex_lock(&m_mutex) == 0;
}
bool unlock()
{
return pthread_mutex_unlock(&m_mutex) == 0;
}
pthread_mutex_t *get()
{
return &m_mutex;
}
private:
pthread_mutex_t m_mutex;
};
class cond
{
public:
cond()
{
if (pthread_cond_init(&m_cond, NULL) != 0)
{
//pthread_mutex_destroy(&m_mutex);
throw std::exception();
}
}
~cond()
{
pthread_cond_destroy(&m_cond);
}
bool wait(pthread_mutex_t *m_mutex)
{
int ret = 0;
//pthread_mutex_lock(&m_mutex);
ret = pthread_cond_wait(&m_cond, m_mutex);
//pthread_mutex_unlock(&m_mutex);
return ret == 0;
}
bool timewait(pthread_mutex_t *m_mutex, struct timespec t)
{
int ret = 0;
//pthread_mutex_lock(&m_mutex);
ret = pthread_cond_timedwait(&m_cond, m_mutex, &t);
//pthread_mutex_unlock(&m_mutex);
return ret == 0;
}
bool signal()
{
return pthread_cond_signal(&m_cond) == 0;
}
bool broadcast()
{
return pthread_cond_broadcast(&m_cond) == 0;
}
private:
//static pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
};
#endif
執行緒池
執行緒池一種執行緒使用模式,執行緒池維護著多個執行緒,等待著監督管理者分配可并發執行的任務,這避免了在處理短時間任務時創建與銷毀執行緒的代價,執行緒池不僅能夠保證內核的充分利用,還能防止過分調度,
?
執行緒池的組成部分有:
- 執行緒池管理器:創建和初始化執行緒,啟動和停止執行緒,調配任務;管理執行緒池
- 作業執行緒:執行緒池中的執行緒
- 任務介面:添加任務的介面,以提供作業執行緒調度任務的執行,
- 任務佇列:用于存放沒有處理的任務,提供一種緩沖機制,同時具有調度功能,高優先級的任務放在佇列前面
執行緒池中執行緒數量
執行緒池中的執行緒數量最直接的限制因素是中央處理器(CPU)的處理器(processors/cores)的數量N:如果你的CPU是4-cores的,對于CPU密集型的任務(如視頻剪輯等消耗CPU計算資源的任務)來說,那執行緒池中的執行緒數量最好也設定為4(或者+1防止其他因素造成的執行緒阻塞);對于IO密集型的任務,一般要多于CPU的核數,因為執行緒間競爭的不是CPU的計算資源而是IO,IO的處理一般較慢,多于cores數的執行緒將為CPU爭取更多的任務,不至在執行緒處理IO的程序造成CPU空閑導致資源浪費,
公式:最佳執行緒數 = CPU當前可使用的Cores數 * 當前CPU的利用率 * (1 + CPU等待時間 / CPU處理時間)
本專案采用的是半同步/半反應堆執行緒池,將執行緒池代碼封裝在 threadpool.h 頭檔案中
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
/* 參考上面的執行緒同步機制的包裝類 */
#include "../lock/locker.h"
/* 執行緒池類,定義為模板方便復用 */
template <typename T>
class threadpool
{
public:
/* thread_number是執行緒池中執行緒數量,max_request是請求佇列中最多允許的、等待處理的請求數量 */
threadpool(connection_pool *connPool, int thread_number = 8, int max_request = 10000);
~threadpool();
/* 往請求佇列中添加任務 */
bool append(T *request);
private:
/* 作業執行緒運行的函式,它不斷從作業佇列中取出任務并執行 */
static void *worker(void *arg);
void run();
private:
int m_thread_number; // 執行緒池中的執行緒數
int m_max_requests; // 請求佇列中允許的最大請求數
pthread_t *m_threads; // 描述執行緒池的陣列,其大小為 m_thread_number
std::list<T *> m_workqueue; // 請求佇列
locker m_queuelocker; // 保護請求佇列的互斥鎖
sem m_queuestat; // 是否有任務需要處理
bool m_stop; // 是否結束執行緒
connection_pool *m_connPool; //資料庫
};
template <typename T>
threadpool<T>::threadpool( connection_pool *connPool, int thread_number, int max_requests) : m_thread_number(thread_number), m_max_requests(max_requests), m_stop(false), m_threads(NULL),m_connPool(connPool)
{
if (thread_number <= 0 || max_requests <= 0)
throw std::exception();
m_threads = new pthread_t[m_thread_number];
if (!m_threads)
throw std::exception();
for (int i = 0; i < thread_number; ++i)
{
//printf("create the %dth thread\n",i);
/* 因為需要在靜態函式中使用類的動態成員,故將類的物件作為引數闖入 */
if (pthread_create(m_threads + i, NULL, worker, this) != 0)
{
delete[] m_threads;
throw std::exception();
}
/* 將執行緒設定為分離態 */
if (pthread_detach(m_threads[i]))
{
delete[] m_threads;
throw std::exception();
}
}
}
template <typename T>
threadpool<T>::~threadpool()
{
delete[] m_threads;
m_stop = true;
}
template <typename T>
bool threadpool<T>::append(T *request)
{
/* 操作作業佇列時需要加鎖,因為它被所以執行緒共享 */
m_queuelocker.lock();
if (m_workqueue.size() > m_max_requests)
{
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();
return true;
}
template <typename T>
void *threadpool<T>::worker(void *arg)
{
threadpool *pool = (threadpool *)arg;
pool->run();
return pool;
}
template <typename T>
void threadpool<T>::run()
{
while (!m_stop)
{
m_queuestat.wait();
m_queuelocker.lock();
if (m_workqueue.empty())
{
m_queuelocker.unlock();
continue;
}
T *request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if (!request)
continue;
//從連接池中取出一個資料庫連接
request->mysql = m_connPool->GetConnection();
//process(模板類中的方法,這里是http類)進行處理
request->process();
//將資料庫連接放回連接池
m_connPool->ReleaseConnection(request->mysql);
}
}
#endif
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/544578.html
標籤:其他
上一篇:Python關于例外處理的教程
下一篇:Java+Jquer實作趨勢圖

