如何通过阻塞队列优化生产者消费者模型中的线程同步
生产者消费者模型是一种常见的多线程编程模式,用于解决多线程之间的资源共享问题。在这个模型中,生产者负责生成数据,而消费者负责消费这些数据。为了有效管理共享资源,通常会引入一个缓冲区,生产者将数据放入缓冲区,消费者从缓冲区取出数据。生产者和消费者之间的关系是互相依赖的,生产者等待消费者消费数据后才能继续生产,消费者则等待生产者生成数据。这个模型能够有效地解决不同生产和消费速度带来的问题,提高系统的并发性和效率。
💬 欢迎讨论:如果你在学习过程中有任何问题或想法,欢迎在评论区留言,我们一起交流学习。你的支持是我继续创作的动力!
👍点赞、收藏与分享:觉得这篇文章对你有帮助吗?别忘了点赞、收藏并分享给更多的小伙伴哦!你们的支持是我不断进步的动力!
🚀分享给更多人:如果你觉得这篇文章对你有帮助,欢迎分享给更多对Linux OS感兴趣的朋友,让我们一起进步!
一. ⽣产者消费者模型
1.1 什么是⽣产者消费者模型?
生产者消费者模型(Producer-Consumer Model)是一种常见的并发编程模型,用于描述多个进程或线程之间共享资源的情况。其核心思想是将任务的生成与任务的处理分离,以便在多线程或多进程环境中有效地协调生产和消费过程。
1.2 ⽣产者消费者模型特点
【⽣产者消费者模型】的最根本特点是 321 原则 \color{Fuchsia}321原则 321原则 。
3 \color{OrangeRed}3 3 种关系
– 生产者与生产者:互斥
– 消费者与消费者:互斥
– 生产者与消费者:互斥与同步
2 \color{OrangeRed}2 2 种角色:
– 生产者
– 消费者
1 \color{OrangeRed}1 1 个交易场所
– 通常是一个特定的缓冲区(阻塞队列,环形队列等),这个在以前也碰到过,进程间通信中的管道就是一个数据缓冲区,里面有数据时,通知读端将数据读出,管道为空时,通知写端写入数据,本质上也是一种CP模型。
注: 321 \color{OrangeRed}321 321 原则并不是官方的说法,只是用来帮助我们进行更好的理解。
任何【生产者消费者模型】都离不开上述的特点
理解生产者与消费者的同步关系
- 生产者不断生产商品后,交易场所堆满后,需要通知消费者进行消费
- 消费者不断消费,当交易场所为空时,需要通知生产者进行生产
通知线程需要用到条件变量,来维护两者的同步关系
1.3 ⽣产者消费者模型优点
【⽣产者消费者模型】为何高效?
高效体现在处理任务上。
- 生产者,消费者可以在同一个交易场所中进行操作
- 生产者在生产时,无需关注消费者的状态,只需关注交易场所是否有空闲位置即可
- 消费者在消费时,无需关注生产者的状态,只需关注交易场所是否有位置即可
- 可以通过策略,调整生产者与消费者之间的协同关系
【⽣产者消费者模型】可以根据供需平衡合理调整策略,做到忙闲不均
除此之外,三个地方可以各司其职,各至忙自己的东西,比如,超时很忙的时候,生产者继续生产,消费者继续消费,做到解耦。
二. 基于阻塞队列实现⽣产者消费者模型
2.1 阻塞队列
阻塞队列是一种特殊的数据结构,作为队列家族的一员,天然具备先进先出 F I F O \color{OrangeRed}FIFO FIFO 原则的基本特性,与普通队列不同的是:阻塞队列大小固定,容量有限。
注意:可以将该阻塞队列理解为仓库,更好地理解,下面讲解统一将队列成为仓库。
将其应用于【⽣产者消费者模型】中,入队 就是 生产产品,出队 就是 消费产品。
- 当仓库满的时候,仓库都满了,还怎么生产,你生产了放哪去,这成为了问题,生产者就需进行停止生产(阻塞住自己),所以需要通知生产者停止生产,同时通知消费者赶紧消费仓库中的商品。
- 当仓库光秃秃的时候,消费者还怎么消费,消费空气吗,所以消费者就需进行停止消费(阻塞),马上通知生产者赶紧进行生产,当生产出产品时通知消费者进行消费,这就体现出了了同步关系。
下面将上述的理论转换成代码,一步一步的帮助大家实现它。
2.2 单生产单消费模型
问题:想一想,在设计该阻塞队列之前,需要什么变量。
- 队列用于当做交易场所,存储数据
- 用一个变量来指明队列容量的大小
- 锁用来维护互斥关系
- 条件变量用来维护同步关系
基于上述思考可以知道阻塞队列应该具有如下的成员变量,为了防止类名冲突,我们单独将他设计自自己的命名空间内,同时为了支持泛型编程,我们设计成模版,方便处理任意类型的数据。伪代码如下:
template <typename T>
class BlockQueue
{
private:
std::queue<T> _q; // 临界资源
int _cap; // 队列容量大小
Mutex _mutex;
Cond _full_cond;
Cond _empty_cond;
};
- 思考一下:为什么需要两个条件变量,一个条件变量的方案行的通吗?
首先先解释后面的,一个条件变量的方案肯定行的通,只不过很复杂,例如当消费者消费完仓库中的数据时,就需要将自己在该条件变量进行阻塞,生产者也一样,当唤醒任意一个消费者时,在该条件变量下,还需要分清楚哪个是消费者哪个是生产者,效率极低;为什么两个条件变量效率高,例如当生产者生产完数据,直接去在唤醒指定条件变量下等待消费者,消费者一样,不需要额外的判断,利用空间换取时间的做法。
上面的成员变量有没有缺少呢???
- 另一个问题:当生产者生产完数据怎么知道在指定条件变量有没有消费者在等待呢,或者当消费者消费完数据怎么知道在指定条件变量有没有生产者在等待呢。
可以增加两个变量分别记录消费者与生产者休眠的个数。在生产数据的同时进行判断有没有消费者在休眠,如果有就去制定条件变量下唤醒指定消费者消费数据;在消费数据的同时进行判断有没有生产者在休眠,如果有就去制定条件变量下唤醒指定生产者生产数据,为什么可以这样做???下面帮助大家理解一下,仓库已经满了,已经无法放下东西,这时候,消费者消费了一个商品,这时候就可以唤醒生产者生产一个任务。
- 伪代码如下:
template <typename T>
class BlockQueue
{
private:
std::queue<T> _q; // 临界资源
int _cap; // 队列容量大小
Mutex _mutex;
Cond _full_cond;
Cond _empty_cond;
int _csleep_num; // 消费者休眠个数
int _psleep_num; // 生产者休眠个数
};
上面已经完成了,基本的设计框架,能想出这些东西,也是很厉害的,给你点赞。下面将需要完成成员函数如如何生产数据,如何消费数据,如何进行同步通知等方法,下面跟着小编步伐继续深入探索。
使用BlockQueue模版类创建一个对象需要完成初始化的工作,初始化就是对上述的成员变量进行赋值操作。也就是构造函数需要完成该功能。代码如下:
const int defaultcap = 10;//队列容量的默认值
template <typename T>
class BlockQueue
{
BlockQueue(int cap = defaultcap)
: _cap(cap),
_csleep_num(0),
_psleep_num(0)
{
}
};
完成构造,一定也需要清理资源,毕竟有始有终。代码如下:
template <typename T>
class BlockQueue
{
//由于成员变量基本都是内置类型,所以不需要显示清理资源
~BlockQueue()
{
}
};
判断队列为空未满的接口如下,为了服务下面的入数据和出数据的功能,代码如下:
template <typename T>
class BlockQueue
{
private:
bool IsFull()
{
return _q.size() >= _cap;
}
bool IsEmpty()
{
return _q.empty();
}
};
生产者生产的数据入仓库(队列),咱们现在就需要完成该功能。如何完成呢???
- 思考一下:怎么入数据,什么时候入数据。
为了保证入数据的原子性,咱们对临界资源进行加锁,入完数据之后,需要判断是否有消费者在等待,有则唤醒即可。通过上述大致思考流程,写出的代码如下:
template <typename T>
class BlockQueue
{
private:
void Equeue(const T &in)
{
{
LockGuard lockguard(_mutex);
// 生产者调用
if (IsFull())
{
_psleep_num++;
_full_cond.Wait(_mutex);
_psleep_num--;
}
_q.push(in);
if (_csleep_num > 0)
{
_empty_cond.Signal();
std::cout << "唤醒消费者..." << std::endl;
}
}
}
};
上述代码有没有漏洞,下面思考下以下几个问题:
- 问题1:为什么需要加锁???
为了保证原子性,因为各个线程都有占有CPU的时间片,时间片耗尽,CPU会强制将该线程剥离CPU,放入阻塞队列中,所以生产者生产数据到一半时,而生产者就被剥离了,而其他的生产者正在向我所生产的地方生产数据,造成数据的丢失,数据不一致。
2. 问题2:if 判断有没有缺陷???
举个例子:假设仓库中只有一个空位置,而此时唤醒所有的生产者来生产数据,第一个生产者生产完数据后,将空位置填充,此时仓库又是满的,其它的生产者同时也在生产数据,而此仓库是满的,会导致未定义的行为。所以需将
i f \color{OrangeRed}if if 改成 w h i l e \color{OrangeRed}while while,代码如下:
template <typename T>
class BlockQueue
{
public:
void Equeue(const T &in)
{
{
LockGuard lockguard(_mutex);
// 生产者调用
while (IsFull())//增加代码的健壮性
{
_psleep_num++;
_full_cond.Wait(_mutex);
_psleep_num--;
}
_q.push(in);
if (_csleep_num > 0)
{
_empty_cond.Signal();
std::cout << "唤醒消费者..." << std::endl;
}
}
}
};
该代码利用增加代码的健壮性。
消费者消费数据也需要完成指定功能,如何实现它???
- 思考一下:如何消费数据,什么时候消费数据?
当队列不为空时,直接从队列中取数据即可;当队列为空时,就需要进行阻塞等待。代码如下:
template <typename T>
class BlockQueue
{
private:
T Pop()
{
T data;
{
// 消费者调用
LockGuard lockguard(_mutex);//采用RAII风格,锁对象结束,自动释放资源
while (IsEmpty())
{
_csleep_num++;
_empty_cond.Wait(_mutex);
_csleep_num--;
}
data = _q.front();
_q.pop();
if (_psleep_num > 0)
{
_full_cond.Signal();
std::cout << "唤醒生产者..." << std::endl;
}
}
return data;
}
};
这里的问题与上述类似,不再讲述了,浪费口舌。
2.3 多生产多消费模型
- 思考一下:代码需要改吗???
不需要,为什么???因为锁的存在,天然维护了消费者与消费者和生产者与生产者之间的互斥关系,一个消费者必须等其他的消费者消费完,才能获取锁进入临界资源;同理,一个生产者也必须等其他的生产者生产完数据入仓库后,才能获取锁,进入临界资源,将生产的产品入仓库。
代码如下:
Block.hpp
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"
const int defaultcap = 10;
using namespace MutexModule;
using namespace CondModule;
template <typename T>
class BlockQueue
{
private:
bool IsFull()
{
return _q.size() >= _cap;
}
bool IsEmpty()
{
return _q.empty();
}
public:
BlockQueue(int cap = defaultcap)
: _cap(cap),
_csleep_num(0),
_psleep_num(0)
{
}
void Equeue(const T &in)
{
{
LockGuard lockguard(_mutex);
// 生产者调用
// while循环必要性??? 一例子帮助理解(如下):
// 1. 生产者线程 T1 调用 pthread_cond_signal 唤醒消费者。
// 2. 消费者线程 T2 被唤醒,但操作系统暂停其执行。
// 3. 另一个消费者线程 T3 抢先运行,消费了数据并清空队列。
// 4. T2 恢复执行时,队列已空,但 T2 误以为队列有数据(因未重新检查条件)。
while (IsFull())
{
_psleep_num++;
std::cout << "生产者,进入休眠了: _psleep_num" << _psleep_num << std::endl;
_full_cond.Wait(_mutex);
// 当前线程持有锁,进行等待前要先释放锁(该函数自动完成),让消费者进行消费
// 线程被唤醒时,需要重新申请锁,默认就在临界区内唤醒
_psleep_num--;
}
_q.push(in);
if (_csleep_num > 0)
{
_empty_cond.Signal();
std::cout << "唤醒消费者..." << std::endl;
}
}
}
T Pop()
{
T data;
{
// 消费者调用
LockGuard lockguard(_mutex);
while (IsEmpty())
{
_csleep_num++;
_empty_cond.Wait(_mutex);
_csleep_num--;
}
data = _q.front();
_q.pop();
if (_psleep_num > 0)
{
_full_cond.Signal();
std::cout << "唤醒生产者..." << std::endl;
}
}
return data;
}
~BlockQueue()
{
}
private:
std::queue<T> _q; // 临界资源
int _cap; // 队列容量大小
Mutex _mutex;
Cond _full_cond;
Cond _empty_cond;
int _csleep_num; // 消费者休眠个数
int _psleep_num; // 生产者休眠个数
};
const int defaultcap = 10; // 队列容量的默认值
template <typename T>
class BlockQueue
{
private:
T Pop()
{
T data;
{
// 消费者调用
LockGuard lockguard(_mutex);//采用RAII风格,锁对象结束,自动释放资源
while (IsEmpty())
{
_csleep_num++;
_empty_cond.Wait(_mutex);
_csleep_num--;
}
data = _q.front();
_q.pop();
if (_psleep_num > 0)
{
_full_cond.Signal();
std::cout << "唤醒生产者..." << std::endl;
}
}
return data;
}
};
该代码实现了一个 线程安全的阻塞队列,用于解决生产者消费者问题。生产者线程将数据插入队列,消费者线程从队列中取出数据。通过 互斥锁 和 条件变量,该实现确保了数据的正确性和线程之间的协调。使用 while 循环 解决了虚假唤醒问题,保证了线程在正确的条件下被唤醒。
Mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <string.h>
namespace MutexModule
{
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_mutex, nullptr);
}
void Lock()
{
int n = pthread_mutex_lock(&_mutex);
if (n == 0)
{
//std::cout << "加锁成功" << std::endl;
}
else
{
//std::cout << "加锁失败" << strerror(n) << std::endl;
}
}
void UnLock()
{
int n = pthread_mutex_unlock(&_mutex);
if (n == 0)
{
//std::cout << "解锁成功" << std::endl;
}
else
{
//std::cout << "解锁失败" << strerror(n) << std::endl;
}
}
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
pthread_mutex_t *Get()
{
return &_mutex;
}
private:
pthread_mutex_t _mutex;
};
class LockGuard
{
public:
LockGuard(Mutex &mutex):_mutex(mutex)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.UnLock();
}
private:
Mutex &_mutex;
};
}
这个代码实现了一个简单的、基于 pthread 的互斥锁机制,并且通过 LockGuard 提供了自动加锁和解锁的功能。这种设计能有效避免手动解锁的遗漏,并减少死锁的风险。Mutex 类封装了 pthread_mutex_t,为多线程提供了加锁和解锁的基本操作,而 LockGuard 类则通过 RAII 的方式管理锁,使代码更加简洁、易于维护。
Cond.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
using namespace MutexModule;
namespace CondModule
{
class Cond
{
public:
Cond()
{
pthread_cond_init(&_cond, nullptr);
}
void Wait(Mutex &mutex)
{
int n = pthread_cond_wait(&_cond,mutex.Get());
(void)n;
}
void Signal()
{
//唤醒在指定条件变量下等待的一个线程
pthread_cond_signal(&_cond);
}
void Broadcast()
{
//唤醒在指定条件变量下等待的所有线程
pthread_cond_broadcast(&_cond);
}
~Cond()
{
pthread_cond_destroy(&_cond);
}
private:
pthread_cond_t _cond;
};
};
这个设计模式通常用于 生产者消费者问题、读写锁问题 等场景,能够有效地控制线程的执行顺序,避免死锁和资源争用问题。
三. 最后
生产者-消费者模型通过解耦生产与消费过程,利用缓冲区协调多线程资源竞争。其核心遵循"321原则":生产者/消费者间互斥同步、同类线程互斥、单一共享缓冲区,实现高效任务分配。基于阻塞队列的实现通过互斥锁与双条件变量(满/空)管理线程阻塞与唤醒,采用while循环避免虚假唤醒,确保线程安全。该模型支持单/多生产消费场景,通过容量控制与信号通知平衡供需,提升系统并发性能与资源利用率,广泛应用于任务调度、IO处理等并发场景。