Linux 多行程
- 1 Linux 執行緒控制
- 1.1 創建執行緒
- 1.2 執行緒終止和執行緒等待
- 1.2.1 執行緒終止的方式
- 1.2.2 執行緒等待的方式
- 1.2.3 小結
- 1.3 執行緒分離
- 2 Linux 執行緒互斥(重點)
- 2.1 行程、執行緒間的互斥相關背景概念
- 2.2 互斥量 mutex
- 3 可重入和執行緒安全
- 3.1 概念
- 3.2 常見的執行緒不安全的情況
- 3.3 常見的執行緒安全的情況
- 3.4 常見不可重入的情況
- 3.5 常見可重入的情況
- 3.6 可重入與執行緒安全的聯系和區別
- 4 常見的鎖概念
- 4.1 什么是死鎖
- 4.2 產生死鎖的4個必要條件
- 4.3 如何避免死鎖
- 5 Linux 執行緒同步(重點)
- 5.1 什么是執行緒同步
- 5.2 為什么需要執行緒同步
- 5.3 如何編碼實作
- 6 生產者消費者模型
- 6.1 什么是生產者消費者模型
- 6.2 為什么需要生產者消費者模型
- 6.3 生產者消費者模型的優點
- 6.4 基于BlockingQueue的生產者消費者模型的代碼實作
- 6.5 POSIX 信號量的引入
- 6.5.1 什么是信號量?
- 6.5.2 為什么使用信號量:使用信號量有什么好處?
- 6.5.3 如何使用信號量
- 6.6 基于環形佇列的生產消費模型
- 7 執行緒池
- 7.1 什么是執行緒池
- 7.2 執行緒池的應用場景
- 7.3 執行緒池的好處
- 7.4 模擬實作執行緒池
- 7.5 執行緒池VS行程池
1 Linux 執行緒控制
在上一篇文章里提高,Linux 并沒有像 win 那樣真正意義上的執行緒,而是用行程去模擬執行緒的,所以 Linux 中的執行緒創建等一系列的操作由 NPTL POSIX執行緒庫實作,
- 與執行緒有關的函式構成了一個完整的系列,絕大多數函式的名字都是以"pthread_"打頭的
- 要使用這些函式庫,要通過引入頭文<pthread.h>
- 鏈接這些執行緒函式庫時要使用編譯器命令的"-lpthread"選項

1.1 創建執行緒
介面介紹

- 功能:創建一個新的行程
- 引數:thread: 回傳執行緒ID,attr:設定執行緒的屬性,attr 為 NULL 表示使用默認屬性,start_routine 是函式指標,指向執行緒啟動后要執行的函式代碼塊,arg : 傳給執行緒啟動函式的引數
- 回傳值:成功回傳0,失敗回傳錯誤碼
代碼演示



執行緒ID及行程地址空間布局
- pthread_create 函式會產生一個執行緒ID(tid),存放在第一個引數指向的地址中,該執行緒ID,和LWP并不是一個
- 前面所說的LWP,是屬于執行緒調度的范疇,因為執行緒是輕量級行程,是作業系統調度的最小單位,所以需要一個數值來唯一表示該行程,
- phread_create 函式第一個引數指向的虛擬記憶體單元,該記憶體單元的地址就是新創建執行緒的執行緒ID,屬于NPTL執行緒庫的范疇,執行緒庫的后序操作,就是根據該執行緒ID來操作執行緒的,前面的代碼和后面的代碼都有所體現,
獲得執行緒ID的介面:pthread_self()

1.2 執行緒終止和執行緒等待
- main 函式所在的執行緒是主執行緒,主執行緒結束退出,則執行緒所在的行程就會結束,則其他執行緒也會隨之退出,因為行程是承擔系統資源分配的基本單位,行程退出了,那么基于這個行程資源所創建的行程肯定就沒有了,
- 那么新執行緒終止 了,主執行緒如何知道你已經終止了呢,這就需要執行緒等待,如果沒有執行緒等待,也會發生像僵尸行程那樣的問題,造成資源泄露,主執行緒在進行等待的時候會阻塞,還有一種方式叫執行緒分離也可以解決這個問題,
- 執行緒出現例外會導致執行緒所在的行程退出,那么處理這個情況就是行程的問題,所以我們默認執行緒退出只有兩個情況1、代碼跑完結果正確 2、代碼跑完結果不正確
- 信號是專門為行程設計的,信號處理的基本單位是行程,所以block表(信號屏蔽字)是執行緒私有的,但是 pending(未決表)是行程私有的
1.2.1 執行緒終止的方式
注意:在執行緒中呼叫 exit 也是終止該行程所在的行程,想要單獨終止行程有3種方式
- 從執行緒函式 return ,這個方法對主執行緒不適用,從 main 函式 return 相當于呼叫 exit,
- 執行緒可以呼叫 pthread_exit 來終止自己
- 一個執行緒可以呼叫 pthread_cancel 來 終止同一行程中的另一個執行緒,
pthread_exit介紹

