【Linux】同步原理剖析及模拟BlockQueue生产消费模型

发布于:2025-03-25 ⋅ 阅读:(25) ⋅ 点赞:(0)

📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨

在这里插入图片描述

在这里插入图片描述


📢前言

紧接上一回的 从互斥原理到C++ RAII封装实践 笔者这回介绍一下线程中几乎与互斥一样重要的同步原理,

还有一点,笔者之后的封装都会使用之前博客中封装好的容器,需要的可以去仓库或者前面的博客中自取。

所需所用的都放在了这个仓库中
在这里插入图片描述


🏳️‍🌈一、线程同步概念

条件变量:当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。

线程同步 指在多线程编程中,通过特定机制协调多个线程的执行顺序,确保它们对共享资源​(如内存、文件、硬件等)的访问安全有序。核心目标是防止并发访问导致的数据混乱、逻辑错误或资源冲突

🏳️‍🌈二、为什么需要线程同步?

2.1 防止数据竞争(Data Race)​

​问题:多个线程同时读写共享数据时,执行顺序不确定,可能导致数据不一致。

int balance = 100;  // 共享变量

// 线程A执行:存入200
balance += 200;  

// 线程B执行:取出150
balance -= 150;
  • 未同步时:若线程A和B同时读取balance=100,最终结果可能是100+200-150=150(正确应为150)或100-150+200=150,但若操作交叉执行(如A读后B写),可能得到错误值(如-50)。

2.2 保证操作的原子性

​问题:单个操作(如i++)在底层可能对应多条机器指令,线程切换会导致操作未完成就被中断

; x86的i++实际步骤:
mov eax, [i]  ; 读取i到寄存器
inc eax       ; 寄存器加1
mov [i], eax  ; 写回内存
  • 若线程A执行到inc eax后被切换,线程B修改了i,线程A恢复后会将旧值写回,导致结果错误。

2.​3 协调线程间的执行顺序

​场景:某些任务需要线程按特定顺序执行。
​生产者-消费者模型:消费者线程需等待生产者生成数据后再读取。
​任务依赖:线程B必须在线程A完成初始化后才能执行。

2.​4 避免资源争用(如文件、网络连接)​

​问题:多个线程同时写入同一文件或占用同一网络端口,会导致数据错乱或程序崩溃。

🏳️‍🌈线程同步的常见手段

在这里插入图片描述
同步问题的严重后果
​数据不一致:程序输出错误,如银行账户余额异常。
程序崩溃:多线程同时释放内存导致双重释放(Double Free)。
死锁(Deadlock)​:线程互相等待对方释放锁,导致永久阻塞。

说白了,线程同步就是一种为了统一管理生产消费者模型的一种机制

🏳️‍🌈三、什么是生产消费者模型

在这里插入图片描述

3.1 三种关系

在这里插入图片描述

3.2 两个角色

生产者,模拟是同数据的那方
消费者,取走数据的那方

3.3 一个场景

所有生产消费所用的数据都是在中间的 “超市” 中进行
在这里插入图片描述

🏳️‍🌈四、以阻塞队列模拟多生产消费者模型

下图是以阻塞队列模拟多生产消费者模型的基本过程

也就是有两类线程(生产者和消费者),从同一个场景(blockqueue)中放入和拿出数据的过程
在这里插入图片描述

4.1 成员名

我们利用现有的库函数对环境进行一下封装,再利用一个队列模拟临界资源

    private:
        std::queue<T> _q;               // 保存数据的容器,临界资源
        int _cap;                       // bq最大容量
        pthread_mutex_t _mutex;         // 互斥
        pthread_cond_t _productor_cond; // 生产者条件变量
        pthread_cond_t _consumer_cond;  // 消费者条件变量

        int _cwait_num;                 // 当前等待的消费者数量
        int _pwait_num;                 // 当前等待的生产者数量
    };

