文章目錄
- 1 .常見鎖的概念
- 1.1 死鎖
- 1.2 死鎖四個必要條件
- 1.3 避免死鎖
- 2. Linux執行緒同步
- 2.1 條件變數
- 2.2 同步概念與競態條件
- 2.3 編碼實作方式
- 2.4 相關介面函式
- 2.4.1 初始化條件變數
- 2.4.2 銷毀條件變數
- 2.4.3 等待條件滿足
- 2.4.4 喚醒等待
- 2.5 代碼示例
- 3. 生產者消費者模型
- 3.1 圖示詳解
- 3.2 代碼示例
- 4. POSIX 信號量
- 4.1 POSIX概念
- 4.2 POSIX函式
- 4.2.1 初始化信號量
- 4.2.2 銷毀信號量
- 4.2.3 等待信號量
- 4.2.4 發布信號量
- 4.3 基于環形佇列的生產消費模型
- 4.3 代碼示例
- 在前面的博客【Linux_深究多執行緒—>link】中已經講了有關多執行緒操作的有關操作和鎖的基本概念,這章將接著上一章的內容對執行緒與鎖的有關概念和操作繼續深究,
1 .常見鎖的概念
1.1 死鎖
- 死鎖是指在
一組行程中的各個行程均占有不會釋放的資源,但因互相申請被其他行程所站用不會釋放的資源而處于的一種永久等待狀態,
1.2 死鎖四個必要條件
互斥條件:一個資源每次只能被一個執行流使用請求與保持條件:一個執行流因請求資源而阻塞時,對已獲得的資源保持不放不剝奪條件:一個執行流已獲得的資源,在末使用完之前,不能強行剝奪回圈等待條件:若干執行流之間形成一種頭尾相接的回圈等待資源的關系
注意:要形成死鎖,四個條件缺一不可,所以解決死鎖的方法就是破壞四個必要條件中的一個以上,常見有銀行家演算法/死鎖檢測演算法等,
1.3 避免死鎖
- 破壞死鎖的四個必要條件
- 加鎖順序一致
- 避免鎖未釋放的場景
- 資源一次性分配
問題:
一個執行緒一把鎖有沒有可能形成死鎖呢?- 答:有,下面代碼演示:

- 原因是同一把鎖被申請兩次,且該鎖還沒有被解鎖,還被占著,自己和自己杠上,即出現死鎖,
2. Linux執行緒同步
2.1 條件變數
- 當一個執行緒互斥地訪問某個變數時,它可能發現在其它執行緒改變狀態之前,它什么也做不了,
- 同步存在是為了多執行緒協同高效完成某些事情,
- 例如一個執行緒訪問佇列時,發現佇列為空,它只能等待,只到其它執行緒將一個節點添加到佇列中,這種情況就需要用到條件變數,
2.2 同步概念與競態條件
同步:在保證資料安全的前提下,讓執行緒能夠按照某種特定的順序訪問臨界資源,從而有效避免饑餓問題,叫做同步,競態條件:因為時序問題,而導致程式例外,我們稱之為競態條件,在執行緒場景下,這種問題也不難理解,
2.3 編碼實作方式
- 如果條件不滿足,等待,釋放鎖,
- 通知機制,
2.4 相關介面函式
2.4.1 初始化條件變數
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
引數:
- cond:要初始化的條件變數
- attr:NULL
2.4.2 銷毀條件變數
int pthread_cond_destroy(pthread_cond_t *cond)
2.4.3 等待條件滿足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
引數:
- cond:要在這個條件變數上等待
- mutex:互斥量,
2.4.4 喚醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
2.5 代碼示例
#include<stdio.h>
#include<unistd.h>
#include<pthread.h>
pthread_mutex_t lock;//創建鎖
pthread_cond_t cond;//創建環境變數
void *run1(void *arg)//負責等待執行緒二發來信號開始執行
{
const char *name=(char*)arg;
while(1)
{
pthread_cond_wait(&cond,&lock);//等待
printf("%s get singal\n",name);
}
}
void *run2(void *arg)//負責給執行緒一發送信號
{
const char *name=(char*)arg;
while(1)
{
sleep(2);
pthread_cond_signal(&cond);//喚醒等待
printf("%s signal done!\n",name);
}
}
int main()
{
pthread_mutex_init(&lock,NULL);//初始化鎖
pthread_cond_init(&cond,NULL);//初始化環境變數
pthread_t t1;
pthread_t t2;
pthread_create(&t1,NULL,run1,"thread1");//創造兩個新執行緒
pthread_create(&t2,NULL,run2,"thread2");
pthread_join(t1,NULL);//等待兩個執行緒退出
pthread_join(t2,NULL);
pthread_mutex_destroy(&lock);//銷毀鎖
pthread_cond_destroy(&cond);//銷毀環境變數
return 0;
}
運行結果:

