文章目錄
- 互斥
- 互斥量(鎖)的介面
- 為什么加鎖是原子的?
- 為什么pthread_cond_wait呼叫時要傳入互斥鎖?
- 可重入與執行緒安全
- 死鎖
- 同步
- 條件變數
- 生產者消費者模型
- 基于阻塞佇列的生產消費模型
互斥
所有的執行緒資料是共享的,
被多個執行流訪問的公共資源叫做臨界資源,訪問臨界資源是以執行緒/執行流的方式去訪問的,把每個執行緒內訪問臨界資源的那部分代碼叫做臨界區,
不一定所有的共享資源都會被所有執行緒訪問,比如main只被一個執行流訪問,
互斥:任何時刻,互斥保證有且只有一個執行流進入臨界區,訪問臨界資源,通常對臨界資源起保護作用,
原子性:不會被任何調度機制打斷的操作,該操作只有兩態,要么完成,要么未完成,
多個執行緒并發的操作共享變數,會帶來一些問題:
比如我們假設有ticket = 100,有很多個執行流同時都要去讓ticket--,而--操作并不是原子操作,而是對應三潭訓編指令:
1. load : 將共享變數ticket從記憶體加載到暫存器中
2. update : 更新暫存器里面的值,執行-1操作
3. store : 將新值,從暫存器寫回共享變數ticket的記憶體地址
因此必須要滿足:
- 代碼必須要有互斥行為:當代碼進入臨界區執行時,不允許其他執行緒進入該臨界區,
- 如果多個執行緒同時要求執行臨界區的代碼,并且臨界區沒有執行緒在執行,那么只能允許一個執行緒進入該臨界區,
- 如果執行緒不在臨界區中執行,那么該執行緒不能阻止其他執行緒進入臨界區,
這時就需要一把鎖,Linux上提供的這把鎖叫互斥量,互斥鎖,

互斥量(鎖)的介面
初始化
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);
mutex:要初始化的互斥量
attr:NULL
銷毀
int pthread_mutex_destroy(pthread_mutex_t *mutex);
加鎖和解鎖
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
要注意:加鎖粒度應是越小越好,加鎖多了,會榷訓多執行緒的效率,因為加鎖后,它們是串行走的,違背了多執行緒分工執行提高效率的本來目的,
對上述進行一個總結:
1、我們要對臨界區進行保護,前提是所有的執行執行緒都必須遵守這個規則(這個是要通過編碼保證的),否則就無法保護,
2、進入臨界區的程序一定是要先進行加鎖,訪問臨界區,再進行解鎖,
3、因為所有的執行緒都必須先加鎖,所以所有的執行緒都必須看到同一把鎖因此,鎖本身也是個臨界資源,要保護別人,就要先保證自己的安全性,
鎖本身不安全的情況:一把鎖對多個執行緒同時使用
因此我們必須保證這個鎖在任何一個時間只允許被一個執行緒占用,
申請鎖的程序是不能有中間狀態的,也就是兩態的(原子性),解鎖也是原子的,
4、訪問臨界區是要花時間的,因此在特定執行緒(互斥鎖)/ 行程(通信用的是二元信號量)擁有鎖的時候,期間有新執行緒過來申請鎖,它一定是申請不到的,那么新執行緒要進行阻塞(執行緒被阻塞,對應的pcb中的狀態由R改成非R,并把pcb從運行佇列中投入到等待佇列),占有鎖的執行緒解鎖后之后,再把等待佇列的執行緒進行一一喚醒,
5、該如何理解pthread庫中提供的關于鎖的四個介面呢?
鎖有很多,因此也需要被管理,可以理解成它是一個結構體
struct mutex{
int lock; // 0代表占有;1代表當前可被申請
wait_queue *head; // 等待佇列
} // 偽代碼
init:初始化,表示將這把鎖設定為可被申請,并把等待佇列置為空
destroy:銷毀
lock:鎖的內部標識變數由1變0
unlock:變數由0變1
以上是一種簡單理解,
6、一次保證只有一個執行緒進入臨界區訪問臨界資源,這種特征叫做互斥,
有可能出現這種情況:在臨界區中的多行代碼中,這個執行緒的時間片到了(或者優先級更高的執行緒來了要搶占),因此當前執行緒被切換了,但有沒有影響?完全沒有,因為它是帶著鎖走的,在被切換時并沒有解鎖,
7、加鎖時為什么一般效率比較低,或者會影響效率?
因為執行流本來是并行或并發執行的,加了鎖后變成了串行,且它占著鎖時,它也有可能被切換走,這時,別的執行緒并不能訪問臨界資源,只能阻塞等待,
因此鎖的影響有兩點:
- 所有的執行緒在訪問臨界資源時都會串行
- 當占有鎖的執行緒被切換時,會影響到其他所有執行緒的執行情況,
- 鎖的應用——模擬搶票
#include<stdio.h>
#include<unistd.h>
#include<pthread.h>
int ticket = 10000;
pthread_mutex_t lock; //鎖
void* get_ticket(void* arg)
{
usleep(1000);
int num = (int)arg;
while(1)
{
pthread_mutex_lock(&lock);
if(ticket > 0){
usleep(1000);
printf("thread %d ,get a ticket, no %d\n",num, ticket);
ticket--;
pthread_mutex_unlock(&lock);
}
else{
pthread_mutex_unlock(&lock);
break;
}
}
}
int main()
{
int i = 0;
pthread_t tid[4];
pthread_mutex_init(&lock, NULL);
for(i = 0; i < 4; i++)
{
pthread_create(tid+i, NULL, get_ticket, (void*)i);
}
for(i = 0; i < 4; i++)
{
pthread_join(tid[i], NULL);
}
pthread_mutex_destroy(&lock);// 釋放掉鎖
return 0;
}
為什么加鎖是原子的?

重點關注exchange指令,它的作用是把暫存器和記憶體單元的資料相交換,由于只有一條指令,保證了原子性,注意,交換并非拷貝式轉移!
mutex中存放的1,與%al中存放的0交換,這個程序是一條命令完成的,因為只有一個1,所以只有拿到1的那個執行緒能夠執行if,也就是只有它能夠申請成功,
1、整個程序中,為1的mutex只有一份,沒有因為拷貝而增多,
2、一條exchange匯編就完成了暫存器資料(al)和記憶體資料(mutex)的交換,
因此,加鎖是原子的,
解鎖也是原子的!能夠解鎖那么曾經一定是加過鎖,所以能運行到解鎖這里只有它這一個執行流,
為什么pthread_cond_wait呼叫時要傳入互斥鎖?
我們先來想,為什么執行緒要等待?因為當條件不滿足時,它只能等,那怎么知道條件滿不滿足呢?要經過判斷,要判斷的話,就必須進入臨界區才能判斷,所以此時一定是持有鎖進入臨界區的,
當我們判斷了條件不滿足時,就要執行wait,
在wait執行時必須將鎖釋放,不然就會導致一個執行緒持有鎖等待,其他執行緒無法進入臨界區,
總結:
在呼叫wait這個函式時,會自動釋放鎖,
在調完該函式時,執行緒醒了,發現自己在臨界區里,所以這時該函式會讓該執行緒重新持有鎖,
以上都是wait函式自動完成的,因此要傳入鎖,
呼叫時方便釋放鎖,回傳時要重新獲得鎖,
可重入與執行緒安全
執行緒安全:多個執行緒并發同一段代碼時,不會出現不同的結果,常見對全域變數或者靜態變數進行操作,并且沒有鎖保護的情況下,會出現該問題,
重入:同一個函式被不同的執行流呼叫,當前一個流程還沒有執行完,就有其他的執行流再次進入,我們稱之為重入,一個函式在重入的情況下,運行結果不會出現任何不同或者任何問題,則該函式被稱為可重入函式,否則,是不可重入函式,
可重函式強調的主體是函式,執行緒安全強調的主體是執行緒,
執行緒有可能會調一個函式導致出錯,這個函式叫做不可重入函式,出現的問題叫做執行緒安全問題,
死鎖
是指在一組行程中的各個行程均占有不會釋放的資源,但因互相申請被其他行程所占用不會釋放的資源而處于的一種永久等待狀態,
死鎖四個必要條件
- 互斥條件:一個資源每次只能被一個執行流使用,
- 請求與保持條件:一個執行流因請求資源而阻塞時,對已獲得的資源保持不放,
- 不剝奪條件:一個執行流已獲得的資源,在末使用完之前,不能強行剝奪,
- 回圈等待條件:若干執行流之間形成一種頭尾相接的回圈等待資源的關系,
產生死鎖一定是因為這幾個原因同時發生導致的,那么如果至少破壞一個,就不會產生了,
避免死鎖
- 破壞死鎖的四個必要條件
- 加鎖順序一致
- 避免鎖未釋放的場景
- 資源一次性分配
同步
- 什么是同步?(這里的同步指的是多執行緒執行時)
在保證資料安全的情況下(一般是使用加鎖方式),讓多個執行流按照特定的順序進行臨界資源的訪問,稱之為同步的程序,
- 為什么要存在同步?
互斥保證我們不出錯,同步保證我們多執行緒協同高效,完成某些事物,
條件變數
是對互相通知時特定條件狀態的抽象,
條件變數函式:
- 初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict
attr);
cond:要初始化的條件變數
attr:NULL
- 銷毀
int pthread_cond_destroy(pthread_cond_t *cond)
- 等待條件滿足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
cond:要在這個條件變數上等待
mutex:互斥量,后面詳細解釋
- 喚醒等待:在該條件變數中等的行程都會被喚醒
int pthread_cond_signal(pthread_cond_t *cond);
- 如何編碼實作?
1、如果條件不滿足時,pthread_cond_wait等待,
2、通知機制pthread_cond_signal,
- 創建兩個執行緒:執行緒2控制執行緒1,執行緒2發送通知,執行緒1活動,
#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
pthread_mutex_t lock;
pthread_cond_t cond;
// 想讓執行緒2控制執行緒1
void* routine_r1(void * arg)
{
const char* name = (char*)arg;
while(1)
{
pthread_cond_wait(&cond, &lock);
printf("get cond, %s: 我活了\n", name);
}
}
void* routine_r2(void* arg)
{
const char* name = (char*)arg;
while(1)
{
sleep(rand()%3 + 1);
pthread_cond_signal(&cond);
printf("%s signal done...\n", name);
}
}
int main()
{
pthread_mutex_init(&lock, NULL);
pthread_cond_init(&cond, NULL);
pthread_t t1, t2;
pthread_create(&t1, NULL, routine_r1, "thread 1");
pthread_create(&t2, NULL, routine_r2, "thread 2");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
return 0;
}
如何理解條件變數?一個執行緒要在cond上等,一個要把通知資訊發到cond,
可以理解為封裝成了一個結構體
struct cond{
int val; // 當前條件是否成立,0成立,1不成立
wait_queue *head; // 等待佇列,都在等cond條件成立
}
生產者消費者模型
當我們作為消費者在超市買東西時,總有相應的廠商作為生產者會不停的補充貨物,
該模型可以理解為上述例子的抽象,
生產者消費者模型本質是:有一段“記憶體空間”,有多個執行緒負責生產,有多個執行緒負責消費,
目的:集中化管理,使效率變高
- 模型包括:
生產者(n個),消費者(n個):通常是行程 / 執行緒
空間、交易場所:一塊“記憶體塊“
產品:資料
遵守"321”原則:
- 3:維護3種關系——生產者和生產者(互斥,任何一個時刻只允許有一個生產者往里面寫)、消費者和消費者(互斥)、生產者和消費者(同步:生產完消費,消費完生產)
- 2:兩種角色,生產者和消費者
- 1:一個交易場所,
之前寫的單行程代碼中,所有的函式呼叫都是串行的,因為在執行到對應的函式時,要等,比如:

