文章目錄
- 生產者消費者模型
- 生產者消費者模型的概念
- 生產者消費者模型的特點
- 生產者消費者模型優點
- 基于BlockingQueue的生產者消費者模型
- 基于阻塞佇列的生產者消費者模型
- 模擬實作基于阻塞佇列的生產消費模型
生產者消費者模型
生產者消費者模型的概念
生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題,
生產者和消費者彼此之間不直接通訊,而通過這個容器來通訊,所以生產者生產完資料之后不用等待消費者處理,直接將生產的資料放到這個容器當中,消費者也不用找生產者要資料,而是直接從這個容器里取資料,這個容器就相當于一個緩沖區,平衡了生產者和消費者的處理能力,這個容器實際上就是用來給生產者和消費者解耦的,

生產者消費者模型的特點
生產者消費者模型是多執行緒同步與互斥的一個經典場景,其特點如下:
- 三種關系: 生產者和生產者(互斥關系)、消費者和消費者(互斥關系)、生產者和消費者(互斥關系、同步關系),
- 兩種角色: 生產者和消費者,(通常由行程或執行緒承擔)
- 一個交易場所: 通常指的是記憶體中的一段緩沖區,(可以自己通過某種方式組織起來)
我們用代碼撰寫生產者消費者模型的時候,本質就是對這三個特點進行維護,
生產者和生產者、消費者和消費者、生產者和消費者,它們之間為什么會存在互斥關系?
介于生產者和消費者之間的容器可能會被多個執行流同時訪問,因此我們需要將該臨界資源用互斥鎖保護起來,
其中,所有的生產者和消費者都會競爭式的申請鎖,因此生產者和生產者、消費者和消費者、生產者和消費者之間都存在互斥關系,
生產者和消費者之間為什么會存在同步關系?
- 如果讓生產者一直生產,那么當生產者生產的資料將容器塞滿后,生產者再生產資料就會生產失敗,
- 反之,讓消費者一直消費,那么當容器當中的資料被消費完后,消費者再進行消費就會消費失敗,
雖然這樣不會造成任何資料不一致的問題,但是這樣會引起另一方的饑餓問題,是非常低效的,我們應該讓生產者和消費者訪問該容器時具有一定的順序性,比如讓生產者先生產,然后再讓消費者進行消費,
注意: 互斥關系保證的是資料的正確性,而同步關系是為了讓多執行緒之間協同起來,
生產者消費者模型優點
- 解耦,
- 支持并發,
- 支持忙閑不均,
如果我們在主函式中呼叫某一函式,那么我們必須等該函式體執行完后才繼續執行主函式的后續代碼,因此函式呼叫本質上是一種緊耦合,
對應到生產者消費者模型中,函式傳參實際上就是生產者生產的程序,而執行函式體實際上就是消費者消費的程序,但生產者只負責生產資料,消費者只負責消費資料,在消費者消費期間生產者可以同時進行生產,因此生產者消費者模型本質是一種松耦合,
基于BlockingQueue的生產者消費者模型
基于阻塞佇列的生產者消費者模型
在多執行緒編程中,阻塞佇列(Blocking Queue)是一種常用于實作生產者和消費者模型的資料結構,

其與普通的佇列的區別在于:
- 當佇列為空時,從佇列獲取元素的操作將會被阻塞,直到佇列中放入了元素,
- 當佇列滿時,往佇列里存放元素的操作會被阻塞,直到有元素從佇列中取出,
知識聯系: 看到以上阻塞佇列的描述,我們很容易想到的就是管道,而阻塞佇列最典型的應用場景實際上就是管道的實作,
模擬實作基于阻塞佇列的生產消費模型
為了方便理解,下面我們以單生產者、單消費者為例進行實作,