問題:
為什么pthread_cond_wait()需要互斥量呢?- 答:
- 條件等待是執行緒間同步的一種手段,如果只有一個執行緒,條件不滿足,一直等下去都不會滿足所以必須要有一個執行緒通過某些操作,改變共享變數,使原先不滿足的條件變得滿足,并且友好的通知等待在條件變數上的執行緒,
- 條件不會無緣無故的突然變得滿足了,必然會牽扯到共享資料的變化,所以一定要用互斥鎖來保護,沒有互斥鎖就無法安全的獲取和修改共享資料,
- 簡單來說,pthread_cond_wait()中還要帶上鎖是因為,如果該執行緒在等待時,此時應該將臨界區進行解鎖,讓另外的執行緒在該執行緒等待時,可以進入到臨界區,要不然它在等待時還一直加鎖狀態,就沒意義,當別的行程給該行程發信號,喚醒它,這個時候從新加鎖,訪問臨界區,保證其原子性,
3. 生產者消費者模型
問題:
- 為何要使用生產者消費者模型 ?
- 生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題,生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之后不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列里取,阻塞佇列就相當于一個緩沖區,平衡了生產者和消費者的處理能力,這個阻塞佇列就是用來給生產者和消費者解耦的,
生產者消費者模型優點:
- 解耦
- 支持并發
- 支持忙閑不均
3.1 圖示詳解