4.2 构造函数和析构函数

		BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0)
        {
            pthread_mutex_init(&_mutex, nullptr);         // 创建互斥锁
            pthread_cond_init(&_productor_cond, nullptr); // 生产者条件变量
            pthread_cond_init(&_consumer_cond, nullptr);  // 消费者条件变量
        }
        ~BlockQueue()
        {
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_productor_cond);
            pthread_cond_destroy(&_consumer_cond);
        }

4.3 模拟的生产者和消费者

void Equeue(const T &in) // 生产者
        {
            pthread_mutex_lock(&_mutex);
            // 你想放数据,就能放数据吗??生产数据是有条件的!
            // 结论1: 在临界区中等待是必然的(目前)
            while (IsFull()) // 5. 对条件进行判断,为了防止伪唤醒,我们通常使用while进行判断!
            {
                std::cout << "生产者进入等待..." << std::endl;

                // 2. 等是,释放_mutex
                _pwait_num++;
                pthread_cond_wait(&_productor_cond, &_mutex); // wait的时候,必定是持有锁的!!是有问题的!
                _pwait_num--;
                // 3. 返回,线程被唤醒&&重新申请并持有锁(它会在临界区内醒来!)

                std::cout << "生产者被唤醒..." << std::endl;

            }
            // 4. if(IsFull())不满足 || 线程被唤醒
            _q.push(in); // 生产

            // 肯定有数据!
            if(_cwait_num)
            {
                std::cout << "叫醒消费者" << std::endl;
                pthread_cond_signal(&_consumer_cond);
            }
            pthread_mutex_unlock(&_mutex);
        }

        void Pop(T *out) // 消费者
        {
            pthread_mutex_lock(&_mutex);
            while(IsEmpty())
            {
                std::cout << "消费者进入等待..." << std::endl;
                _cwait_num++;
                pthread_cond_wait(&_consumer_cond, &_mutex); // 伪唤醒
                _cwait_num--;
                std::cout << "消费者被唤醒..." << std::endl;
            }
            // 4. if(IsEmpty())不满足 || 线程被唤醒
            *out = _q.front();
            _q.pop();

            // 肯定有空间
            if(_pwait_num)
            {
                std::cout << "叫醒生产者" << std::endl;
                pthread_cond_signal(&_productor_cond);
            }

            pthread_mutex_unlock(&_mutex);
        }

4.4 模拟生产、消费者过程

我们假设生产速度小于消费速度,相当于我们没生产一个对象后需要花费一定的时间,但是消费者一直就绪,就要等生产者生产出来

void *Consumer(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        int data;
        // 1. 从bq拿到数据
        bq->Pop(&data);
        
        // 2.做处理
        printf("Consumer, 消费了一个数据: %d\n", data);
    }
}

void *Productor(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    int data = 10;
    while (true)
    {
        sleep(2);
        // 1. 从外部获取数据
        // data = 10; // 有数据???

        // 2. 生产到bq中
        bq->Equeue(data);

        printf("producter 生产了一个数据: %d\n", data);
        data++;
    }
}

🏳️‍🌈五、整体代码

5.1 BlockQueue.hpp

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>

namespace BlockQueueModule
{

    static const int gcap = 10;

    template <typename T>
    class BlockQueue
    {
    private:
        bool IsFull() { return _q.size() == _cap; }
        bool IsEmpty() { return _q.empty(); }

    public:
        BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0)
        {
            pthread_mutex_init(&_mutex, nullptr);         // 创建互斥锁
            pthread_cond_init(&_productor_cond, nullptr); // 生产者条件变量
            pthread_cond_init(&_consumer_cond, nullptr);  // 消费者条件变量
        }

        void Equeue(const T &in) // 生产者
        {
            pthread_mutex_lock(&_mutex);
            // 你想放数据,就能放数据吗??生产数据是有条件的!
            // 结论1: 在临界区中等待是必然的(目前)
            while (IsFull()) // 5. 对条件进行判断,为了防止伪唤醒,我们通常使用while进行判断!
            {
                std::cout << "生产者进入等待..." << std::endl;

                // 2. 等是,释放_mutex
                _pwait_num++;
                pthread_cond_wait(&_productor_cond, &_mutex); // wait的时候,必定是持有锁的!!是有问题的!
                _pwait_num--;
                // 3. 返回,线程被唤醒&&重新申请并持有锁(它会在临界区内醒来!)

                std::cout << "生产者被唤醒..." << std::endl;

            }
            // 4. if(IsFull())不满足 || 线程被唤醒
            _q.push(in); // 生产

            // 肯定有数据!
            if(_cwait_num)
            {
                std::cout << "叫醒消费者" << std::endl;
                pthread_cond_signal(&_consumer_cond);
            }
            pthread_mutex_unlock(&_mutex);
        }

        void Pop(T *out) // 消费者
        {
            pthread_mutex_lock(&_mutex);
            while(IsEmpty())
            {
                std::cout << "消费者进入等待..." << std::endl;
                _cwait_num++;
                pthread_cond_wait(&_consumer_cond, &_mutex); // 伪唤醒
                _cwait_num--;
                std::cout << "消费者被唤醒..." << std::endl;
            }
            // 4. if(IsEmpty())不满足 || 线程被唤醒
            *out = _q.front();
            _q.pop();

            // 肯定有空间
            if(_pwait_num)
            {
                std::cout << "叫醒生产者" << std::endl;
                pthread_cond_signal(&_productor_cond);
            }

            pthread_mutex_unlock(&_mutex);
        }

        ~BlockQueue()
        {
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_productor_cond);
            pthread_cond_destroy(&_consumer_cond);
        }

    private:
        std::queue<T> _q;               // 保存数据的容器,临界资源
        int _cap;                       // bq最大容量
        pthread_mutex_t _mutex;         // 互斥
        pthread_cond_t _productor_cond; // 生产者条件变量
        pthread_cond_t _consumer_cond;  // 消费者条件变量

        int _cwait_num;                 // 当前等待的消费者数量
        int _pwait_num;                 // 当前等待的生产者数量
    };
}

5.2 Main.cc

#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>

using namespace BlockQueueModule;

void *Consumer(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        int data;
        // 1. 从bq拿到数据
        bq->Pop(&data);
        
        // 2.做处理
        printf("Consumer, 消费了一个数据: %d\n", data);
    }
}

void *Productor(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    int data = 10;
    while (true)
    {
        sleep(2);
        // 1. 从外部获取数据
        // data = 10; // 有数据???

        // 2. 生产到bq中
        bq->Equeue(data);

        printf("producter 生产了一个数据: %d\n", data);
        data++;
    }
}

int main()
{
    // 交易场所,不仅仅可以用来进行传递数据
    // 传递任务!!!v1: 对象 v2
    BlockQueue<int> *bq = new BlockQueue<int>(5); // 共享资源 -> 临界资源
    // 单生产,单消费
    pthread_t c1, p1, c2, p2, p3;
    pthread_create(&c1, nullptr, Consumer, bq);
    pthread_create(&c2, nullptr, Consumer, bq);
    pthread_create(&p1, nullptr, Productor, bq);
    pthread_create(&p2, nullptr, Productor, bq);
    pthread_create(&p3, nullptr, Productor, bq);

    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(p3, nullptr);

    delete bq;

    return 0;
}

5.3 Makefile

bin=bq
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)

$(bin):$(obj)
	$(cc) -o $@ $^ -lpthread
%.o:%.cc
	$(cc) -c $< -std=c++17

.PHONY:clean
clean:
	rm -f $(bin) $(obj)

.PHONY:test
test:
	echo $(src)
	echo $(obj)

👥总结

本篇博文对 同步原理剖析及模拟多消费者模型 做了一个较为详细的介绍,不知道对你有没有帮助呢

觉得博主写得还不错的三连支持下吧!会继续努力的~

请添加图片描述


网站公告

今日签到

点亮在社区的每一天
去签到