其中的BlockQueue就是生產者消費者模型當中的交易場所,我們可以用C++STL庫當中的queue進行實作,
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>
#define NUM 5
template<class T>
class BlockQueue
{
private:
bool IsFull()
{
return _q.size() == _cap;
}
bool IsEmpty()
{
return _q.empty();
}
public:
BlockQueue(int cap = NUM)
: _cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_full, nullptr);
pthread_cond_init(&_empty, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
//向阻塞佇列插入資料(生產者呼叫)
void Push(const T& data)
{
pthread_mutex_lock(&_mutex);
while (IsFull()){
//不能進行生產,直到阻塞佇列可以容納新的資料
pthread_cond_wait(&_full, &_mutex);
}
_q.push(data);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_empty); //喚醒在empty條件變數下等待的消費者執行緒
}
//從阻塞佇列獲取資料(消費者呼叫)
void Pop(T& data)
{
pthread_mutex_lock(&_mutex);
while (IsEmpty()){
//不能進行消費,直到阻塞佇列有新的資料
pthread_cond_wait(&_empty, &_mutex);
}
data = _q.front();
_q.pop();
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_full); //喚醒在full條件變數下等待的生產者執行緒
}
private:
std::queue<T> _q; //阻塞佇列
int _cap; //阻塞佇列最大容器資料個數
pthread_mutex_t _mutex;
pthread_cond_t _full;
pthread_cond_t _empty;
};
相關說明:
- 由于我們實作的是單生產者、單消費者的生產者消費者模型,因此我們不需要維護生產者和生產者之間的關系,也不需要維護消費者和消費者之間的關系,我們只需要維護生產者和消費者之間的同步與互斥關系即可,
- 將BlockingQueue當中存盤的資料模板化,方便以后需要時進行復用,
- 這里設定BlockingQueue存盤資料的上限為5,當阻塞佇列中存盤了五組資料時生產者就不能進行生產了,此時生產者就應該被阻塞,
- 阻塞佇列是會被生產者和消費者同時訪問的臨界資源,因此我們需要用一把互斥鎖將其保護起來,
- 生產者執行緒要向阻塞佇列當中Push資料,前提是阻塞佇列里面有空間,若阻塞佇列已經滿了,那么此時該生產者執行緒就需要進行等待,直到阻塞佇列中有空間時再將其喚醒,
- 消費者執行緒要從阻塞佇列當中Pop資料,前提是阻塞佇列里面有資料,若阻塞佇列為空,那么此時該消費者執行緒就需要進行等待,直到阻塞佇列中有新的資料時再將其喚醒,
- 因此在這里我們需要用到兩個條件變數,一個條件變數用來描述佇列為空,另一個條件變數用來描述佇列已滿,當阻塞佇列滿了的時候,要進行生產的生產者執行緒就應該在full條件變數下進行等待;當阻塞佇列為空的時候,要進行消費的消費者執行緒就應該在empty條件變數下進行等待,
- 不論是生產者執行緒還是消費者執行緒,它們都是先申請到鎖進入臨界區后再判斷是否滿足生產或消費條件的,如果對應條件不滿足,那么對應執行緒就會被掛起,但此時該執行緒是拿著鎖的,為了避免死鎖問題,在呼叫
pthread_cond_wait函式時就需要傳入當前執行緒手中的互斥鎖,此時當該執行緒被掛起時就會自動釋放手中的互斥鎖,而當該執行緒被喚醒時又會自動獲取到該互斥鎖, - 當生產者生產完一個資料后,意味著阻塞佇列當中至少有一個資料,而此時可能有消費者執行緒正在empty條件變數下進行等待,因此當生產者生產完資料后需要喚醒在empty條件變數下等待的消費者執行緒,
- 同樣的,當消費者消費完一個資料后,意味著阻塞佇列當中至少有一個空間,而此時可能有生產者執行緒正在full條件變數下進行等待,因此當消費者消費完資料后需要喚醒在full條件變數下等待的生產者執行緒,
判斷是否滿足生產消費條件時不能用if,而應該用while:
pthread_cond_wait函式是讓當前執行流進行等待的函式,是函式就意味著有可能呼叫失敗,呼叫失敗后該執行流就會繼續往后執行,- 其次,在多消費者的情況下,當生產者生產了一個資料后如果使用
pthread_cond_broadcast函式喚醒消費者,就會一次性喚醒多個消費者,但待消費的資料只有一個,此時其他消費者就被偽喚醒了, - 為了避免出現上述情況,我們就要讓執行緒被喚醒后再次進行判斷,確認是否真的滿足生產消費條件,因此這里必須要用while進行判斷,
在主函式中我們就只需要創建一個生產者執行緒和一個消費者執行緒,讓生產者執行緒不斷生產資料,讓消費者執行緒不斷消費資料,
#include "BlockQueue.hpp"
void* Producer(void* arg)
{
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
//生產者不斷進行生產
while (true){
sleep(1);
int data = rand() % 100 + 1;
bq->Push(data); //生產資料
std::cout << "Producer: " << data << std::endl;
}
}
void* Consumer(void* arg)
{
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
//消費者不斷進行消費
while (true){
sleep(1);
int data = 0;
bq->Pop(data); //消費資料
std::cout << "Consumer: " << data << std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr));
pthread_t producer, consumer;
BlockQueue<int>* bq = new BlockQueue<int>;
//創建生產者執行緒和消費者執行緒
pthread_create(&producer, nullptr, Producer, bq);
pthread_create(&consumer, nullptr, Consumer, bq);
//join生產者執行緒和消費者執行緒
pthread_join(producer, nullptr);
pthread_join(consumer, nullptr);
delete bq
return 0;
}
相關說明:
- 阻塞佇列要讓生產者執行緒向佇列中Push資料,讓消費者執行緒從佇列中Pop資料,因此這個阻塞佇列必須要讓這兩個執行緒同時看到,所以我們在創建生產者執行緒和消費者執行緒時,需要將該阻塞佇列作為執行緒執行例程的引數進行傳入,
- 代碼中生產者生產資料就是將獲取到的亂數Push到阻塞佇列,而消費者消費資料就是從阻塞佇列Pop資料,為了便于觀察,我們可以將生產者生產的資料和消費者消費的資料進行列印輸出,
生產者消費者步調一致
由于代碼中生產者是每隔一秒生產一個資料,而消費者是每隔一秒消費一個資料,因此運行代碼后我們可以看到生產者和消費者的執行步調是一致的,