- 基于BlockingQueue的生產者消費者模型 ,BlockingQueue 在多執行緒編程中阻塞佇列(Blocking Queue)是一種常用于實作生產者和消費者模型的資料結構,
- 其與普通的佇列區別在于,當佇列為空時,從佇列獲取元素的操作將會被阻塞,直到佇列中被放入了元素;當佇列滿時,往佇列里存放元素的操作也會被阻塞,直到有元素被從佇列中取出(以上的操作都是基于不同的執行緒來說的,執行緒在對阻塞佇列行程操作時會被阻塞),
生產者消費者“321”- 3->
3種關系:生產者&消費者(互斥/同步),生產者&生產者(互斥),消費者&消費者(互斥) - 2->
2種角色:生產者,消費者 - 3->
一個交易場所
3.2 代碼示例
ps.h
#ifndef __QUEUE_BLOCK_H__
#define __QUEUE_BLOCK_H__
#include<unistd.h>
#include<iostream>
#include<pthread.h>
#include<queue>
using namespace std;
class Task
{
public:
int x;
int y;
public:
Task(int _x,int _y):x(_x),y(_y)
{}
Task()
{}
int Run()
{
return x+y;
}
~Task()
{}
};
class BlockQueue
{
private:
queue<Task> q;
size_t cap;
pthread_mutex_t lock;
pthread_cond_t c_cond;
pthread_cond_t p_cond;
public:
//判滿
bool IsFull()
{
return q.size()>=cap;
}
//判空
bool IsEmpty()
{
return q.empty();
}
//解鎖
void UnlockQueue()
{
pthread_mutex_unlock(&lock);
}
//喚醒消費者
void WakeUpConsumer()
{
cout<<"WakeUpConsumer"<<endl;
pthread_cond_signal(&c_cond);
}
//喚醒生產者
void WakeUpProductor()
{
cout<<"WakeUpProductor"<<endl;
pthread_cond_signal(&p_cond);
}
//pthread_cond_wait傳入lock引數:
//在條件滿足時,消費者or生產者持有鎖進入臨界區執行,當判斷條件不滿足時,呼叫對應的wait函式
//在消費者or生產者等待時,呼叫對應的Wait函式,自動釋放lock
//消費者等待,必須解鎖,讓另一個角色持有鎖,以保證執行緒之間友好,在
void ConsumerWait()
{
cout<<"ConsumerWait"<<endl;
pthread_cond_wait(&c_cond,&lock);
}
//生產者等待
void ProductWait()
{
cout<<"ProductWait"<<endl;
pthread_cond_wait(&p_cond,&lock);
}
public:
BlockQueue(int _cap):cap(_cap)
{
pthread_mutex_init(&lock,nullptr);
pthread_cond_init(&c_cond,nullptr);
pthread_cond_init(&p_cond,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&p_cond);
pthread_cond_destroy(&c_cond);
}
//消費者消費,拿取or執行佇列中的資料以及任務并執行
void Put(Task in)
{
UnlockQueue();
if(IsFull())
{
WakeUpConsumer();
ProductWait();
}
q.push(in);
UnlockQueue();
}
//生產者生產,向佇列中塞資料或者任務
void Get(Task &out)
{
UnlockQueue();
if(IsEmpty())
{
WakeUpProductor();
ConsumerWait();
}
out=q.front();
q.pop();
UnlockQueue();
}
};
#endif
ps.cpp
#include "block.h"
using namespace std;
void *consumer_run(void *arg)
{
//int num=(int)arg;
pthread_mutex_t lock;
BlockQueue *bq=(BlockQueue*)arg;
while(true)
{
pthread_mutex_lock(&lock);
//int data;
Task t;
bq->Get(t);
//t.Run();
pthread_mutex_unlock(&lock);
cout<<"consumer"<<t.x<<"+"<<t.y<<"="<<t.Run()<<endl;
sleep(1);
}
//pthread_mutex_unlock(&lock);
}
void *productor_run(void *arg)
{
//int num=(int)arg;
pthread_mutex_t lock;
sleep(1);
BlockQueue *bq=(BlockQueue*)arg;
//int count=0;
while(true)
{
int x=rand()%10+1;
int y=rand()%100+1;
pthread_mutex_lock(&lock);
Task t(x,y);
bq->Put(t);
pthread_mutex_unlock(&lock);
cout<<"productor "<<x<<"+"<<y<<"=?"<<endl;
}
//pthread_mutex_unlock(&lock);
}
int main()
{
BlockQueue *bq = new BlockQueue(5);
pthread_t con1,pro1;
pthread_create(&con1,nullptr,consumer_run,(void*)bq);
pthread_create(&pro1,nullptr,productor_run,(void*)bq);
//pthread_create(&con2,nullptr,consumer_run,(void*)bq);
//pthread_create(&pro2,nullptr,productor_run,(void*)bq);
pthread_join(con1,nullptr);
//pthread_join(con2,nullptr);
pthread_join(pro1,nullptr);
//pthread_join(pro2,nullptr);
return 0;
}
運行結果:

4. POSIX 信號量
4.1 POSIX概念
- POSIX信號量和SystemV信號量作用相同,都是用于同步操作,達到無沖突的訪問共享資源目的, 但POSIX可以用于執行緒間同步,