需要注意,pthread_exit或者return回傳的指標所指向的記憶體單元必須是全域的或者是用malloc分配的,不能在執行緒函式的堆疊上分配,因為當其它執行緒得到這個回傳指標時執行緒函式已經退出了,
pthread_cancel

代碼演示

結果


1.2.2 執行緒等待的方式
為什么需要執行緒等待呢?
已經退出的執行緒,其空間沒有被釋放,仍然在該行程的地址空間內,
創建的新行程不會復用新行程的地址空間,造成資源泄露,
函式介面介紹 pthread_join

- 引數 thread:執行緒ID(tid) retval :它指向一個指標,后者指向執行緒的回傳值
- 回傳值:成功回傳 0 ,失敗回傳錯誤碼,
1.2.3 小結
- 如果 thread 執行緒通過 return 回傳 ,retval 所指向的單元存放的是thread執行緒函式的回傳值,
- 如果 thread 執行緒被別的執行緒呼叫 pthread_cancel終止的,retval 所指向的單元存放的是常數 PATHREAD_CANCELED(-1)
- 如果 thread 執行緒是自己呼叫 pthread_exit 終止的,retval 所指向的單元存放的是傳給 pthread_exit的引數,
- 如果對 thread 執行緒的終止狀態不感興趣,可以傳NULL給value_ ptr引數,
1.3 執行緒分離
默認情況下,新創建的執行緒是需要被等待的,新執行緒退出后,需要對其進行 pthread_join 操作,否則無法釋放資源,從而造成系統泄露,
如果不關心執行緒的回傳值,join是一種負擔,這個時候我們可以告訴系統,當先執行緒退出的時候,自動釋放執行緒的資源,需要進行執行緒分離,
介面 int pthread_detach(pthread_t thread)
- 可以是執行緒組內其他執行緒對目標執行緒進行分離,也可以是執行緒自己分離
- joinable 和 分離是沖突的,一個執行緒不能既是 joinable 又是分離的,


2 Linux 執行緒互斥(重點)
2.1 行程、執行緒間的互斥相關背景概念
- 臨界資源:多個執行緒執行流共享的資源叫做臨界資源(多個執行緒可能會同時訪問的資源)
- 臨界區:每個執行緒內部,訪問臨界資源的代碼,就叫做臨界區
- 互斥:任何時刻,互斥保證有且只有一個執行流進入進入臨界區,訪問臨界資源,對臨界資源起保護作用,
- 原子性:不被任何調度機制打斷的操作,該操作只有兩態,要么完成要么未完成
1、所有執行緒都必須遵守:對臨界區進行保護
2、lock(加鎖) ===> 訪問臨界區 ==> unlock(解鎖)
3、所有的執行緒都必須先看到同一把鎖,鎖本身也是臨界資源,申請鎖的程序也是兩態的,即lock具有原子性,unlock 也具有原子性,
4、lock > 訪問臨界區(占用一定的時間處理)=> unlock ,在特定執行緒或者行程擁有鎖的時候,期間有新執行緒來申請鎖,一定是申請不到的!那個新執行緒將阻塞,將新執行緒/行程 對應的 PCB 投入到等待佇列中,特定的 執行緒或者行程 unlock 之后,進行執行緒或行程的喚醒操作!
5、一次保證只有一個執行緒進入臨界區,訪問臨界資源,就叫做互斥,
2.2 互斥量 mutex
我們看一個簡單的搶票代碼:4個執行緒搶票


結果卻出現了,車票代碼為負數的情況

