生產者與消費者模型
- C++模擬多執行緒生產者與消費者模型
- 方法一:互斥量與條件變數模擬實作
- 方法二:信號量模擬實作
C++模擬多執行緒生產者與消費者模型
方法一:互斥量與條件變數模擬實作
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <iostream>
#include <queue>
using namespace std;
// 生產著與消費者模型
// 1.執行緒安全的佇列( queue )
// 互斥:pthread_mutex_t
// 同步:pthread_cond_t
// 2.兩種執行緒:生產者執行緒 + 消費者執行緒
// 生產者執行緒->入口函式
// 消費者執行緒->入口函式
#define THREAD_COUNT 4
template<class T>
class BlockQueue
{
public:
BlockQueue()
{
capacity = 1; //初始化默認佇列大小
lock = PTHREAD_MUTEX_INITIALIZER; //初始化互斥鎖
//初始化 條件變數
producer_cond = PTHREAD_COND_INITIALIZER;
consumer_cond = PTHREAD_COND_INITIALIZER;
}
~BlockQueue()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&consumer_cond);
pthread_cond_destroy(&producer_cond);
}
//生產者執行緒:插入資料
void safe_push(T data)
{
pthread_mutex_lock(&lock);
while(safe_queue.size() >= capacity)
{
pthread_cond_wait(&producer_cond,&lock);
}
safe_queue.push(data);
pthread_mutex_unlock(&lock);
pthread_cond_signal(&consumer_cond);
}
//消費者執行緒:取出資料
T safe_pop()
{
pthread_mutex_lock(&lock);
while(safe_queue.empty())
{
pthread_cond_wait(&consumer_cond,&lock);
}
T data = safe_queue.front();
safe_queue.pop();
pthread_mutex_unlock(&lock);
pthread_cond_signal(&producer_cond);
return data;
}
private:
queue<T> safe_queue;
size_t capacity; //設定佇列中最大存盤量
pthread_mutex_t lock; //保證佇列互斥的互斥鎖
//保證生產者執行緒與消費者執行緒同步的條件變數
pthread_cond_t consumer_cond;
pthread_cond_t producer_cond;
};
// 消費者執行緒:從 執行緒安全的佇列 中一直 取出資料
void* consumer_task(void* arg)
{
BlockQueue<int> *safe_queue = (BlockQueue<int>*)arg;
while(1)
{
int data = safe_queue->safe_pop();
printf("consumer[%p]: %d\n",pthread_self(),data);
}
return NULL;
}
// 生產者執行緒:從 執行緒安全的佇列 中一直 插入資料
void* producer_task(void* arg)
{
BlockQueue<int> *safe_queue = (BlockQueue<int>*)arg;
int data = 0;
while(1)
{
safe_queue->safe_push(data);
printf("producer[%p] : %d\n",pthread_self(),data);
data++;
}
return NULL;
}
int main()
{
BlockQueue<int> *safe_queue = new BlockQueue<int>();
pthread_t consumer_thread[THREAD_COUNT],producer_thread[THREAD_COUNT];
for(int i = 0; i < THREAD_COUNT; i++)
{
int ret_consumer = pthread_create(&consumer_thread[i],NULL,consumer_task,(void*)safe_queue);
int ret_prodecer = pthread_create(&producer_thread[i],NULL,producer_task,(void*)safe_queue);
if(ret_consumer < 0 || ret_prodecer)
{
perror("pthread_create");
return -1;
}
}
for(int i = 0; i < THREAD_COUNT; i++)
{
pthread_join(consumer_thread[i],NULL);
pthread_join(producer_thread[i],NULL);
}
return 0;
}
方法二:信號量模擬實作
#include <stdio.h>
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <vector>
#include <unistd.h>
using namespace std;
/*
執行緒安全的佇列:封裝vector
1.讀寫下標的計算 pos = (pos + 1) % 陣列大小
2.如何實作執行緒安全?
互斥: 一個信號量,sem_t lock; sem_init(&lock,0,1);
同步: 兩個信號量
生產者信號量:sem_t producer_sem; sem_init(&producer_sem, 0, 陣列大小)
消費者信號量:sem_t consumer_sem; sem_init(&consumer_sem, 0, 0)
*/
#define CAPACITY 4
#define THREAD_COUNT 4
template<class T>
class BlockQueue{
public:
BlockQueue():safe_queue(CAPACITY)
{
capacity = CAPACITY;
sem_init(&lock, 0, 1);
sem_init(&producer_sem, 0, capacity);
sem_init(&consumer_sem, 0, 0);
pos_read = pos_write = 0;
}
~BlockQueue()
{
sem_destroy(&lock);
sem_destroy(&consumer_sem);
sem_destroy(&producer_sem);
}
void push(T data)
{
sem_wait(&producer_sem);
sem_wait(&lock);
safe_queue[pos_write] = data;
pos_write = (pos_write + 1) % capacity;
sem_post(&lock);
sem_post(&consumer_sem);
}
T pop()
{
sem_wait(&consumer_sem);
sem_wait(&lock);
T data = safe_queue[pos_read];
pos_read = (pos_read + 1) % capacity;
sem_post(&lock);
sem_post(&producer_sem);
return data;
}
private:
vector<T> safe_queue;
size_t capacity;
sem_t lock, producer_sem, consumer_sem;
size_t pos_write;
size_t pos_read;
};
void* consumer_task(void* arg)
{
BlockQueue<int>* p = (BlockQueue<int>*)arg;
while(1)
{
int data = p->pop();
printf("consumer:[%p],I get :%d\n",pthread_self(),data);
sleep(1);
}
return NULL;
}
void* producer_task(void* arg)
{
BlockQueue<int>* p = (BlockQueue<int>*)arg;
int data = 0;
while(1)
{
p->push(data);
printf("producer:[%p],I put :%d\n",pthread_self(),data);
data++;
sleep(1);
}
return NULL;
}
int main()
{
BlockQueue<int> *p_safe_queue = new BlockQueue<int>();
pthread_t consumer_thread[THREAD_COUNT], producer_thread[THREAD_COUNT];
for(int i = 0; i < THREAD_COUNT; i++)
{
int ret_c = pthread_create(&consumer_thread[i],NULL,consumer_task,(void*)p_safe_queue);
int ret_p = pthread_create(&producer_thread[i],NULL,producer_task,(void*)p_safe_queue);
if(ret_c < 0 || ret_p < 0)
{
perror("pthread_create");
return -1;
}
}
for(int i = 0; i < THREAD_COUNT; i++)
{
pthread_join(consumer_thread[i],NULL);
pthread_join(producer_thread[i],NULL);
}
return 0;
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/247155.html
標籤:其他
上一篇:NJUPT《信安數基》復習題
下一篇:硬核二進制安全學習:Buffer Overflow(堆疊的緩沖區溢位&&Pwn技巧Return to Text)
