我们今天来学习生产者消费者模型:
阻塞队列
阻塞队列(Blocking Queue)是一种特殊的数据结构,常用于多线程编程中实现线程间的同步。它结合了队列(FIFO,先进先出)的特点与线程同步机制,提供了一种高效且线程安全的通信方式。阻塞队列的主要特点是:
- 自动同步:当队列为空时,试图从队列中取出元素的操作(如
take
或poll
)会阻塞,直到队列中有新的元素被放入。同样地,当队列已满时,试图向队列中添加元素的操作(如put
或offer
)也会阻塞,直到队列中有空间可用。- 线程安全:阻塞队列内部实现了必要的同步机制,因此多个生产者线程和消费者线程可以安全地并发访问队列,无需外部加锁。
- 灵活的阻塞行为:许多阻塞队列实现提供了超时选项,允许插入或提取操作在指定时间内阻塞,超时后返回特定值(如
null
或抛出异常)。- 条件等待与通知:在内部,阻塞队列通常利用条件变量(如Java中的
Condition
,或C++中的std::condition_variable
)来管理线程的阻塞与唤醒,实现了高效的线程间协作。
应用场景
- 生产者-消费者模型:最常见的应用场景,生产者线程向队列中添加任务,消费者线程从队列中取出并处理任务,阻塞队列天然支持这一模式,简化了同步代码的编写。
- 任务调度:在多线程任务调度中,可以使用阻塞队列来暂存待执行的任务,任务调度器根据队列状态动态分配任务给空闲的工作线程。
- 限流与缓冲:在需要控制数据处理速率或作为缓冲区减少系统压力的场景下,阻塞队列可以作为一种简单有效的解决方案。
我们的阻塞队列是我们进行交易的场所,我们可以先把架子先搭起来:
我们创建一个阻塞队列的类:
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<cstring>
//阻塞队列
class BlockQueue
{
public:
private:
};
理解
我们这里用图来通俗解释一下生产者消费者模型:
我们这里举个例子爸爸往盘子里面放苹果,孩子拿苹果吃掉。
在这个场景里,我们很容易分清哪个是生产者,哪个是消费者:
同时,这里面会有三种情况:
- 爸爸放苹果,孩子没有动作
- 孩子从盘子拿走苹果,爸爸没有动作
- 爸爸向盘子放苹果时,孩子也来拿苹果。
第三种情况是我们不想发生的,因为经过这样的过程,数据可能不准确,所以,为了防止第三种,我们对盘子上锁,保证在一个时间段内,只有爸爸或者孩子操作:
同时,还有两种情况:
- 爸爸一直霸占锁,导致一直生产,达到容量上限,但孩子吃不到,因为没有机会。
- 孩子一直霸占锁,没有产品还一直霸占锁,导致爸爸不能把苹果放在盘子里。
为了防止上面两种情况,我们要用到条件变量,当产品数量达到容量上限,爸爸休息,孩子拿走苹果,等到容量有空闲,孩子告诉爸爸可生产了,当盘子里没有产品,孩子休息,让爸爸生产产品之后,有了一定数量的产品,再唤醒孩子来拿苹果
单消费单生产者
我们先从简单的来,先从单消费单生产者模式来谈起:
我们创建一个生产者和一个消费者,一个用来制造数字,另一个用来获取数字:
在这个示例中,我们使用了 POSIX 线程库来实现一个阻塞队列(BlockQueue
)。阻塞队列是一种线程安全的数据结构,它可以在队列已满时阻塞生产者线程,在队列为空时阻塞消费者线程。
以下是带有注释的代码:
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<cstring>
#include<queue>
#include<sys/types.h>
// 阻塞队列模板类
template<class T>
class BlockQueue {
public:
// 构造函数,初始化队列容量、锁和条件变量
BlockQueue(int capacity) : _capacity(capacity) {
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_c_cond, nullptr);
pthread_cond_init(&_p_cond, nullptr);
}
// 判断队列是否已满
bool IsFull() {
return _capacity == _queue.size();
}
// 判断队列是否为空
bool IsEmpty() {
return _queue.empty();
}
// 生产者:向队列中添加元素
void Push(const T& data) {
// 上锁保证数据安全
pthread_mutex_lock(&_mutex);
// 如果队列已满,生产者线程休眠
if (IsFull()) {
std::cout << "producter fall sleep"<< std::endl;
pthread_cond_wait(&_p_cond, &_mutex);
} else {
_queue.push(data);
// 通知消费者线程
pthread_cond_signal(&_c_cond);
}
// 解锁
pthread_mutex_unlock(&_mutex);
}
// 消费者:从队列中取出元素
void Pop(T* out) {
// 上锁保证数据安全
pthread_mutex_lock(&_mutex);
// 如果队列为空,消费者线程休眠
if (IsEmpty()) {
std::cout << "consumer fall sleep"<< std::endl;
pthread_cond_wait(&_c_cond, &_mutex);
} else {
*out = _queue.front();
_queue.pop();
// 通知生产者线程
pthread_cond_signal(&_p_cond);
}
// 解锁
pthread_mutex_unlock(&_mutex);
}
// 析构函数
~BlockQueue() {
// 销毁锁和条件变量
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_c_cond);
pthread_cond_destroy(&_p_cond);
}
private:
// 队列
std::queue<T> _queue;
// 容量
int _capacity;
// 锁
pthread_mutex_t _mutex;
// 条件变量
pthread_cond_t _c_cond; // 消费者
pthread_cond_t _p_cond; // 生产者
};
这里注意我们这里是消费者快,生产者慢。
在这个示例中,我们使用了 POSIX 线程库来实现一个阻塞队列。阻塞队列使用互斥锁(pthread_mutex_t
)来保证线程安全,并使用条件变量(pthread_cond_t
)来实现生产者和消费者之间的同步。当队列已满时,生产者线程会等待条件满足;当队列为空时,消费者线程会等待条件满足。在这个示例中,我们使用了两个条件变量,一个用于生产者,另一个用于消费者。
使用阻塞队列模拟分发任务
我们阻塞队列最大的用处就是实现分发任务时,生产者和消费者的同步,我们可以模拟实现一个计算的任务然后利用阻塞队列来模拟分发任务时的场景:
我们创建一个任务类:
# pragma once
#include<iostream>
#include<cstring>
enum
{
ok = 0,
div_zero,
mod_zero,
unknow
};
std::string opers = "+-*/%)(&";
class Task
{
public:
Task()
{
}
Task(int data_1,int data_2,char op)
:_data_1(data_1)
,_data_2(data_2)
,_result(0)
,_oper(op)
,_code(ok)
{
}
~Task()
{
}
void Run()
{
switch(_oper)
{
case '+':
_result = _data_1 + _data_2;
break;
case '-':
_result = _data_1 - _data_2;
break;
case '*':
_result = _data_1 * _data_2;
break;
case '/':
{
if(_data_2 == 0)
_code = div_zero;
else _result = _data_1 / _data_2;
}
break;
case '%':
{
if(_data_2 == 0)
_code = mod_zero;
else _result = _data_1 % _data_2;
}
break;
default:
_code = unknow;
break;
}
}
std::string PrintTask()
{
std::string s;
s = std::to_string(_data_1);
s += _oper;
s += std::to_string(_data_2);
s += "=?";
return s;
}
std::string PrintResult()
{
std::string s;
s = std::to_string(_data_1);
s += _oper;
s += std::to_string(_data_2);
s += "=";
s += std::to_string(_result);
s += " [";
s += std::to_string(_code);
s += "]";
return s;
}
void operator()()
{
Run();
sleep(2);
}
private:
int _data_1; //操作数 1
int _data_2; //操作数 2
int _result; //结果
char _oper; //操作
int _code; // 操作码
};
然后将我们的阻塞队列改为Task类型:
#include"BlockQueue.hpp"
#include<time.h>
#include"Task.hpp"
void* consumer(void* args)
{
//强转
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
//int data = 10;
Task data;
while(true)
{
bq->Pop(&data);
std::cout << "consumer gets Task "<< std::endl;
data();
std::cout << "consumer data: " << data.PrintResult() << std::endl;
}
return nullptr;
}
void* producter(void* args)
{
//强转
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
int data1 = rand() % 10 + 1;
usleep(rand()%123);
int data2 = rand() % 10 + 1;
usleep(rand()%123);
char oper = opers[rand() % (opers.size())];
//创建任务
Task t(data1, data2, oper);
std::cout << "productor task: " << t.PrintTask() << std::endl;
bq->Push(t);
sleep(1);
}
return nullptr;
}
int main()
{
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self());
pthread_t tid_consum,tid_product;
int capacity = 20;
BlockQueue<Task>* bq = new BlockQueue<Task>(capacity);
//创建生产者
pthread_create(&tid_consum,nullptr,consumer,bq);
//创建消费者
pthread_create(&tid_product,nullptr,producter,bq);
pthread_join(tid_consum,nullptr);
pthread_join(tid_product,nullptr);
}
多生产多消费者问题
我们上面的代码已经完成了各个线程之间的互斥同步关系,所以我们可以直接创建多生产者和多消费者:
这里因为屏幕也是公共资源,所以我在向屏幕打印的时候也加上了锁:
#include"BlockQueue.hpp"
#include<time.h>
#include"Task.hpp"
pthread_mutex_t _mutex_2 = PTHREAD_MUTEX_INITIALIZER;
void* consumer(void* args)
{
//强转
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
//int data = 10;
Task data;
while(true)
{
bq->Pop(&data);
pthread_mutex_lock(&_mutex_2);
std::cout << "consumer gets Task "<<"name: "<<pthread_self()<< std::endl;
data();
std::cout << "consumer data: " << data.PrintResult() << std::endl;
pthread_mutex_unlock(&_mutex_2);
}
return nullptr;
}
void* producter(void* args)
{
//强转
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
int data1 = rand() % 10 + 1;
usleep(rand()%123);
int data2 = rand() % 10 + 1;
usleep(rand()%123);
char oper = opers[rand() % (opers.size())];
//创建任务
pthread_mutex_lock(&_mutex_2);
std::cout << "productor product a task"<<"name: "<<pthread_self()<< std::endl;
Task t(data1, data2, oper);
std::cout << "productor task: " << t.PrintTask() << std::endl;
pthread_mutex_unlock(&_mutex_2);
bq->Push(t);
sleep(1);
}
return nullptr;
}
int main()
{
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self());
pthread_t tid_consum[8],tid_product[8];
int capacity = 20;
BlockQueue<Task>* bq = new BlockQueue<Task>(capacity);
//创建生产者
for(int i = 0 ;i < 8; i++)
{
pthread_create(&tid_consum[i],nullptr,consumer,bq);
}
//创建消费者
for(int i = 0; i < 8; i++)
{
pthread_create(&tid_product[i],nullptr,producter,bq);
}
for(int i = 0; i < 8; i++)
{
pthread_join(tid_consum[i],nullptr);
}
for(int i = 0; i < 8; i++)
{
pthread_join(tid_product[i],nullptr);
}
}
其他代码基本不变: