目录
一、生产者消费者模型
1. 生产者消费者模型是什么?
生产者消费者模型是一种经典的多线程设计模式,用于解决生产者和消费者之间的数据交换问题。在这个模型中,生产者负责生成数据,而消费者负责处理数据。两者通过一个共享的缓冲区(通常是阻塞队列)进行通信,而不是直接交互。这种间接通信方式有效地降低了生产者和消费者之间的耦合度。
2. 为什么使用生产者消费者模型?
解耦:生产者和消费者不需要直接通信,它们通过共享缓冲区进行数据交换,降低了模块间的依赖性。
支持并发:生产者和消费者可以同时运行,提高了系统的并发性能。
支持忙闲不均:生产者和消费者的处理速度可以不同,通过缓冲区来平衡两者的处理能力。
3. 生产者消费者模型的特点(321原则)
🌵3种关系
生产者和生产者(互斥关系):多个生产者不能同时向缓冲区写入数据。
消费者和消费者(互斥关系):多个消费者不能同时从缓冲区读取数据。
生产者和消费者(互斥关系、同步关系):生产者和消费者不能同时访问缓冲区,且需要协调访问顺序。
🌵2种角色
生产者:负责生成数据并放入缓冲区。
消费者:负责从缓冲区中取出数据并处理。
🌵1个交易场所
缓冲区:通常是内存中的一段区域,用于存储生产者生成的数据。
二、基于BlockingQueue的生产者消费者模型
1. 什么是BlockingQueue?
阻塞队列(Blocking Queue)是一种特殊的队列,当队列为空时,从队列获取元素的操作会被阻塞;当队列满时,向队列放入元素的操作会被阻塞。阻塞队列在多线程编程中非常有用,可以简化生产者消费者模型的实现。
2. C++实现阻塞队列
以下是一个简单的阻塞队列实现,使用C++的std::queue
和POSIX线程库(pthread
)。
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>
template <class T>
class BlockQueue
{
private:
bool IsFull()
{
return _q.size() == _cap;
}
bool IsEmpty()
{
return _q.empty();
}
public:
BlockQueue(int cap = 5) : _cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_full, nullptr);
pthread_cond_init(&_empty, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
// 生产者调用:向阻塞队列插入数据
void Push(const T &data)
{
pthread_mutex_lock(&_mutex);
while (IsFull())
{
// 队列满,生产者等待
pthread_cond_wait(&_full, &_mutex);
}
_q.push(data);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_empty); // 唤醒消费者
}
// 消费者调用:从阻塞队列获取数据
void Pop(T &data)
{
pthread_mutex_lock(&_mutex);
while (IsEmpty())
{
// 队列空,消费者等待
pthread_cond_wait(&_empty, &_mutex);
}
data = _q.front();
_q.pop();
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_full); // 唤醒生产者
}
private:
std::queue<T> _q; // 阻塞队列
int _cap; // 队列容量
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _full; // 队列满时的条件变量
pthread_cond_t _empty; // 队列空时的条件变量
};
代码说明:
互斥锁:保护共享资源(阻塞队列)的访问。
条件变量:
_full
用于通知生产者队列已满,_empty
用于通知消费者队列为空。Push和Pop方法:分别用于生产者和消费者的操作,确保线程安全。
3. 生产者和消费者的实现
生产者线程
void* Producer(void* arg) {
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
while (true) {
sleep(1); // 模拟生产时间
int data = rand() % 100 + 1; // 生成随机数据
bq->Push(data); // 将数据放入阻塞队列
std::cout << "Producer: " << data << std::endl;
}
return nullptr;
}
消费者线程
void* Consumer(void* arg) {
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
while (true) {
sleep(1); // 模拟消费时间
int data = 0;
bq->Pop(data); // 从阻塞队列中取出数据
std::cout << "Consumer: " << data << std::endl;
}
return nullptr;
}
主函数
int main() {
srand((unsigned int)time(nullptr));
pthread_t producer, consumer;
BlockQueue<int>* bq = new BlockQueue<int>;
// 创建生产者和消费者线程
pthread_create(&producer, nullptr, Producer, bq);
pthread_create(&consumer, nullptr, Consumer, bq);
// 等待线程结束
pthread_join(producer, nullptr);
pthread_join(consumer, nullptr);
delete bq;
return 0;
}
4. 生产者和消费者速度不一致的情况
生产者快,消费者慢
如果生产者生成数据的速度比消费者消费数据的速度快,阻塞队列会很快被填满。此时,生产者会被阻塞,直到消费者消费数据后释放空间。
void *Producer(void *arg)
{
BlockQueue<int> *bq = (BlockQueue<int> *)arg;
// 生产者不断进行生产
while (true)
{
int data = rand() % 100 + 1;
bq->Push(data); // 生产数据
std::cout << "Producer: " << data << std::endl;
}
}
void *Consumer(void *arg)
{
BlockQueue<int> *bq = (BlockQueue<int> *)arg;
// 消费者不断进行消费
while (true)
{
sleep(1);
int data = 0;
bq->Pop(data); // 消费数据
std::cout << "Consumer: " << data << std::endl;
}
}
生产者慢,消费者快
如果消费者消费数据的速度比生产者生成数据的速度快,阻塞队列会很快变空。此时,消费者会被阻塞,直到生产者生成新的数据。
void *Producer(void *arg)
{
BlockQueue<int> *bq = (BlockQueue<int> *)arg;
// 生产者不断进行生产
while (true)
{
sleep(1);
int data = rand() % 100 + 1;
bq->Push(data); // 生产数据
std::cout << "Producer: " << data << std::endl;
}
}
void *Consumer(void *arg)
{
BlockQueue<int> *bq = (BlockQueue<int> *)arg;
// 消费者不断进行消费
while (true)
{
int data = 0;
bq->Pop(data); // 消费数据
std::cout << "Consumer: " << data << std::endl;
}
}
5. 基于计算任务的生产者消费者模型
定义任务类
#pragma once
class Task
{
public:
Task(int x = 0, int y = 0, char op = '+') : _x(x), _y(y), _op(op) {}
void Run()
{
int result = 0;
switch (_op)
{
case '+':
result = _x + _y;
break;
case '-':
result = _x - _y;
break;
case '*':
result = _x * _y;
break;
case '/':
if (_y == 0)
{
std::cout << "Warning: division by zero!" << std::endl;
result = -1;
}
else
{
result = _x / _y;
}
break;
default:
std::cout << "Error: invalid operation!" << std::endl;
}
std::cout << _x << " " << _op << " " << _y << " = " << result << std::endl;
}
private:
int _x, _y;
char _op;
};
生产者和消费者线程
void *Producer(void *arg)
{
BlockQueue<Task> *bq = (BlockQueue<Task> *)arg;
const char *ops = "+-*/";
while (true)
{
int x = rand() % 100;
int y = rand() % 100 + 1; // 避免除以零
char op = ops[rand() % 4];
Task t(x, y, op);
bq->Push(t);
std::cout << "Produced task: " << x << " " << op << " " << y << std::endl;
}
return nullptr;
}
void *Consumer(void *arg)
{
BlockQueue<Task> *bq = (BlockQueue<Task> *)arg;
while (true)
{
Task t;
bq->Pop(t);
t.Run();
}
return nullptr;
}