在主函式中,它作為生產者時,要給add函式提供a和b;而它作為消費者時,要獲取add執行后的數存放到c,
相應地,在add函式中,它作為生產者,為主函式提供加合,而作為消費者,從主函式獲取a和b,
這樣的代碼,耦合性很強,
因為在執行到主函式的add時,主函式要等待add的執行,以獲得c,效率并不高,且二者在互相的這種供給和需求,維護性很差,
如果改成這樣:

執行緒1只生產資料,執行緒2去獲取資料,然后做對應的操作,
那么,就實作了在代碼層面解耦,維護性很強,
行程間通信的本質也是生產者消費者模型,
我們先來探討一下該模型的價值意義,
如圖,服務器從網路中讀取資料存入資料庫,

服務器從網路中讀資料時,它并不一定隨時都能讀到,因此大部分時間都是在等待資料,且寫入資料庫是要訪問硬碟(涉及IO,耗時),
所以如果不經過任務佇列直接將資料存入資料庫,一個執行緒要做的作業都是串行的,用戶所等的時間過長,
而任務佇列是在記憶體開辟的空間,有了任務佇列,把資料一個執行緒負責把資料先放在佇列里,另一個執行緒負責從佇列中讀資料放到資料庫,這樣耗時短,
基于阻塞佇列的生產消費模型
- BlockQueue.hpp
#ifndef __QUEUE_BLOCK_H__
#define __QUEUE_BLOCK_H__
#include<iostream>
#include<queue>
#include<pthread.h>
#include<unistd.h>
class Task{
public:
Task(){}
Task(int x, int y)
:_x(x), _y(y)
{}
int Run()
{
return _x + _y;
}
~Task(){}
public:
int _x;
int _y;
};
//作為交易場所,要保證它的安全,互斥
class BlockQueue{
private:
std::queue<Task> q;
size_t cap;
pthread_mutex_t lock;
//當一方特別慢時,需要另一方等待,可以了之后,也就要被喚醒,需要條件變數,互相通知,
pthread_cond_t c_cond;//消費者在該條件下等
pthread_cond_t p_cond;//生產者在該條件下等
public:
bool IsFull()
{
return q.size() >= cap;
}
bool IsEmpty()
{
return q.empty();
}
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnlockQueue()
{
pthread_mutex_unlock(&lock);
}
void WakeUpProducer()
{
std::cout<< "wake up Producer" << std::endl;
pthread_cond_signal(&p_cond);
}
void WakeUpConsumer()
{
std::cout<< "wake up Consumer" << std::endl;
pthread_cond_signal(&c_cond);
}
void ProducerWait()
{
std::cout<< "Producer wait" << std::endl;
pthread_cond_wait(&p_cond, &lock);
}
void ConsumerWait()
{
std::cout<< "Consumer wait" << std::endl;
pthread_cond_wait(&c_cond, &lock);
}
public:
BlockQueue(int _cap):cap(_cap){
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&c_cond,nullptr);
pthread_cond_init(&p_cond,nullptr);
}
void PutTask(Task t){
LockQueue();
while(IsFull())
{
WakeUpConsumer();
ProducerWait();
}
q.push(t);
UnlockQueue();
}
void getTask(Task& t){
LockQueue();
while(IsEmpty())
{
WakeUpProducer();
ConsumerWait();
}
t = q.front();
q.pop();
UnlockQueue();
}
~BlockQueue()
{
pthread_mutex_lock(&lock);
pthread_cond_destroy(&c_cond);
pthread_cond_destroy(&p_cond);
}
};
#endif
- main.cc
#include"BlockQueue.hpp"
using namespace std;
void* producer_running(void* arg)
{
sleep(1);
BlockQueue *bq = (BlockQueue*)arg;
while(true)
{
//lock
int x = rand()%10 + 1;
int y = rand()%100 + 1;
Task t(x, y);
bq->PutTask(t);
//unlock
cout<< "producer Task is: " << x << "+" << y << "=?" << endl;
sleep(1);
}
}
void* consumer_running(void* arg)
{
BlockQueue *bq = (BlockQueue*)arg;
while(true)
{
//lock 如果是多消費者的情況,要實作內部互斥就要上鎖
Task t;
bq->getTask(t);
cout<<"consumer: "<< t._x << "+" << t._y << "=" << t.Run() <<endl;
//unlock
}
}
int main()
{
BlockQueue* bq = new BlockQueue(5);
pthread_t c, p;
pthread_create(&c, nullptr, consumer_running, (void*)bq);
pthread_create(&p, nullptr, producer_running, (void*)bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete bq;
return 0;
}
- Makefile
main:main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f main
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/281207.html
標籤:其他
