執行緒池
- 1. 執行緒池的概念
- 2. Linux下執行緒池的CPP模擬實作
1. 執行緒池的概念
- 執行緒池的本質:一個
執行緒安全的佇列+ 一堆執行緒 - 執行緒安全佇列中的元素型別: 型別 =
資料+該資料處理方式
圖解

2. Linux下執行緒池的CPP模擬實作
-
將
資料和處理該資料的方法封裝成一個資料類,并提供一個run方法,呼叫資料處理方法處理資料 -
主執行緒負責向執行緒池的安全佇列中插入資料 -
執行緒池中的作業執行緒負責從安全佇列中拿出資料,并用資料類提供的run方法,讓資料自己處理自己
#include <stdio.h>
#include <unistd.h>
#include <iostream>
#include <pthread.h>
#include <queue>
#include <vector>
using namespace std;
//函式指標的宏定義
typedef void (*Handler)(int);
template<class T>
class QueueData{
public:
//傳入資料和處理資料的函式
QueueData(T data_,Handler handler_)
{
data = data_;
handler = handler_;
}
~QueueData(){}
//呼叫run函式來使用傳入的函式處理傳入的資料
void run()
{
handler(data);
}
private:
T data; //資料
Handler handler; //處理資料的方法
};
template <class T>
class ThreadPool{
public:
//初始化執行緒池 并按照傳入 執行緒數量 創建 相應個數的執行緒
ThreadPool(int Capacity,int Thread_count)
{
//初始化 佇列容量+執行緒數量+互斥鎖+條件變數+退出標志
capacity = Capacity;
thread_count = Thread_count;
lock = PTHREAD_MUTEX_INITIALIZER;
consumer_cond = PTHREAD_COND_INITIALIZER;
flag = false;
//創建執行緒
pthread_t ptid;
for(int i = 0; i < thread_count; i++)
{
int ret = pthread_create(&ptid,NULL,PoolStart,(void*)this);
if(ret < 0)
perror("pthread_create");
}
}
~ThreadPool()
{
// 銷毀 條件變數+互斥鎖
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&consumer_cond);
}
//主執行緒呼叫 向佇列中插入資料
void Push(QueueData<T>* qd)
{
pthread_mutex_lock(&lock);
if(flag) //
{
pthread_mutex_unlock(&lock);
return;
}
safe_queue.push(qd);
pthread_mutex_unlock(&lock);
pthread_cond_signal(&consumer_cond); //提醒PCB等待佇列中的執行緒可以來取資料啦
}
//呼叫該函式表示資料處理完了 可以退出所有執行緒了
void ThreadExit()
{
pthread_mutex_lock(&lock);
flag = true;
pthread_mutex_unlock(&lock);
pthread_cond_broadcast(&consumer_cond);
}
private:
queue<QueueData<T>* > safe_queue; //佇列
size_t capacity; //安全佇列中最大容量
pthread_mutex_t lock; //保證主執行緒與消費執行緒之間互斥
pthread_cond_t consumer_cond; //保證消費執行緒之間同步
size_t thread_count; //執行緒池中的執行緒數量
bool flag; //執行緒是否可以退出的標志
//作業執行緒呼叫
void Pop(QueueData<T>** qd)
{
(*qd) = safe_queue.front(); //將安全佇列中的資料放入引數中
safe_queue.pop(); //將隊首元素彈出
}
//執行緒入口函式中呼叫Pop函式,將this指標傳入進來
static void* PoolStart(void* arg)
{
//執行緒分離,執行緒退出后OS自動回收其資源
pthread_detach(pthread_self());
//將void*型別的引數強轉成ThreadPool*
ThreadPool* p = (ThreadPool*)arg;
//作業執行緒的任務就是取出資料并處理資料
while(1)
{
//為了保證執行緒能夠獨占式訪問共享資源,須加鎖
pthread_mutex_lock(&p->lock);
while(p->safe_queue.empty())
{
if(p->flag) //當flag為true時表示資料處理完了 該執行緒可以退出了
{
(p->thread_count)--;
pthread_mutex_unlock(&p->lock);
pthread_exit(NULL);
}
pthread_cond_wait(&p->consumer_cond,&p->lock); //當佇列空就等待并釋放鎖
}
QueueData<T>* qd;
p->Pop(&qd); //從安全佇列中拿出來資料
//先釋放鎖 再 處理資料,反之,會導致處理完資料再釋放鎖,釋放鎖時間過長
pthread_mutex_unlock(&p->lock);
qd->run();
}
}
};
//處理資料的函式
void DealData(int data)
{
printf("%d\n",data);
}
int main()
{
//創建執行緒池,設定安全佇列中最多可以保存4個元素,執行緒數量規定為4個
ThreadPool<int>* tp = new ThreadPool<int>(4,4);
if(!tp) return -1;
//假設需要處理 100 個資料
for(int i = 0; i < 100; i++)
{
QueueData<int>* qd = new QueueData<int>(i,DealData);
if(!qd) continue;
tp->Push(qd);
}
sleep(3);
//退出執行緒池
tp->ThreadExit();
return 0;
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/248047.html
標籤:其他
上一篇:sql server 2017資料庫復習:第十章-安全管理
下一篇:網路安全從零開始(基礎知識)