決議原因
-
大部分情況執行緒使用的資料都是區域變數,變數的地址空間在執行緒堆疊空間內,這種情況,變數歸屬單個行程,其他執行緒無法獲得這種變數,
-
但是有時候許多變數需要在執行緒間共享,這樣的變數稱為共享變數,通過資料的共享完成執行緒之間的互動,
-
多個執行緒并發的操作共享變數,會帶來一些問題,比如上面的搶票代碼,
-
那么具體到這個搶票代碼的問題,我們來分析一下:
-
if 陳述句判斷條件為真以后,代碼可以并發的切換到其他執行緒
-
usleep 這個模擬漫長業務的程序,在這個漫長的業務程序中,可能有很多個執行緒會進入該代碼段
-
–ticket 操作本身就不是一個原子操作

- 前置–操作并不是原子操作,而是對應三潭訓編指令:
- load :將共享變數ticket從記憶體加載到暫存器中 update : 更新暫存器里面的值,執行-1操作 store :將新值,從暫存器寫回共享變數ticket的記憶體地址
要解決以上問題需要做到以下3點
- 代碼必須要有互斥行為:當代碼進入臨界區執行時,不允許其他執行緒進入該臨界區,
- 如果多個執行緒同時要求執行臨界區的代碼,并且臨界區沒有執行緒在執行,那么只能允許一個執行緒進入該臨界區,
- 如果執行緒不在臨界區中執行,那么該執行緒不能阻止其他執行緒進入臨界區,
要做到這三點,本質上就是需要一把鎖,Linux上提供的這把鎖叫互斥量,

關于互斥量的介面
1、初始化與銷毀互斥量

需要注意的地方
- 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要銷毀
- 不要銷毀一個已經加鎖的互斥量
- 已經銷毀的互斥量,要確保后面不會有執行緒再嘗試加鎖
2、互斥量加鎖與解鎖

需要注意的地方
- 回傳值:成功回傳 0,失敗回傳錯誤碼
- 互斥量處于未鎖狀態,該函式會將互斥量鎖定,同時回傳成功
- 發起函式呼叫時,其他執行緒已經鎖定互斥量,或者存在其他執行緒同時申請互斥量,但沒有競爭到互斥量,那么pthread_lock呼叫會陷入阻塞(執行流被掛起),等待互斥量解鎖,
用 mutex 互斥量優化上面的搶票代碼


3 可重入和執行緒安全
3.1 概念
- 執行緒安全:多個執行緒并發同一段代碼時,不會出現不同的結果,
- 重入:同一個函式被不同的執行流呼叫,當前一個流程還沒有執行完,就有其他的執行流再次進入,我們稱之為重入,一個函式在重入的情況下,運行結果不會出現任何不同或者任何問題,則該函式被稱為可重入函式,否則,是不可重入函式
3.2 常見的執行緒不安全的情況
- 不保護共享變數的函式
- 函式狀態隨著被呼叫,狀態發生變化的函式
- 回傳指向靜態變數指標的函式
- 呼叫執行緒不安全函式的函式
3.3 常見的執行緒安全的情況
- 每個執行緒對全域變數或者靜態變數只有讀取的權限,而沒有寫入的權限,一般來說這些執行緒是安全的
- 類或者介面對于執行緒來說都是原子操作
- 多個執行緒之間的切換不會導致該介面的執行結果存在二義性
3.4 常見不可重入的情況
- 呼叫了malloc/free函式,因為malloc函式是用全域鏈表來管理堆的
- 呼叫了標準I/O庫函式,標準I/O庫的很多實作都以不可重入的方式使用全域資料結構
- 函式體內使用了靜態的資料結構
3.5 常見可重入的情況
- 不使用全域變數或靜態變數
- 不使用用malloc或者new開辟出的空間
- 不呼叫不可重入函式
- 不回傳靜態或全域資料,所有資料都有函式的呼叫者提供
- 使用本地資料,或者通過制作全域資料的本地拷貝來保護全域資料
3.6 可重入與執行緒安全的聯系和區別
聯系
- 函式是可重入的,那就是執行緒安全的
- 函式是不可重入的,那就不能由多個執行緒使用,有可能引發執行緒安全問題
- 如果一個函式中有全域變數,那么這個函式既不是執行緒安全也不是可重入的
區別
- 可重入函式是執行緒安全函式的一種
- 執行緒安全不一定是可重入的,而可重入函式則一定是執行緒安全的,
- 如果將對臨界資源的訪問加上鎖,則這個函式是執行緒安全的,但如果這個重入函式若鎖還未釋放則會產生死鎖,因此是不可重入的,
4 常見的鎖概念
4.1 什么是死鎖
- 死鎖是指在一組行程中的各個行程均占有不會釋放的資源,但因互相申請被其他行程所站用不會釋放的資源而處于的一種永久等待狀態,
4.2 產生死鎖的4個必要條件
- 互斥條件:一個資源每次只能被一個執行流使用
- 請求與保持條件:一個執行流因請求資源而阻塞時,并且對已獲得的資源保持不放
- 不剝奪條件:一個執行流已獲得的資源,在末使用完之前,不能強行剝奪
- 回圈等待條件:若干執行流之間形成一種頭尾相接的回圈等待資源的關系
4.3 如何避免死鎖
- 破壞死鎖的四個必要條件
- 加鎖順序一致
- 避免鎖未釋放的場景
- 資源一次性分配
避免死鎖的演算法
- 死鎖檢測演算法
- 銀行家演算法
5 Linux 執行緒同步(重點)
5.1 什么是執行緒同步
例如,A 執行緒訪問佇列時,發現佇列為空,它只能等待,直到 B 執行緒將一個節點添加到佇列中,此時需要執行緒同步,需要條件變數,那么執行緒同步的定義:在保證資料安全的前提下(加鎖),讓多個執行流(執行緒)按照某種特定的順序訪問臨界資源,從而有效避免饑餓問題,叫做同步
5.2 為什么需要執行緒同步
需要多執行緒協同高效的完成任務
5.3 如何編碼實作
1、如果條件不滿足,等待,釋放鎖,
2、通知機制
使用一組介面



簡單的使用演示


結果

6 生產者消費者模型
通過本章,我們要知道,什么是生產者消費者模型,為什么會存在這種模型,這種模型該如何設計并編碼,并通過一個基于阻塞佇列的生產者消費者模型,闡述 pthread_cond_wait,為何需要互斥量,和條件變數的規范使用,
6.1 什么是生產者消費者模型
- 該模型有兩個角色:生產者、消費者,維護了3個關系:生產者和生產者之間的互斥掛關系,生產者和消費者之間的同步關系、消費者和消費者之間的互斥關系,實作這樣的模型可以代碼的解耦,一旦解耦代碼的可維護性強,同時適配了生產和消費速度不一致的問題,提高效率(321原則),
6.2 為什么需要生產者消費者模型
- 生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題,生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之后不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列里取,阻塞佇列就相當于一個緩沖區,平衡了生產者和消費者的處理能力,這個阻塞佇列就是用來給生產者和消費者解耦的,
6.3 生產者消費者模型的優點
- 解耦
- 支持并發
- 解決忙閑不均

6.4 基于BlockingQueue的生產者消費者模型的代碼實作
- 介紹:在多執行緒編程中阻塞佇列(Blocking Queue)是一種常用于實作生產者和消費者模型的資料結構,其與普通的佇列區別在于,當佇列為空時,從佇列獲取元素的操作將會被阻塞,直到佇列中被放入了元素;當佇列滿時,往佇列里存放元素的操作也會被阻塞,直到有元素被從佇列中取出(以上的操作都是基于不同的執行緒來說的,

用 C++ queue 模擬阻塞佇列的生產消費模型
- 版本一:生產 int 資料 消費 int 資料,單消費者、單生產者
Makefile

BlockQueue.hpp



main.cc


結果

- 版本2 :多消費者,多生產者,生產和消費任務 Task
BlockQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include<queue>
//封裝的任務
class Task {
public:
int _x;
int _y;
public:
Task(){}
Task(int x, int y):_x(x),_y(y)
{
}
int Run()
{
return _x + _y;
}
};
template<class T>
class BlockQueue {
private:
std::queue<T> _q;
size_t _cap;
pthread_mutex_t lock;
pthread_cond_t p_cond;
pthread_cond_t c_cond;
private:
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnlockQueue()
{
pthread_mutex_unlock(&lock);
}
void ProductorWait()
{
std::cout << "productor wait ..." << std::endl;
pthread_cond_wait(&p_cond, &lock);
}
void ConsumerWait()
{
std::cout << "consumer wait ..." << std::endl;
pthread_cond_wait(&c_cond, &lock);
}
void WakeupProductor()
{
std::cout << "wake up productor ..." << std::endl;
pthread_cond_signal(&p_cond);
}
void WakeupConsumer()
{
std::cout <<"wake up consumer ..." << std::endl;
pthread_cond_signal(&c_cond);
}
bool IsFull()
{
return _q.size() >= _cap;
}
bool IsEmpty()
{
return _q.empty();
}
public:
BlockQueue(size_t cap = 5)
:_cap(cap)
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&p_cond, nullptr);
pthread_cond_init(&c_cond, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&p_cond);
pthread_cond_destroy(&c_cond);
}
void Put(const T& t)
{
// 生產者
LockQueue();
while (IsFull())
{
WakeupConsumer();
ProductorWait();
}
_q.push(t);
UnlockQueue();
}
void Take(T& t)
{
// 消費者
LockQueue();
while (IsEmpty())
{
WakeupProductor();
ConsumerWait();
}
t = _q.front();
_q.pop();
UnlockQueue();
}
};
main.cc
#include "BlockQueue.hpp"
#include <unistd.h>
using namespace std;
pthread_mutex_t c_lock;
pthread_mutex_t p_lock;
void* consumer_run(void* arg)
{
BlockQueue<Task>* pbq = (BlockQueue<Task>*)arg;
while (true)
{
//int t = 0;
pthread_mutex_lock(&c_lock);
Task t;
pbq->Take(t);
sleep(1);
//cout << "consume data :" << t << endl;
cout<<"編號 "<< pthread_self()<<" 消費者" <<" consume task is " << t._x << " + " << t._y << " = " << t.Run() << endl;
pthread_mutex_unlock(&c_lock);
}
}
void* productor_run(void* arg)
{
sleep(1);
BlockQueue<Task>* pbq = (BlockQueue<Task>*)arg;
while (true)
{
pthread_mutex_lock(&p_lock);
int x = rand()%10 + 1;
int y = rand()%100 +1;
Task t(x,y);
pbq->Put(t);
// cout << "product data :" << t << endl;
cout <<"編號 " <<pthread_self()<< " 生產者" <<" product Task is : " << x << " + " << y << " = ?" << endl;
pthread_mutex_unlock(&p_lock);
sleep(1);
}
}
int main()
{
BlockQueue<Task> bq;
// 多消費者、多生產者
pthread_t c1,c2,c3,p1,p2,p3;
pthread_mutex_init(&c_lock, nullptr);
pthread_mutex_init(&p_lock,nullptr);
pthread_create(&c1, nullptr, consumer_run, (void*)&bq);
pthread_create(&c2, nullptr, consumer_run, (void*)&bq);
pthread_create(&c3, nullptr, consumer_run, (void*)&bq);
pthread_create(&p1, nullptr, productor_run, (void*)&bq);
pthread_create(&p2, nullptr, productor_run, (void*)&bq);
pthread_create(&p3, nullptr, productor_run, (void*)&bq);
pthread_join(c1,nullptr);
pthread_join(c2,nullptr);
pthread_join(c3,nullptr);
pthread_join(p1,nullptr);
pthread_join(p2,nullptr);
pthread_join(p3,nullptr);
pthread_mutex_destroy(&c_lock);
pthread_mutex_destroy(&p_lock);
return 0;
}
那么之前提到的問題:為什么 pthread_cond_wait需要互斥量(鎖)===>在等待條件變數被其他執行緒通過訪問臨界資源打破等待的條件時,那么其他執行緒必須要有鎖才可以,所以wait時候,必須釋放鎖,即該函式做了如下作業:自動釋放lock ,當函式被回傳的時候,回傳到了臨界區,則會讓該執行緒重新持有鎖,
- 基于阻塞佇列生產者消費者模型的應用場景:比如注冊B站、抖音、都有這個模型的應用,
6.5 POSIX 信號量的引入
6.5.1 什么是信號量?
- 信號量也稱信號燈,本質上是一個描述臨界資源有效個數的計數器
6.5.2 為什么使用信號量:使用信號量有什么好處?
- POSIX 信號量 和 SystemV 信號量的作用相同,都是用于同步操作,達到無沖突訪問共享資源的目的,但POSIX 可以用于執行緒間同步,可以把臨界資源分成多份,多執行緒對每一份的臨界資源進行同步訪問,大大的提高效率,
struct sem {
int count;
mutex lock;
wait_queue *head;
}
// P() 操作的偽代碼如下
P(){
lock();
if (count > 0) count--;
else
wait
unlock();
}
V(){
lock();
if (count == 原值) wait;
else
count++;
unlock();
}
6.5.3 如何使用信號量
1 初始化信號量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
//引數:pshared:0表示執行緒間共享,非零表示行程間共享 value:信號量初始值
2 銷毀信號量
int sem_destroy(sem_t *sem);
3 等待信號量
// 功能:等待信號量,會將信號量的值減1
int sem_wait(sem_t *sem); //P()
4 發布信號量
//功能:發布信號量,表示資源使用完畢,可以歸還資源了,將信號量值加1,
int sem_post(sem_t *sem);//V()
6.6 基于環形佇列的生產消費模型
- 環形佇列采用陣列模擬,用模運算來模擬環狀特性
- 環形結構起始狀態和結束狀態都是一樣的,不好判斷為慷訓者為滿,所以可以通過加計數器或者標記位來判斷滿或者空,另外也可以預一個空的位置,作為滿的狀態
- 但是我們現在有信號量這個計數器,環形佇列為慷訓者為滿都可以由兩個信號量來判斷,具體的我會在代碼中注釋,就很簡單的進行多執行緒間的同步程序
代碼如下
Makefile
main:main.cc
g++ $^ -o $@ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f main
RingQueue.hpp
#pragma once
#include <iostream>
#include <semaphore.h>
#include <vector>
template <class T>
class RingQueue
{
private:
std::vector<T> _v;
int _cap;
// 兩個信號量
sem_t c_sem_data;
sem_t p_sem_blank;
// 兩個下標索引
int c_index;
int p_index;
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
public:
RingQueue(int cap)
:_cap(cap),_v(cap)
{
sem_init(&c_sem_data, 0, 0);
sem_init(&p_sem_blank, 0, cap);
c_index = p_index = 0;
}
~RingQueue()
{
sem_destroy(&c_sem_data);
sem_destroy(&p_sem_blank);
c_index = p_index = 0;
}
void Put(T &in)
{
P(p_sem_blank);
_v[p_index] = in;
p_index++;
p_index %= _cap;
V(c_sem_data);
}
void Get(T &out)
{
// out 為輸出型引數,由呼叫者傳入參考獲取內容
P(c_sem_data);
out = _v[c_index];
c_index++;
c_index %= _cap;
V(p_sem_blank);
}
};
main.cc
#include "RingQueue.hpp"
#include <unistd.h>
using namespace std;
void *consumer(void *arg)
{
RingQueue<int>* rq = (RingQueue<int>*)arg;
while (true)
{
sleep(1);
int t;
rq->Get(t);
cout << "consumer done ..." << t << endl;
}
}
void *productor(void* arg)
{
RingQueue<int>* rq = (RingQueue<int>*)arg;
int count = 100;
while (true)
{
rq->Put(count);
count++;
if (count > 110)
{
count = 100;
}
cout << "productor done" << endl;
}
}
int main()
{
pthread_t c,p;
RingQueue<int> rq(5);
pthread_create(&c, nullptr, consumer, &rq);
pthread_create(&p, nullptr, productor,&rq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
7 執行緒池
7.1 什么是執行緒池
- 一種執行緒使用模式,執行緒過多會帶來調度開銷,進而影響快取區域性和整體性能,而執行緒池維護著多個執行緒,等待著監督管理者分配可并發執行的任務,這避免了在處理短時間任務時創建與銷毀執行緒的代價,執行緒池不僅能夠保證內核的充分利用,還能防止過分調度,可用執行緒數量應該取決于可用的并發處理器、處理器內核、記憶體、網路sockets等的數量,
7.2 執行緒池的應用場景
- 需要大量的執行緒來完成任務,且完成任務的時間比較短, WEB服務器完成網頁請求這樣的任務,使用執行緒池技術是非常合適的,因為單個任務小,而任務數量巨大,你可以想象一個熱門網站的點擊次數, 但對于長時間的任務,比如一個Telnet連接請求,執行緒池的優點就不明顯了,因為Telnet會話時間比執行緒的創建時間大多了,
- 對性能要求苛刻的應用,比如要求服務器迅速回應客戶請求,
- 接受突發性的大量請求,但不至于使服務器因此產生大量執行緒的應用,突發性大量客戶請求,在沒有執行緒池情況下,將產生大量執行緒,雖然理論上大部分作業系統執行緒數目最大值不是問題,短時間內產生大量執行緒可能使記憶體到達極限,出現錯誤.
7.3 執行緒池的好處
- 有任務,立馬有執行緒進行服務,省掉了執行緒創建的時間
- 有效防止服務器中執行緒過多,導致系統過載的問題
7.4 模擬實作執行緒池
- 創建固定數量的執行緒池,回圈從任務佇列中獲取任務物件
- 獲取任務物件后,執行任務物件中的任務介面
我們讓主執行緒充當從網路中接受客戶端請求的角色,每個請求是一個任務被送進任務佇列,等待執行緒池里的執行緒來處理任務,我們的任務暫時簡單的描述為:求一個數字的平方,等待后面更新計算機網路的時候可以完成一個小型的專案,
Makefile
testThreadPool:main.cc
g++ $^ -o $@ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f testThreadPool
ThreadPool.hpp
#pragma once
#include <iostream>
#include <queue>
#include <unistd.h>
#include <math.h>
#define NUM 5
class Task{
public:
int base;
public:
Task(){}
Task(int _b):base(_b){}
void Run()
{
std::cout <<"thread is[" << pthread_self() << "] task run ... done: base# "<< base << " pow is# "<< pow(base,2) << std::endl;
}
~Task(){}
};
class ThreadPool{
private:
std::queue<Task*> q;
int max_num;
pthread_mutex_t lock;
pthread_cond_t cond; //only consumer, thread pool thread;
bool quit;
public:
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnlockQueue()
{
pthread_mutex_unlock(&lock);
}
bool IsEmpty()
{
return q.size() == 0;
}
void ThreadWait()
{
pthread_cond_wait(&cond, &lock);
}
void ThreadWakeup()
{
//if(low_water > 30){
// pthread_cond_broadcast(&cond);
//}
pthread_cond_signal(&cond);
}
void ThreadsWakeup()
{
pthread_cond_broadcast(&cond);
}
public:
// 建構式里盡量不要做有風險的事情
ThreadPool(int _max=NUM):max_num(_max),quit(false)
{}
static void* Routine(void *arg) //
{
//執行緒分離
pthread_detach(pthread_self());
ThreadPool *this_p = (ThreadPool*)arg;
while(!quit){
this_p->LockQueue();
while(!quit && this_p->IsEmpty()){
this_p->ThreadWait();
}
Task t;
if(!quit && !this_p->IsEmpty){
this_p->Get(t);
}
this_p->UnlockQueue();
//t.Run();
}
}
void ThreadPoolInit()
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
pthread_t t;
for(int i = 0; i < max_num; i++){
pthread_create(&t, nullptr, Routine, this);
}
}
//server
void Put(Task &in)
{
LockQueue();
q.push(&in);
UnlockQueue();
ThreadWakeup();
}
//Thread pool t;
void Get(Task &out)
{
Task*t = q.front();
q.pop();
out = *t;
}
void ThreadQuit()
{
if(!IsEmpty()){
std::cout << "task queue is not empty" << std::endl;
return;
}
quit = true;
ThreadsWakeup();
}
~ThreadPool()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
};
main.cc
#include "ThreadPool.hpp"
int main()
{
ThreadPool *tp = new ThreadPool();
tp->ThreadPoolInit();
//server
int count = 20;
while(count){
int x = rand()%10+1;
Task t(x);
tp->Put(t);
sleep(1);
count--;
}
tp->ThreadQuit(); //
return 0;
}
7.5 執行緒池VS行程池
- 執行緒池占用的資源更少,但是健壯性(魯棒性)不強,
- 行程池占用的資源更多,但是健壯性(魯棒性)很強,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/280317.html
標籤:其他
上一篇:OAuth2--密碼模式 實戰
