📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨
文章目录
📢前言
紧接上一回的 从互斥原理到C++ RAII封装实践
笔者这回介绍一下线程中几乎与互斥一样重要的同步原理,
还有一点,笔者之后的封装都会使用之前博客中封装好的容器,需要的可以去仓库或者前面的博客中自取。
所需所用的都放在了这个仓库中
🏳️🌈一、线程同步概念
条件变量
:当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
线程同步
指在多线程编程中,通过特定机制协调多个线程的执行顺序,确保它们对共享资源(如内存、文件、硬件等)的访问安全有序。核心目标是防止并发访问导致的数据混乱、逻辑错误或资源冲突。
🏳️🌈二、为什么需要线程同步?
2.1 防止数据竞争(Data Race)
问题:多个线程同时读写共享数据时,执行顺序不确定,可能导致数据不一致。
int balance = 100; // 共享变量
// 线程A执行:存入200
balance += 200;
// 线程B执行:取出150
balance -= 150;
- 未同步时:若线程A和B同时读取balance=100,最终结果可能是100+200-150=150(正确应为150)或100-150+200=150,但若操作交叉执行(如A读后B写),可能得到错误值(如-50)。
2.2 保证操作的原子性
问题:单个操作(如i++)在底层可能对应多条机器指令,线程切换会导致操作未完成就被中断
; x86的i++实际步骤:
mov eax, [i] ; 读取i到寄存器
inc eax ; 寄存器加1
mov [i], eax ; 写回内存
- 若线程A执行到inc eax后被切换,线程B修改了i,线程A恢复后会将旧值写回,导致结果错误。
2.3 协调线程间的执行顺序
场景:某些任务需要线程按特定顺序执行。
生产者-消费者模型:消费者线程需等待生产者生成数据后再读取。
任务依赖:线程B必须在线程A完成初始化后才能执行。
2.4 避免资源争用(如文件、网络连接)
问题:多个线程同时写入同一文件或占用同一网络端口,会导致数据错乱或程序崩溃。
🏳️🌈线程同步的常见手段
同步问题的严重后果
数据不一致
:程序输出错误,如银行账户余额异常。
程序崩溃
:多线程同时释放内存导致双重释放(Double Free)。
死锁(Deadlock)
:线程互相等待对方释放锁,导致永久阻塞。
说白了,线程同步就是一种为了统一管理生产消费者模型的一种机制
🏳️🌈三、什么是生产消费者模型
3.1 三种关系
3.2 两个角色
生产者
,模拟是同数据的那方
消费者
,取走数据的那方
3.3 一个场景
所有生产消费所用的数据都是在中间的 “超市”
中进行
🏳️🌈四、以阻塞队列模拟多生产消费者模型
下图是以阻塞队列模拟多生产消费者模型的基本过程
也就是有两类线程(生产者和消费者),从同一个场景(blockqueue)中放入和拿出数据的过程
4.1 成员名
我们利用现有的库函数对环境进行一下封装,再利用一个队列模拟临界资源
private:
std::queue<T> _q; // 保存数据的容器,临界资源
int _cap; // bq最大容量
pthread_mutex_t _mutex; // 互斥
pthread_cond_t _productor_cond; // 生产者条件变量
pthread_cond_t _consumer_cond; // 消费者条件变量
int _cwait_num; // 当前等待的消费者数量
int _pwait_num; // 当前等待的生产者数量
};
4.2 构造函数和析构函数
BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0)
{
pthread_mutex_init(&_mutex, nullptr); // 创建互斥锁
pthread_cond_init(&_productor_cond, nullptr); // 生产者条件变量
pthread_cond_init(&_consumer_cond, nullptr); // 消费者条件变量
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_productor_cond);
pthread_cond_destroy(&_consumer_cond);
}
4.3 模拟的生产者和消费者
void Equeue(const T &in) // 生产者
{
pthread_mutex_lock(&_mutex);
// 你想放数据,就能放数据吗??生产数据是有条件的!
// 结论1: 在临界区中等待是必然的(目前)
while (IsFull()) // 5. 对条件进行判断,为了防止伪唤醒,我们通常使用while进行判断!
{
std::cout << "生产者进入等待..." << std::endl;
// 2. 等是,释放_mutex
_pwait_num++;
pthread_cond_wait(&_productor_cond, &_mutex); // wait的时候,必定是持有锁的!!是有问题的!
_pwait_num--;
// 3. 返回,线程被唤醒&&重新申请并持有锁(它会在临界区内醒来!)
std::cout << "生产者被唤醒..." << std::endl;
}
// 4. if(IsFull())不满足 || 线程被唤醒
_q.push(in); // 生产
// 肯定有数据!
if(_cwait_num)
{
std::cout << "叫醒消费者" << std::endl;
pthread_cond_signal(&_consumer_cond);
}
pthread_mutex_unlock(&_mutex);
}
void Pop(T *out) // 消费者
{
pthread_mutex_lock(&_mutex);
while(IsEmpty())
{
std::cout << "消费者进入等待..." << std::endl;
_cwait_num++;
pthread_cond_wait(&_consumer_cond, &_mutex); // 伪唤醒
_cwait_num--;
std::cout << "消费者被唤醒..." << std::endl;
}
// 4. if(IsEmpty())不满足 || 线程被唤醒
*out = _q.front();
_q.pop();
// 肯定有空间
if(_pwait_num)
{
std::cout << "叫醒生产者" << std::endl;
pthread_cond_signal(&_productor_cond);
}
pthread_mutex_unlock(&_mutex);
}
4.4 模拟生产、消费者过程
我们假设生产速度小于消费速度,相当于我们没生产一个对象后需要花费一定的时间,但是消费者一直就绪,就要等生产者生产出来
void *Consumer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while(true)
{
int data;
// 1. 从bq拿到数据
bq->Pop(&data);
// 2.做处理
printf("Consumer, 消费了一个数据: %d\n", data);
}
}
void *Productor(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
int data = 10;
while (true)
{
sleep(2);
// 1. 从外部获取数据
// data = 10; // 有数据???
// 2. 生产到bq中
bq->Equeue(data);
printf("producter 生产了一个数据: %d\n", data);
data++;
}
}
🏳️🌈五、整体代码
5.1 BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
namespace BlockQueueModule
{
static const int gcap = 10;
template <typename T>
class BlockQueue
{
private:
bool IsFull() { return _q.size() == _cap; }
bool IsEmpty() { return _q.empty(); }
public:
BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0)
{
pthread_mutex_init(&_mutex, nullptr); // 创建互斥锁
pthread_cond_init(&_productor_cond, nullptr); // 生产者条件变量
pthread_cond_init(&_consumer_cond, nullptr); // 消费者条件变量
}
void Equeue(const T &in) // 生产者
{
pthread_mutex_lock(&_mutex);
// 你想放数据,就能放数据吗??生产数据是有条件的!
// 结论1: 在临界区中等待是必然的(目前)
while (IsFull()) // 5. 对条件进行判断,为了防止伪唤醒,我们通常使用while进行判断!
{
std::cout << "生产者进入等待..." << std::endl;
// 2. 等是,释放_mutex
_pwait_num++;
pthread_cond_wait(&_productor_cond, &_mutex); // wait的时候,必定是持有锁的!!是有问题的!
_pwait_num--;
// 3. 返回,线程被唤醒&&重新申请并持有锁(它会在临界区内醒来!)
std::cout << "生产者被唤醒..." << std::endl;
}
// 4. if(IsFull())不满足 || 线程被唤醒
_q.push(in); // 生产
// 肯定有数据!
if(_cwait_num)
{
std::cout << "叫醒消费者" << std::endl;
pthread_cond_signal(&_consumer_cond);
}
pthread_mutex_unlock(&_mutex);
}
void Pop(T *out) // 消费者
{
pthread_mutex_lock(&_mutex);
while(IsEmpty())
{
std::cout << "消费者进入等待..." << std::endl;
_cwait_num++;
pthread_cond_wait(&_consumer_cond, &_mutex); // 伪唤醒
_cwait_num--;
std::cout << "消费者被唤醒..." << std::endl;
}
// 4. if(IsEmpty())不满足 || 线程被唤醒
*out = _q.front();
_q.pop();
// 肯定有空间
if(_pwait_num)
{
std::cout << "叫醒生产者" << std::endl;
pthread_cond_signal(&_productor_cond);
}
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_productor_cond);
pthread_cond_destroy(&_consumer_cond);
}
private:
std::queue<T> _q; // 保存数据的容器,临界资源
int _cap; // bq最大容量
pthread_mutex_t _mutex; // 互斥
pthread_cond_t _productor_cond; // 生产者条件变量
pthread_cond_t _consumer_cond; // 消费者条件变量
int _cwait_num; // 当前等待的消费者数量
int _pwait_num; // 当前等待的生产者数量
};
}
5.2 Main.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>
using namespace BlockQueueModule;
void *Consumer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while(true)
{
int data;
// 1. 从bq拿到数据
bq->Pop(&data);
// 2.做处理
printf("Consumer, 消费了一个数据: %d\n", data);
}
}
void *Productor(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
int data = 10;
while (true)
{
sleep(2);
// 1. 从外部获取数据
// data = 10; // 有数据???
// 2. 生产到bq中
bq->Equeue(data);
printf("producter 生产了一个数据: %d\n", data);
data++;
}
}
int main()
{
// 交易场所,不仅仅可以用来进行传递数据
// 传递任务!!!v1: 对象 v2
BlockQueue<int> *bq = new BlockQueue<int>(5); // 共享资源 -> 临界资源
// 单生产,单消费
pthread_t c1, p1, c2, p2, p3;
pthread_create(&c1, nullptr, Consumer, bq);
pthread_create(&c2, nullptr, Consumer, bq);
pthread_create(&p1, nullptr, Productor, bq);
pthread_create(&p2, nullptr, Productor, bq);
pthread_create(&p3, nullptr, Productor, bq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
delete bq;
return 0;
}
5.3 Makefile
bin=bq
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)
$(bin):$(obj)
$(cc) -o $@ $^ -lpthread
%.o:%.cc
$(cc) -c $< -std=c++17
.PHONY:clean
clean:
rm -f $(bin) $(obj)
.PHONY:test
test:
echo $(src)
echo $(obj)
👥总结
本篇博文对 同步原理剖析及模拟多消费者模型 做了一个较为详细的介绍,不知道对你有没有帮助呢
觉得博主写得还不错的三连支持下吧!会继续努力的~