4.2 POSIX函式
4.2.1 初始化信號量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value)
引數:
- pshared:0表示執行緒間共享,非零表示行程間共享
- value:信號量初始值
4.2.2 銷毀信號量
int sem_destroy(sem_t *sem)
4.2.3 等待信號量
int sem_wait(sem_t *sem);
功能:
- 等待信號量,會將信號量的值減1 //P()
4.2.4 發布信號量
int sem_post(sem_t *sem)//V()
功能:
- 發布信號量,表示資源使用完畢,可以歸還資源了,將信號量值加1
上面生產者-消費者的例子是基于queue的,其空間可以動態分配,現在基于固定大小的環形佇列重寫這個程式(POSIX信號量):
4.3 基于環形佇列的生產消費模型
- 環形佇列采用陣列模擬,用模運算來模擬環狀特性
- 環形結構起始狀態和結束狀態都是一樣的,不好判斷為慷訓者為滿,所以可以通過加計數器或者標記位來判斷滿或者空,另外也可以預留一個空的位置,作為滿的狀態
- 但是我們現在有信號量這個計數器,就很簡單的進行多執行緒間的同步程序

4.3 代碼示例
// RingQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <vector>
class RingQueue
{
private:
std::vector<int> _q;
int _cap;
//信號量
sem_t sem_blank;
sem_t sem_data;
//資源下標
int pro_sub;
int con_sub;
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
public:
RingQueue(int cap=6)
:_cap(cap)
,_q(cap)
{
sem_init(&sem_blank,0,_cap);//格子個數一開始為容量大小
sem_init(&sem_data,0,0);//資料一開始為0
pro_sub=0;
con_sub=0;
}
void Put(const int &data)
{
P(sem_blank);//申請格子資源,判斷是否還有容量,格子--
_q[pro_sub]=data;
pro_sub++;
pro_sub%=_cap;//越界回環
V(sem_data);//資料++
}
void Get(int &data)
{
P(sem_data);//申請資料資源,判斷是否還有資料,資料--
data=_q[con_sub];
con_sub++;
con_sub%=_cap;
V(sem_blank);//消耗資料,格子++
}
~RingQueue()
{
sem_destroy(&sem_blank);
sem_destroy(&sem_data);
pro_sub=con_sub=0;
}
};
//main.cpp
#include "RingQueue.hpp"
pthread_mutex_t pro_lock;//生產者組內競爭鎖
pthread_mutex_t con_lock;//消費者組內競爭鎖
int count=0;//生產者資料
void Lock(pthread_mutex_t &lock)
{
pthread_mutex_lock(&lock);
}
void ULock(pthread_mutex_t &lock)
{
pthread_mutex_unlock(&lock);
}
void *Get(void *arg)
{
RingQueue *q=(RingQueue*)arg;
while(1)
{
usleep(1);
int data=0;
Lock(con_lock);//組內競爭
q->Get(data);//獲取資料
ULock(con_lock);
std::cout<<"consumer get data...:"<<data<<std::endl;
}
}
void *Put(void *arg)
{
RingQueue *q=(RingQueue*)arg;
while(1)
{
sleep(1);//增加系統呼叫
Lock(pro_lock);//組內競爭
int number = (++count)%10l;
q->Put(number);
ULock(pro_lock);
std::cout<<"productor put data...."<< number << std::endl;
}
}
int main()
{
//創建交易場所
RingQueue *q=new RingQueue(5);
//初始化鎖
pthread_mutex_init(&con_lock,nullptr);
pthread_mutex_init(&pro_lock,nullptr);
//創建執行緒
pthread_t tid1,tid2,tid3,tid4,tid5,tid6;
pthread_create(&tid1,nullptr,Put,q);
pthread_create(&tid2,nullptr,Put,q);
pthread_create(&tid3,nullptr,Put,q);
pthread_create(&tid4,nullptr,Get,q);
pthread_create(&tid5,nullptr,Get,q);
pthread_create(&tid6,nullptr,Get,q);
//等待執行緒、避免記憶體泄漏,不關心回傳值
pthread_join(tid1,nullptr);
pthread_join(tid2,nullptr);
pthread_join(tid3,nullptr);
pthread_join(tid4,nullptr);
pthread_join(tid5,nullptr);
pthread_join(tid6,nullptr);
//銷毀作業
pthread_mutex_destroy(&con_lock);
pthread_mutex_destroy(&pro_lock);
delete q;
return 0;
}
運行結果:

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/282144.html
標籤:其他
上一篇:第十二屆藍橋杯C++賽后感