小貼士: 以.hpp為后綴的檔案也是頭檔案,該頭檔案同時包含類的定義與實作,呼叫者只需include該hpp檔案即可,因為開源專案一般不需要進行保護,所以在開源專案中用的比較多,
生產者生產的快,消費者消費的慢
我們可以讓生產者不停的進行生產,而消費者每隔一秒進行消費,
void* Producer(void* arg)
{
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
//生產者不斷進行生產
while (true){
int data = rand() % 100 + 1;
bq->Push(data); //生產資料
std::cout << "Producer: " << data << std::endl;
}
}
void* Consumer(void* arg)
{
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
//消費者不斷進行消費
while (true){
sleep(1);
int data = 0;
bq->Pop(data); //消費資料
std::cout << "Consumer: " << data << std::endl;
}
}
此時由于生產者生產的很快,運行代碼后一瞬間生產者就將阻塞佇列打滿了,此時生產者想要再進行生產就只能在full條件變數下進行等待,直到消費者消費完一個資料后,生產者才會被喚醒進而繼續進行生產,生產者生產完一個資料后又會進行等待,因此后續生產者和消費者的步調又變成一致的了,

生產者生產的慢,消費者消費的快
當然,我們也可以讓生產者每隔一秒進行生產,而消費者不停的進行消費,
void* Producer(void* arg)
{
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
//生產者不斷進行生產
while (true){
sleep(1);
int data = rand() % 100 + 1;
bq->Push(data); //生產資料
std::cout << "Producer: " << data << std::endl;
}
}
void* Consumer(void* arg)
{
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
//消費者不斷進行消費
while (true){
int data = 0;
bq->Pop(data); //消費資料
std::cout << "Consumer: " << data << std::endl;
}
}
雖然消費者消費的很快,但一開始阻塞佇列中是沒有資料的,因此消費者只能在empty條件變數下進行等待,直到生產者生產完一個資料后,消費者才會被喚醒進而進行消費,消費者消費完這一個資料后又會進行等待,因此生產者和消費者的步調就是一致的,

