執行緒池在實際的服務器開發是非常重要的一環,他涉及的概念也比較多,例如執行緒的使用,互斥鎖,條件變數,信號量的創建使用時機等等,同時你還要知道它如何自動銷毀和創建,實作一個較為智能的模式,
本文對執行緒池的一種構建方式進行詳細分解解讀注釋,但也有許多需要改進的地方,
完整的代碼文中已經給出,如需整個測驗專案,私信發,
目錄鏈接
文章目錄
- 1 為什么要epoll創建一個執行緒池
- 2 執行緒池的實作流程
- 3 準備作業,封裝互斥鎖和條件變數
- 4 執行緒池的實作
- 4.1 結構說明(重)
- 4.1.1 任務結構體
- 4.1.2 執行緒池結構體
- 4.1.3 四個函式
- 4.2 threadpool_add_task函式實作說明
- 4.2.1 偽代碼(詳細中文說明)
- 4.2.2 具體代碼實作
- 4.3 thread_routine函式實作說明
- 4.3.1 偽代碼(詳細中文說明)
- 4.3.2 具體代碼實作
- 4.4 threadpool_init函式實作說明
- 4.5 threadpool_destroy函式實作說明
- 5 測驗代碼
- 5.1 main.c
- 5.2 client.c
- 5.3 測驗結果
- 補充
- 總結
1 為什么要epoll創建一個執行緒池
- 降低資源消耗,通過重復利用已創建的執行緒降低執行緒的創建和銷毀造成的消耗,
- 提高回應速度,當任務到達時,任務可以不需要等到執行緒創建就能立即執行,
- 提高執行緒的可管理性,執行緒為稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一分配、調優和監控,但是,要做到合理利用執行緒池,必須對其實作原理了如指掌,
2 執行緒池的實作流程
- 創建一個執行緒池,初始化其中屬性,可創建一定數量執行緒,放入佇列,或者不初始化
- 執行緒都處于阻塞等待狀態,不占用cpu,當超時等待,可以自己結束執行緒
- 當需要執行函式,則把函式包裝成任務,并把任務傳入空閑執行緒進行運行
- 當沒有空閑行程且小于最大執行緒數要求,則創建執行緒執行任務
- 執行完任務的執行緒不需要退出,阻塞等待即可(或者設定等待時間)
- 若有執行緒池銷毀通知,確保任務執行完退出銷毀
3 準備作業,封裝互斥鎖和條件變數
其實這邊的封裝就是為了方便使用,我們把信號量和互斥鎖封裝成一個結構體是有很大幫助的,
在實際使用中我們都知道實作互斥訪問(不懂可以參考生產者和消費者模型)其中一種方式就是條件變數和互斥鎖的組合使用,
多個執行緒可以理解成多個消費者,所以設計到很多共享資源,需要互斥鎖,而條件變數是為了方便通知執行緒有可以執行的任務了,
condition.h
#ifndef _CONDITION_H_
#define _CONDITION_H_
#include <pthread.h>
//結構體內放了一個鎖和條件變數
typedef struct condition
{
pthread_mutex_t pmutex;
pthread_cond_t pcond;
} condition_t;
//初始化鎖和條件變數
int condition_init(condition_t *cond);
//上鎖
int condition_lock(condition_t *cond);
//解鎖
int condition_unlock(condition_t *cond);
//阻塞等待喚醒
int condition_wait(condition_t *cond);
//設定時間等待
int condition_timedwait(condition_t *cond, const struct timespec *abstime);
//隨機喚醒一個阻塞的
int condition_signal(condition_t *cond);
//廣播
int condition_broadcast(condition_t *cond);
//銷毀條件變數和互斥鎖
int condition_destroy(condition_t *cond);
#endif /* _CONDITION_H_ */
# condition.c
#include "condition.h"
int condition_init(condition_t *cond)
{
int status;
if ((status = pthread_mutex_init(&cond->pmutex, NULL)))
return status;
if ((status = pthread_cond_init(&cond->pcond, NULL)))
return status;
return 0;
}
int condition_lock(condition_t *cond)
{
return pthread_mutex_lock(&cond->pmutex);
}
int condition_unlock(condition_t *cond)
{
return pthread_mutex_unlock(&cond->pmutex);
}
int condition_wait(condition_t *cond)
{
return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}
int condition_signal(condition_t *cond)
{
return pthread_cond_signal(&cond->pcond);
}
int condition_broadcast(condition_t* cond)
{
return pthread_cond_broadcast(&cond->pcond);
}
int condition_destroy(condition_t* cond)
{
int status;
if ((status = pthread_mutex_destroy(&cond->pmutex)))
return status;
if ((status = pthread_cond_destroy(&cond->pcond)))
return status;
return 0;
}
4 執行緒池的實作
4.1 結構說明(重)
執行緒池主要組成就三個部分
- task_t型別的結構體
- threadpool_t型別結構體
- 四個實作函式
4.1.1 任務結構體
任務結構體簡單的說就是對實際執行函式的封裝
特別注意一下第一個成員變數,其實就是
函式指標,函式指標 的本質是一個指標,該指標的地址指向了一個函式,所以它是指向函式的指標,簡單的說:理解成一個未初始化的函式變數
把函式指標和引數分開作為引數其實就是為了在呼叫pthread_create方便傳入引數,第三個成員和鏈表的下一個指標域一個意思,我們這邊吧任務串成鏈表,
typedef struct task
{
// 任務回呼函式
// 簡單的說就是函式指標
void *(*run)(void *arg);
// 回呼函式引數
void *arg;
struct task *next;//指向的下一個結構體指標
} task_t;
4.1.2 執行緒池結構體
定義那么多關于執行緒的變數其實都是為了方便我們去管理執行緒,
唯一需要注意的是任務佇列的頭指標和尾指標,因為定義了這個,我們可以通過執行緒池物件成員很方便的放入或者取出任務,重要
typedef struct threadpool
{
condition_t ready; //初始化了一個條件變數和互斥鎖
task_t *first; //任務佇列頭指標
task_t *last; //任務佇列尾指標
int counter; //執行緒池中當前執行緒數
int idle; //執行緒池中當前正在等待任務的執行緒數
int max_threads; //執行緒池中最大允許的執行緒數
int quit; //銷毀執行緒池的時候置1
} threadpool_t;
4.1.3 四個函式
大致進行了一個介紹,具體解釋說明看后面,
// 初始化執行緒池
void threadpool_init(threadpool_t *pool, int threads);
// 往執行緒池中添加任務,同時喚醒執行緒(或者創造執行緒)去執行
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);
// 銷毀執行緒池
void threadpool_destroy(threadpool_t *pool);
//執行緒所執行的那個函式 (注意:并不是任務里面的那個函式)
//它的任務就是去判斷有沒有需要執行的任務,有的話就取出執行,否則等待or退出
void *thread_routine(void *arg);
4.2 threadpool_add_task函式實作說明
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);
作用:
往執行緒池中添加任務(封裝的函式)(其實就是任務鏈表中添加任務,并且喚醒等待執行緒去執行 如果沒有空閑的執行緒并且小于限制的最大執行緒數 就去創建執行緒),這里的任務佇列的頭部指標和尾部指標在執行緒池都有定義 掛上去就對了
- 引數一 執行緒池物件
- 引數二 執行緒執行的具體函式
- 引數二 具體函式的引數
對照偽代碼和具體代碼看
4.2.1 偽代碼(詳細中文說明)
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);
/*
{
malloc一個任務空間把傳入的執行的函式 和 引數掛上去
上鎖
if(如果頭指標為空)
{
任務掛頭
}
else
{
否則掛到尾巴的下一個
并把當前任務作為鏈表的最后一個
}
if(有空閑的執行緒)
{
就去喚醒一個
}
else if(沒有空閑執行緒 并且存貨執行緒數不多于最大值)
{
這部分有很大的改進空間,每次創建一個沒必要
創建一個執行緒,執行thread_routine
這邊的thread_routine就是自己去取任務執行
執行緒數增加
}
解鎖
}
*/
4.2.2 具體代碼實作
// 往執行緒池中添加任務
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)
{
// 生成新任務
task_t *newtask = (task_t *)malloc(sizeof(task_t));
newtask->run = run;
newtask->arg = arg;
newtask->next = NULL;
condition_lock(&pool->ready);
// 將任務添加到佇列
if (pool->first == NULL)
pool->first = newtask;
else
pool->last->next = newtask;
pool->last = newtask;
// 如果有等待執行緒,則喚醒其中一個
if (pool->idle > 0)
condition_signal(&pool->ready);
else if (pool->counter < pool->max_threads)
{
// 沒有等待執行緒,并且當前執行緒數不超過最大執行緒數,則創建一個新執行緒
pthread_t tid;
pthread_create(&tid, NULL, thread_routine, pool);
pool->counter++;
}
condition_unlock(&pool->ready);
}
4.3 thread_routine函式實作說明
void *thread_routine(void *arg);
作用:
從鏈表取出(任務)(結構體)(函式)進行執行
- 引數一 執行緒池物件
對照偽代碼和具體代碼看
4.3.1 偽代碼(詳細中文說明)
void *thread_routine(void *arg);
//執行緒呼叫的那個函式
//作用就是:從鏈表取出(任務)(結構體)(函式)進行執行
//引數就是 傳入的執行緒物件 因為需要執行緒池物件的互斥鎖 條件變數 任務指標 這些東西
//具體 while(1)大回圈
/*
while(1)
{
上鎖
while(如果沒有任務 并且執行緒池不要求銷毀)
{
設定時間等待
超時退出,但是只是退出第一層,所以這邊還設定了timeout標志 準備二次退出
}
if(有任務)
{
我們就從任務頭指標去取任務,更改頭指標
解鎖
釋放結構體空間
加鎖 (這邊加鎖 是應為下面有涉及執行緒數的加減操作)
}
if(沒有任務了 并且 執行緒池要求銷毀)
{
執行緒數--
if(執行緒數==0)也就是說其他執行緒任務都結束了 那就
{
喚醒在等待摧毀的函式
跳出回圈
}
釋放鎖
}
//這一個和第二個while是關聯的
if(超時了就是從第二個while跳出了的 并且 依舊沒有任務了)
{
執行緒數減少
釋放鎖
}
}
*/
4.3.2 具體代碼實作
void *thread_routine(void *arg)
{
struct timespec abstime;
int timeout;
printf("thread 0x%x is starting\n", (int)pthread_self());
threadpool_t *pool = (threadpool_t *)arg;
while (1)
{
timeout = 0;
condition_lock(&pool->ready);
pool->idle++;
// 等待佇列有任務到來或者執行緒池銷毀通知
while (pool->first == NULL && !pool->quit)
{
printf("thread 0x%x is waiting\n", (int)pthread_self());
//condition_wait(&pool->ready);
clock_gettime(CLOCK_REALTIME, &abstime);
abstime.tv_sec += 2;
int status = condition_timedwait(&pool->ready, &abstime);
if (status == ETIMEDOUT)
{
printf("thread 0x%x is wait timed out\n", (int)pthread_self());
timeout = 1;
break;
}
}
// 等待到條件,處于作業狀態
pool->idle--;
// 等待到任務
if (pool->first != NULL)
{
// 從隊頭取出任務
task_t *t = pool->first;
pool->first = t->next;
// 執行任務需要一定的時間,所以要先解鎖,以便生產者行程
// 能夠往佇列中添加任務,其它消費者執行緒能夠進入等待任務
condition_unlock(&pool->ready);
t->run(t->arg);
free(t);
condition_lock(&pool->ready);
}
// 如果等待到執行緒池銷毀通知, 且任務都執行完畢
if (pool->quit && pool->first == NULL)
{
pool->counter--;
if (pool->counter == 0)
condition_signal(&pool->ready);
condition_unlock(&pool->ready);
// 跳出回圈之前要記得解鎖
break;
}
if (timeout && pool->first == NULL)
{
pool->counter--;
condition_unlock(&pool->ready);
// 跳出回圈之前要記得解鎖
break;
}
condition_unlock(&pool->ready);
}
printf("thread 0x%x is exting\n", (int)pthread_self());
return NULL;
}
一開始弄錯的是執行緒執行的函式和實際我們要執行的函式不一樣,可以理解成一個嵌套,
4.4 threadpool_init函式實作說明
void threadpool_init(threadpool_t *pool, int threads)
{
// 對執行緒池中的各個欄位初始化
condition_init(&pool->ready);
pool->first = NULL;
pool->last = NULL;
pool->counter = 0;
pool->idle = 0;
pool->max_threads = threads;
pool->quit = 0;
}
4.5 threadpool_destroy函式實作說明
簡單的說就是廣播通知所有執行緒執行完任務,然后執行緒退出
// 銷毀執行緒池
void threadpool_destroy(threadpool_t *pool)
{
if (pool->quit)
{
return;
}
condition_lock(&pool->ready);
pool->quit = 1;
if (pool->counter > 0)
{
if (pool->idle > 0)
condition_broadcast(&pool->ready);
// 處于執行任務狀態中的執行緒,不會收到廣播
// 執行緒池需要等待執行任務狀態中的執行緒全部退出
while (pool->counter > 0)
condition_wait(&pool->ready);
}
condition_unlock(&pool->ready);
condition_destroy(&pool->ready);
}
5 測驗代碼
這邊通信方式用的是共享記憶體,設計了一個簡單的結構體,test 函式是我們具體要做的事,這邊做的就是簡單的列印而已,實際上我們可能對客戶端發送來的資訊做更多的處理,
5.1 main.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>
#include <ctype.h>
#include <fcntl.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/sem.h>
#include <signal.h>
#include <pthread.h>
#include "threadpool.h"
#define PORT 8000
#define OPEN_MAX 1024
struct shared{
int written; // 作為一個標志,非0:表示可讀,0:表示可寫
char text[OPEN_MAX]; // 記錄寫入 和 讀取 的文本
};
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_t pid[5];
void *Test(void* arg)
{
struct shared* shm_shared = (struct shared*)arg;
while(1)
{
if(shm_shared->written == 1)
{
printf("read: %s\n",shm_shared->text);
// write(5,shm_shared->text,);
shm_shared->written = 0;
break;
}
}
return NULL;
}
int main()
{
int i;
struct shared *shm_shared;
//創建一個執行緒池
threadpool_t pool;
threadpool_init(&pool, 3);
//創建共享記憶體
int shmid;
void* shmadd;
//struct shared *shm_shared;
if((shmid = shmget(IPC_PRIVATE,10,IPC_CREAT|0666)) < 0)
{
perror("shmget\n");
exit(-1);
}
printf("創建的共享記憶體為:%d\n",shmid);
//掛載共享記憶體到行程內,成功回傳共享記憶體的起始地址
if((shmadd = shmat(shmid,NULL,0)) < (char*)0)
{
perror("shmat");
exit(-1);
}
shm_shared = (struct shared*)shmadd;
shm_shared->written = 0;
//創建套接字,監聽檔案描述符
int sockfd;
struct sockaddr_in seraddr;
sockfd = socket(AF_INET,SOCK_STREAM,0);
//設定非阻塞
int flags = fcntl(sockfd,F_GETFL);
fcntl(sockfd,F_SETFL,flags | O_NONBLOCK);
//初始化埠和IP
bzero(&seraddr,sizeof(seraddr));
seraddr.sin_family = AF_INET;
seraddr.sin_addr.s_addr = htonl(INADDR_ANY);
seraddr.sin_port = htons(PORT);
int ret;
int on;
ret = setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,(const char*)&on,sizeof(on));
if (ret == -1)
perror("bind");
//系結服務器
ret = bind(sockfd,(struct sockaddr*)&seraddr,sizeof(seraddr));
if(ret == -1){
perror("bind");
}
//監聽套接字
if(listen(sockfd,SOMAXCONN) < 0)
{
perror("listen");
}
int client[OPEN_MAX];
int efd;
struct epoll_event event,events[OPEN_MAX];
//將客戶端標識初始化為-1
for(i = 0; i<OPEN_MAX; i++)
{
client[i] = -1;
}
//監聽事件個數
efd = epoll_create(OPEN_MAX);
if(efd == -1)
{
perror("epoll_create");
}
event.events = EPOLLIN; //監聽檔案描述符的可讀事件
event.data.fd = sockfd; //設定監聽的檔案描述符
ret = epoll_ctl(efd,EPOLL_CTL_ADD,sockfd,&event);
if(ret == -1)
{
perror("epoll_ctl");
}
socklen_t len;
int confd,nready;
struct sockaddr_in cliaddr;
char buf[OPEN_MAX] = {0};
while(1)
{
printf("wait....\n");
nready = epoll_wait(efd,events,OPEN_MAX,-1);
for(i = 0; i<nready;i++)
{
if(events[i].data.fd == sockfd)
{
len = sizeof(cliaddr);
confd = accept(sockfd,(struct sockaddr*)&cliaddr,&len);
printf("ip = %s;port = %d\n",inet_ntoa(cliaddr.sin_addr),ntohs(cliaddr.sin_port));
event.data.fd = confd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(efd,EPOLL_CTL_ADD,confd,&event);
//設定為非阻塞模式
//flags = fcntl(confd,F_GETFL);
//fcntl(confd, F_SETFL, flags | O_NONBLOCK);
}
else
{
confd = events[i].data.fd;
printf("connfd=%d\n",confd);
int nread = read(confd,buf,sizeof(buf));
if(nread == 0)
{
close(confd);
printf("client close\n");
event = events[i];
epoll_ctl(efd,EPOLL_CTL_DEL,confd,&event);
//break;
}
else{
//將資料寫入共享記憶體中
strcpy(shm_shared->text,buf);
shm_shared->written = 1;
//添加一個任務
threadpool_add_task(&pool, Test, shm_shared);
memset(buf,0x0,OPEN_MAX);
}
}
}
}
//釋放共享記憶體
shmdt(shmadd);
close(sockfd);
close(efd);
return 0;
}
5.2 client.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define SERV_PORT 8000
#define OPEN_MAX 1024
int main(int arg,char* argv[])
{
struct sockaddr_in addr;
int sockfd ,id;
sockfd = socket(AF_INET,SOCK_STREAM,0);
bzero(&addr,sizeof(addr));
addr.sin_family = AF_INET;
inet_pton(AF_INET,"127.0.0.1",&addr.sin_addr);
addr.sin_port = htons(SERV_PORT);
//連接服務器
connect(sockfd,(struct sockaddr*)&addr,sizeof(addr));
char wbuf[OPEN_MAX] = {0};
char rbuf[OPEN_MAX] = {0};
while(fgets(wbuf,sizeof(wbuf),stdin) != NULL)
{
//通過sockfd給服務器發送資料
write(sockfd,wbuf,strlen(wbuf));
/*
id = read(sockfd,rbuf,sizeof(rbuf));
if(id == 0)
{
printf("the other side has been closed\n");
}
*/
fputs(rbuf,stdout);
memset(wbuf,0,1024);
memset(rbuf,0,1024);
}
close(sockfd);
return 0;
}
5.3 測驗結果


補充
- 一個容易弄錯的就是實際執行的函式和執行緒執行的函式是不同的,二者是嵌套關系,
總結
如有錯誤,歡迎指出,

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/279843.html
標籤:其他
上一篇:Java程式員需要什么學歷?作業如何選擇?未來咋樣?
下一篇:OpenCV呼叫海康威視等攝像頭(處理rtsp視頻流)方法以及,出現記憶體溢位(error while decoding)或者高延遲問題解決