滿足某一條件時再喚醒對應的生產者或消費者
我們也可以當阻塞佇列當中存盤的資料大于佇列容量的一半時,再喚醒消費者執行緒進行消費;當阻塞佇列當中存盤的資料小于佇列容器的一半時,再喚醒生產者執行緒進行生產,
//向阻塞佇列插入資料(生產者呼叫)
void Push(const T& data)
{
pthread_mutex_lock(&_mutex);
while (IsFull()){
//不能進行生產,直到阻塞佇列可以容納新的資料
pthread_cond_wait(&_full, &_mutex);
}
_q.push(data);
if (_q.size() >= _cap / 2){
pthread_cond_signal(&_empty); //喚醒在empty條件變數下等待的消費者執行緒
}
pthread_mutex_unlock(&_mutex);
}
//從阻塞佇列獲取資料(消費者呼叫)
void Pop(T& data)
{
pthread_mutex_lock(&_mutex);
while (IsEmpty()){
//不能進行消費,直到阻塞佇列有新的資料
pthread_cond_wait(&_empty, &_mutex);
}
data = _q.front();
_q.pop();
if (_q.size() <= _cap / 2){
pthread_cond_signal(&_full); //喚醒在full條件變數下等待的生產者執行緒
}
pthread_mutex_unlock(&_mutex);
}
我們仍然讓生產者生產的快,消費者消費的慢,運行代碼后生產者還是一瞬間將阻塞佇列打滿后進行等待,但此時不是消費者消費一個資料就喚醒生產者執行緒,而是當阻塞佇列當中的資料小于佇列容器的一半時,才會喚醒生產者執行緒進行生產,

基于計算任務的生產者消費者模型
當然,實際使用生產者消費者模型時可不是簡單的讓生產者生產一個數字讓消費者進行列印而已,我們這樣做只是為了測驗代碼的正確性,
由于我們將BlockingQueue當中存盤的資料進行了模板化,此時就可以讓BlockingQueue當中存盤其他型別的資料,
例如,我們想要實作一個基于計算任務的生產者消費者模型,此時我們只需要定義一個Task類,這個類當中需要包含一個Run成員函式,該函式代表著我們想讓消費者如何處理拿到的資料,
#pragma once
#include <iostream>
class Task
{
public:
Task(int x = 0, int y = 0, int op = 0)
: _x(x), _y(y), _op(op)
{}
~Task()
{}
void Run()
{
int result = 0;
switch (_op)
{
case '+':
result = _x + _y;
break;
case '-':
result = _x - _y;
break;
case '*':
result = _x * _y;
break;
case '/':
if (_y == 0){
std::cout << "Warning: div zero!" << std::endl;
result = -1;
}
else{
result = _x / _y;
}
break;
case '%':
if (_y == 0){
std::cout << "Warning: mod zero!" << std::endl;
result = -1;
}
else{
result = _x % _y;
}
break;
default:
std::cout << "error operation!" << std::endl;
break;
}
std::cout << _x << _op << _y << "=" << result << std::endl;
}
private:
int _x;
int _y;
char _op;
};
此時生產者放入阻塞佇列的資料就是一個Task物件,而消費者從阻塞佇列拿到Task物件后,就可以用該物件呼叫Run成員函式進行資料處理,
void* Producer(void* arg)
{
BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;
const char* arr = "+-*/%";
//生產者不斷進行生產
while (true){
int x = rand() % 100;
int y = rand() % 100;
char op = arr[rand() % 5];
Task t(x, y, op);
bq->Push(t); //生產資料
std::cout << "producer task done" << std::endl;
}
}
void* Consumer(void* arg)
{
BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;
//消費者不斷進行消費
while (true){
sleep(1);
Task t;
bq->Pop(t); //消費資料
t.Run(); //處理資料
}
}
運行代碼,當阻塞佇列被生產者打滿后消費者被喚醒,此時消費者在消費資料時執行的就是計算任務,當阻塞佇列當中的資料被消費到低于一定閾值后又會喚醒生產者進行生產,

也就是說,此后我們想讓生產者消費者模型處理某一種任務時,就只需要提供對應的Task類,然后讓該Task類提供一個對應的Run成員函式告訴我們應該如何處理這個任務即可,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/433399.html
標籤:其他
上一篇:R語言為dataframe添加新的資料列(橫向拼接、Appending columns,Unioning columns):使用R原生方法、data.table、dplyr等方案
