线程同步与互斥(下)

发布于:2025-04-13 ⋅ 阅读:(21) ⋅ 点赞:(0)

线程同步与互斥(中)https://blog.csdn.net/Small_entreprene/article/details/147003513?fromshare=blogdetail&sharetype=blogdetail&sharerId=147003513&sharerefer=PC&sharesource=Small_entreprene&sharefrom=from_link我们学习了互斥,紧接着认识到什么是同步,同步是为了什么,对于上一篇,还保留了一个问题还没算真正被解决:同步为什么要用到锁呢?(除了上一篇的部分解释,wait操作到底是为什么需要传锁)

在此之前,我们需要来封装一下条件变量:

条件变量的封装

通过封装pthread当中的cond条件变量,我们可以显著简化多线程编程中的条件等待和通知逻辑。封装不仅提高了代码的可读性和可维护性,还减少了手动管理锁带来的潜在问题。在实际开发中,合理使用条件变量封装可以显著提升程序的性能和可靠性。

#pragma once

#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"

using namespace MutexModule;

namespace CondModule
{
    class Cond
    {
    public:
        Cond()
        {
            pthread_cond_init(&_cond, nullptr);
        }
        void Wait(Mutex &mutex)
        {
            int n = pthread_cond_wait(&_cond, mutex.Get());
            (void)n;
        }
        void Signal()
        {
            // 唤醒在条件变量下等待的一个线程
            int n = pthread_cond_signal(&_cond);
            (void)n;
        }
        void Broadcast()
        {
            // 唤醒所有在条件变量下等待的线程
            int n = pthread_cond_broadcast(&_cond);
            (void)n;
        }
        ~Cond()
        {
            pthread_cond_destroy(&_cond);
        }

    private:
        pthread_cond_t _cond;
    };
};

代码中的头文件是之前文章中的封装,所以我们就可以使用我们自己封装的Mutex和Cond,对上一篇的生产者消费者模型做一下头文件的使用,还有内容的稍加修改:

BlockQueue.hpp: 

// 阻塞队列的实现
#pragma once

#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "Mutex.hpp"
#include "Cond.hpp"

const int defaultcap = 10; // for test

using namespace MutexModule;
using namespace CondModule;

template <class T>
class BlockQueue
{
private:
    bool IsFull()
    {
        return _q.size() >= _cap;
    }

    bool IsEmpty()
    {
        return !_q.size();
    }

public:
    BlockQueue(int cap = defaultcap)
        : _cap(cap), _csleep_num(0), _psleep_num(0)
    {
        // 锁已经被我们封装了
        // 条件变量已经被我们封装了
    }

    void Equeue(const T &in)
    {
        { // lockguard就是将下面的整个临界区保护起来了,使用封装来实现了锁的自动构造加锁,和析构解锁(因为lockguard是临时变量的,生命周期是只在自己的作用域当中)
            // 下面判断和push都是访问临界资源:因为可能一个线程在入队列的时候,其他的线程在出队列,我们应该加锁
            LockGuard lockguard(_mutex);
            // 生产者调用
            // if (IsFull())//BUG!!!!!
            while (IsFull()) // 增加代码的健壮性
            {
                // 商品满了,需要等待,不然都没有位置放了
                // 重点1: 在临界区内休眠,可别将锁一起带去休眠了
                // 重点2: 当线程被唤醒的时候,也是重wait出来继续向后运行的,这就默认了就在临界区当中唤醒,但是之前锁不是被释放了吗?所以该线程要从pthread_cond_wait中成功返回,就需要当前线程重新申请锁
                // 重点3: 如果阻塞的线程被唤醒,但是申请锁失败了,就会在锁上阻塞等待!!!
                _psleep_num++;
                _full_cond.Wait(_mutex);
                _psleep_num--;
            }
            // 100%是队列有空间了
            _q.push(in);
            // 到这里就一定有数据,这就可以唤醒消费者来消费了

            // 临时方案:后续优化
            if (_csleep_num > 0)
            {
                // 别睡了,快来消费
                _empty_cond.Signal();
                std::cout << "唤醒消费者..." << std::endl;
            }
        }
    }

    T Pop()
    {
        T data;
        {
            // 消费者调用
            LockGuard lockguard(_mutex);
            while (IsEmpty())
            {
                _csleep_num++;
                _empty_cond.Wait(_mutex);
                _csleep_num--;
            }
            data = _q.front();
            _q.pop();
            // 消费者到这说明已经消费了,就一定有空间,所以可以唤醒生产者进行生产了

            if (_psleep_num > 0)
            {
                // 别睡了,快来生产
                _full_cond.Signal();
                std::cout << "唤醒生产者..." << std::endl;
            }

            return data;
        }
    }
    ~BlockQueue()
    {
        // 锁已经被我们封装了
        // 条件变量已经被我们封装了
    }

private:
    std::queue<T> _q; // 临界资源
    int _cap;         // 容量大小
    Mutex _mutex;     // 锁
    Cond _full_cond;  // 生产者生产满了,就把自己放在条件变量下
    Cond _empty_cond; // 消费者消费完了,就把自己放在条件变量下
    int _csleep_num;  // 消费者休眠的个数
    int _psleep_num;  // 生产者休眠的个数
};

我们来测试一下代码:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>

void *consumer(void *args)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);

    while (true)
    {
        sleep(3);
        // 1. 消费任务
        task_t t = bq->Pop();

        // 2. 处理任务 -- 处理任务的时候,这个任务,已经被拿到线程的上下文中了,不属于队列了
        t();
    }
}

void *productor(void *args)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);
    while (true)
    {
        // 1. 获得任务
        std::cout << "生产了一个任务: " << std::endl;

        // 2. 生产任务
        bq->Equeue(Download);
    }
}

int main()
{
    // 扩展认识: 阻塞队列: 可以放任务吗?
    // 申请阻塞队列
    BlockQueue<task_t> *bq = new BlockQueue<task_t>();

    // 构建生产和消费者
    pthread_t c[2], p[3];

    pthread_create(c, nullptr, consumer, bq);
    // pthread_create(c + 1, nullptr, consumer, bq);
    pthread_create(p, nullptr, productor, bq);
    // pthread_create(p + 1, nullptr, productor, bq);
    // pthread_create(p + 2, nullptr, productor, bq);

    pthread_join(c[0], nullptr);
    // pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    // pthread_join(p[1], nullptr);
    // pthread_join(p[2], nullptr);

    return 0;
}

后续,我们就可以拿着这些封装来实现相关代码。

POSIX信号量

回顾一下相关概念

我们在进程间信号的时候谈到了信号量,这个System V版本的信号量,对于我们今天要说的信号量,其实也是相同的理论。system V版本---信号量回顾(放映厅形象解释)https://blog.csdn.net/Small_entreprene/article/details/146120541?fromshare=blogdetail&sharetype=blogdetail&sharerId=146120541&sharerefer=PC&sharesource=Small_entreprene&sharefrom=from_link

POSIX是一种新的标准,和System V是类似的,只不过POSIX是更加常用的,反而是System V版本是快被淘汰了。

信号量也被称为信号灯,和信号根本没有联系,信号是一种进程间异步通知的方式,而信号量的本质是一个计数器,用来表示临界资源当中资源的数量有多少。对于放映厅的例子,我们只要将票买成功了,那么对应的位置就是属于我的了,哪怕今天我不去看这场电影了,放映厅这个位置也是必须要为我留的。

所以:信号量/信号灯本质就是一个计数器,是对待定资源的预定机制。

放映厅本身就是临界资源,我们看电影前买票,在技术人员看来就是保证放映厅内部资源的安全性,但是有一点点不一样,信号量是描述临界资源当中资源数量有多少:

假设放映厅中有50个座位,信号量的计数器就是50,申请一个,50--,当信号量被减到0时,就表明当前资源已经是被占用满的了,不能够再申请了。这样,我们就能够保证进到放映厅的人数不会超过座位数,最多50,然后再经过合理的调度,让不同的线程坐到不同的位置上,这样线程就可以访问放映厅的同时,访问放映厅内不同的座位了。我们说的这个放映厅的例子是将整个放映厅局部进行访问,也就是说,我们允许放映厅当中的座位被多个不同的人同时访问不同的位置,虽然整个放映厅是一个大的临界资源,但只要每一个人访问不同的小的资源,那么大家就不冲突。

那么如果有一个VIP放映厅,里面只有一个座位,所以对应的信号量计数器的值就是1,你只要申请成功的话,这个VIP放映厅就是你的,申请失败就要等,我们将这种计数器值为1的信号量,我们称之为二元信号量,二元信号量的本质不就是互斥嘛!幸运的是:我们目前已经学习了锁了😍

所以,多线程使用资源,有两种场景:

  • 将目标资源整体使用(使用锁计数/二元信号量)(上面我们按照条件变量所写的阻塞队列就是将阻塞队列这一资源整体使用的,所以我们要加锁)
  • 将目标资源按照不同的“块”,分批使用 (如果可以将一个整体资源划分成一块一块的局部资源,就可以往这些局部资源中放入线程,实现并发访问)

我们申请信号量,是要求所有的线程都得先看到信号量(sem),而信号量本身就是一个计数器,该计数器不再是我们所单纯认为的int count了(因为count++/--并不是原子的),所以我们可以对int count加一把锁,共同都成信号量,这样也说得通,但是我们知道信号量的实现肯定是更加复杂的。(申请资源 sem-- >>---<<释放资源 sem++ )。

所以,信号量本质也是临界资源,对应的P操作(就是对应信号量的--操作)一定是原子性的,V操作(就是对应信号量的++操作)一定是原子性的! 

基于环形队列的生产消费模型

环形队列是为了突出信号量的特征!!!

环形队列采用数组或链表模拟,用模运算来模拟环形状态!

  • 环形队列为空,代表head指针 == tail指针;
  • 环形队列为空,也是代表head指针 == tail指针;

我们应该清楚:写这种数据结构的时候,少不了对该数据结构进行判空或判满,很明显,空和满都是head == tail; 的,所以我们需要有空和满的解决方案:

  1. 方案1:使用计数器(int count,默认为0):如果环形队列入数据,那么相应的,count++; 出数据,count--; 所以count为0表示环形队列为空,count为环形队列的容量要求时,该环形队列为满;
  2. 方案2:预留一个空的位置,作满的状态:(下面是解释)(head == tail 默认表示的就是环形队列是空的)

将来生产(入队列)的时候,在不考虑消费(出队列)的过程的时候,tail的位置是处于内容为空的,也就是没有生产任务,但是是准备生产任务状态,是否生产任务,需要判断:tail的当前位置的下一个位置是否是head,不相等的话,就说明tail可以生产,因为没满。

if (head == tail)
{
    // 空状态
}
else if ((tail + 1) % n == head)
{
    // 满状
}
else
{
    // 可生产状态
}

我们环形队列是可以使用链表实现,但是没有必要,我们就按照数组模拟实现固定大小的环形队列(通过取模来实现位置的正确变换)

 对于我们上面的判空/判满的两种方案,其实对于今天的生产者消费者模型来说是没有用到的,因为我们还有另外一种方案:信号量!!!

我们形象一点,我们将环形队列看成一个大圆桌,每个位置只能放一个苹果(生产任务),为了实现生产者消费者模型,我们有如下几个约定:

  • 约定1: 队列为空,生产者先运行
  • 约定2: 队列为满,消费者先运行
  • 约定3: 生产者不能把消费者太一个圈以上(否者会将旧任务覆盖成新任务)
  • 约定4: 消费者不能超过生产者(否则根本就没有苹果给你消费) 

所以,遵守上面的约定:

  1. 生产者和消费者不访问同一个位置,两者就可以同时运行。
  2. 那么什么时候,两者会在同一位置呢?为空或者为满的时候!!!(也就是说,如果不为空或者不为满的时候,两者的操作就可以同时运行)

为空:只能【互斥】生产者先【同步】运行

为满:只能【互斥】消费者先【同步】运行

结论:(遵守4个约定下)

环形队列,不为空 && 不为满,生产消费可以同时运行;

环形队列,为空 || 为满的时候,生产和消费需要同步和互斥。



我们是使用信号量来保证上面的4个约定!!!信号量是表示临界资源数目的,所以:

对于生产者关心的资源:环形队列中空的位置(初始:sem_blank = N;)(N是环形队列的大小)

对于消费者关心的资源:环形队列中的有效数据(有数据)(初始:sem_data = 0;)

我们下面来模拟一下生产消费的追逐过程(伪代码形式):

//《《《《生产者》》》》
int p_step = 0; // 生产者起始位置下标
P(sem_blank);   // P操作: sem_blank--
p_step++;
p_step %= N;   // 走到下一个"空"位置
V(sem_data++); // 生产一个数据,当然就让消费数据量多了一个

//《《《《消费者》》》》
int c_step = 0; // 消费者起始位置下标
P(sem_data);    // P操作: sem_data--(取走任务数据,也就是消费)
c_step++;
c_step %= N;    // 走到下一个"有效位置"位置
V(sem_blank++); // 消费一个数据,当然就让空位置多了一个

 信号量的P操作是原子的,申请成功,继续运行,申请失败,申请的线程会被阻塞(刚开始sem_data为0,sem_blank大于0,消费者就会在对应的P(sem_data)位置进行阻塞,生产者是可以进行P操作,这就是让队列为空的时候,生产者先运行)(生产满了也是如此)而且PV操作是原子的,所以如上的4个约定我们就可以遵守了。


POSIX版的信号量的接口介绍

初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

sem:指向信号量的指针。

pshared:0表示线程间共享,非零表示进程间共享。

value:信号量的初始值。

销毁信号量
int sem_destroy(sem_t *sem);

参数:

sem:指向要销毁的信号量的指针。

等待信号量
int sem_wait(sem_t *sem); // P()操作

功能: 等待信号量,会将信号量的值减1。

参数:sem:指向信号量的指针。

发布信号量
int sem_post(sem_t *sem); // V()操作

功能: 发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。

参数:sem:指向信号量的指针。

我们可以发现,其实信号量的接口跟条件变量是很相似的结构的。

sem信号量的封装

Sem.hpp

#include <iostream>
#include <semaphore.h>
#include <pthread.h>

// 定义一个命名空间SemModule,用于封装信号量相关的操作
namespace SemModule
{
    // 定义一个常量defaultvalue,表示信号量的默认初始值
    const int defaultvalue = 1;

    // 定义一个类Sem,用于封装POSIX信号量的操作
    class Sem
    {
    public:
        // 构造函数,初始化信号量
        // 参数:
        // sem_value:信号量的初始值,默认为defaultvalue
        Sem(unsigned int sem_value = defaultvalue)
        {
            sem_init(&_sem, 0, sem_value);
        }

        // P操作,等待信号量,会将信号量的值减1
        // 这是一个原子操作
        void P()
        {
            int n = sem_wait(&_sem); // 原子的
            (void)n; // 忽略返回值,防止编译器警告
        }

        // V操作,发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1
        // 这是一个原子操作
        void V()
        {
            int n = sem_post(&_sem); // 原子的
        }

        // 析构函数,销毁信号量
        ~Sem()
        {
            sem_destroy(&_sem);
        }

    private:
        // 信号量的内部表示
        sem_t _sem;
    };
}

这段代码定义了一个名为Sem的类,用于封装POSIX信号量的操作。类中包含了信号量的初始化、P操作(等待信号量)、V操作(发布信号量)以及析构函数(销毁信号量)。这些操作都是原子的,确保了线程安全。我们后续可以利用。

基于环形队列的生产消费模型实现(基于信号量)

环形队列:RingQueue.hpp

#pragma once

#include <iostream>
#include <vector>
#include "Sem.hpp"

// 定义全局常量gcap,用于调试时设置队列容量
static const int gcap = 5; // for debug

// 使用SemModule命名空间,简化代码书写
using namespace SemModule;

// 定义RingQueue模板类,用于实现环形队列
template <typename T>
class RingQueue
{
public:
    // 构造函数,初始化环形队列
    // 参数:
    // cap:队列容量,默认为gcap
    RingQueue(int cap = gcap)
        : _cap(cap),
          _rq(cap),
          _blank_sem(cap), // 初始化空位置信号量,初始值为cap
          _p_step(0),      // 初始化生产者步骤下标
          _data_sem(0),    // 初始化数据信号量,初始值为0
          _c_step(0)       // 初始化消费者步骤下标
    {
    }

    // 入队操作,生产者使用
    void Equeue(const T &in)
    {
        // 1. 申请空位置信号量,等待有空位置
        _blank_sem.P();
        // 2. 生产数据,放入队列
        _rq[_p_step] = in;
        // 3. 更新生产者步骤下标
        ++_p_step;
        // 4. 维持环形特性,取模操作
        _p_step %= _cap;
        // 5. 发布数据信号量,通知消费者有新数据
        _data_sem.V();
    }

    // 出队操作,消费者使用
    void Pop(T *out)
    {
        // 1. 申请数据信号量,等待有数据
        _data_sem.P();
        // 2. 消费数据,从队列中取出
        *out = _rq[_c_step];
        // 3. 更新消费者步骤下标
        ++_c_step;
        // 4. 维持环形特性,取模操作
        _c_step %= _cap;
        // 5. 发布空位置信号量,通知生产者有空位置
        _blank_sem.V();
    }

private:
    std::vector<T> _rq; // 环形队列的存储
    int _cap;           // 队列容量

    // 生产者相关
    Sem _blank_sem; // 空位置信号量
    int _p_step;    // 生产者步骤下标

    // 消费者相关
    Sem _data_sem; // 数据信号量
    int _c_step;   // 消费者步骤下标
};

这段代码定义了一个名为RingQueue的模板类,用于实现一个线程安全的环形队列。队列使用信号量和互斥锁来同步生产者和消费者之间的操作,确保了线程安全。生产者使用Equeue方法入队,消费者使用Pop方法出队。信号量用于控制空位置和数据的同步,互斥锁用于保护临界区,防止数据竞争。

我们这是基于单生产-单消费的模型:下面我们来测试一下:

Main.cc

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "RingQueue.hpp"

struct threaddata
{
    RingQueue<int> *rq;
    std::string name;
};

void *consumer(void *args)
{
    threaddata *td = static_cast<threaddata *>(args);

    while (true)
    {
        sleep(1);
        // 1. 消费任务
        int t = 0;
        td->rq->Pop(&t);

        // 2. 处理任务 -- 处理任务的时候,这个任务,已经被拿到线程的上下文中了,不属于队列了
        std::cout << td->name << " 消费者拿到了一个数据:  " << t << std::endl;
        // t();
    }
}

int data = 1;

void *productor(void *args)
{
    threaddata *td = static_cast<threaddata *>(args);

    while (true)
    {
        sleep(1);
        // sleep(2);
        // 1. 获得任务
        // std::cout << "生产了一个任务: " << x << "+" << y << "=?" << std::endl;
        std::cout << td->name << " 生产了一个任务: " << data << std::endl;

        // 2. 生产任务
        td->rq->Equeue(data);

        data++;
    }
}

int main()
{
    // 扩展认识: 阻塞队列: 可以放任务吗?
    // 申请阻塞队列
    RingQueue<int> *rq = new RingQueue<int>();

    // 构建生产和消费者
    // 单单: cc, pp -> 互斥关系不需要维护,互斥与同步
    pthread_t c[2], p[3];

    threaddata *td = new threaddata();
    td->name = "cthread-1";
    td->rq = rq;
    pthread_create(c, nullptr, consumer, td);

    threaddata *td3 = new threaddata();
    td3->name = "pthread-2";
    td3->rq = rq;
    pthread_create(p, nullptr, productor, td3);

    pthread_join(c[0], nullptr);
    pthread_join(p[0], nullptr);

    return 0;
}

单单的测试结果:

根据"321原则",21我们已经是实现了的(2:生产消费,1:一个环形队列),对于单生产单消费,生产者之间和消费者之间的互斥是不需要我们维护的,我们使用的环形队列中使用信号量是维护了生产者和消费者之间的同步(执行有先后)与互斥(访问同一资源,只能一方访问,因为信号量为0就可以阻塞住)

但是如果是多生产多消费呢?同样的,我们根据"321原则",我们需要多对生产者之间和消费者之间的互斥关系:和上一篇的不一样,我们这里和上一篇的不同是没有加锁,所以,我们的RingQueue.hpp需要改:

这里我们有一个问题:

先申请信号量再加锁,还是先加锁,再申请信号量?

先申请锁就是先选择线程来允许访问临界区资源,再申请信号量,申请信号量本质就是对资源的预定机制;

先申请信号量再加锁的顺序,首先确保有足够的资源可供使用(通过信号量控制),然后再确保对这些资源的独占访问(通过锁实现)。

就举放映厅的例子,

对于先申请锁再申请信号量的情形:我们是需要现在放映厅门口排队的,因为进入放映厅是需要一个个进入(同步机制),等轮到你了(申请锁成功),再打开微信进行买票,买成功了(申请信号量成功),大家就可以进来,买失败了,后面大家都要等。

对于先申请信号量再申请锁的情形:我们所有的人要去放映厅看电影,我们所有人先将电影票买了(申请信号量),对有限的电影票进行瓜分,没有抢到电影票的人就不可能进来,就没有资格在门口排队,买了票的人在依次在放映厅门口排队(申请锁)。

上面很明显是后者的效率高(因为信号量的申请是原子的,而且不会超申请,资源有限),对于前者,先申请锁,那么申请锁,竞争锁成功的人,还需要申请信号量,更关键的是申请锁失败的线程还需要将锁给释放了,别的线程才有机会获取锁,后者就不一样了,申请信号量申请锁成功了的线程访问临界区资源,其他线程也没闲着,都在申请信号量,预定自己的资源。

总结来说,“先申请信号量再加锁”的顺序,首先通过信号量确保有足够的资源可供使用,然后再通过锁确保对这些资源的独占访问。这种顺序可以有效地避免资源冲突和数据不一致的问题,同时提高资源的利用率。

所以,我们使用后者!!!

#pragma once

#include <iostream>
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"

// 定义全局常量gcap,用于调试时设置队列容量
static const int gcap = 5; // for debug

// 使用SemModule和MutexModule命名空间,简化代码书写
using namespace SemModule;
using namespace MutexModule;

// 定义RingQueue模板类,用于实现环形队列
template <typename T>
class RingQueue
{
public:
    // 构造函数,初始化环形队列
    // 参数:
    // cap:队列容量,默认为gcap
    RingQueue(int cap = gcap)
        : _cap(cap),
          _rq(cap),
          _blank_sem(cap), // 初始化空位置信号量,初始值为cap
          _p_step(0),      // 初始化生产者步骤下标
          _data_sem(0),    // 初始化数据信号量,初始值为0
          _c_step(0)       // 初始化消费者步骤下标
    {
    }

    // 入队操作,生产者使用
    void Equeue(const T &in)
    {
        // 1. 申请空位置信号量,等待有空位置
        _blank_sem.P();
        {
            // 使用锁保护临界区,确保线程安全
            LockGuard lockguard(_pmutex);
            // 2. 生产数据,放入队列
            _rq[_p_step] = in;
            // 3. 更新生产者步骤下标
            ++_p_step;
            // 4. 维持环形特性,取模操作
            _p_step %= _cap;
        }
        // 5. 发布数据信号量,通知消费者有新数据
        _data_sem.V();
    }

    // 出队操作,消费者使用
    void Pop(T *out)
    {
        // 1. 申请数据信号量,等待有数据
        _data_sem.P();
        {
            // 使用锁保护临界区,确保线程安全
            LockGuard lockguard(_cmutex);
            // 2. 消费数据,从队列中取出
            *out = _rq[_c_step];
            // 3. 更新消费者步骤下标
            ++_c_step;
            // 4. 维持环形特性,取模操作
            _c_step %= _cap;
        }
        // 5. 发布空位置信号量,通知生产者有空位置
        _blank_sem.V();
    }

private:
    std::vector<T> _rq; // 环形队列的存储
    int _cap;           // 队列容量

    // 生产者相关
    Sem _blank_sem; // 空位置信号量
    int _p_step;    // 生产者步骤下标

    // 消费者相关
    Sem _data_sem; // 数据信号量
    int _c_step;   // 消费者步骤下标

    // 维护多生产者和多消费者同步,需要两把锁
    Mutex _cmutex; // 消费者锁
    Mutex _pmutex; // 生产者锁
};

接下来,我们来测试一下多生产多消费的情形:

Main.cc

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "RingQueue.hpp"

struct threaddata
{
    RingQueue<int> *rq;
    std::string name;
};

void *consumer(void *args)
{
    threaddata *td = static_cast<threaddata *>(args);

    while (true)
    {
        sleep(3);
        // 1. 消费任务
        int t = 0;
        td->rq->Pop(&t);

        // 2. 处理任务 -- 处理任务的时候,这个任务,已经被拿到线程的上下文中了,不属于队列了
        std::cout << td->name << " 消费者拿到了一个数据:  " << t << std::endl;
        // t();
    }
}

int data = 1;

void *productor(void *args)
{
    threaddata *td = static_cast<threaddata *>(args);

    while (true)
    {
        sleep(1);
        // sleep(2);
        // 1. 获得任务
        // std::cout << "生产了一个任务: " << x << "+" << y << "=?" << std::endl;
        std::cout << td->name << " 生产了一个任务: " << data << std::endl;

        // 2. 生产任务
        td->rq->Equeue(data);

        data++;
    }
}

int main()
{
    // 扩展认识: 阻塞队列: 可以放任务吗?
    // 申请阻塞队列
    RingQueue<int> *rq = new RingQueue<int>();

    // 构建生产和消费者
    // 如果我们改成多生产多消费呢??
    // 单单: cc, pp -> 互斥关系不需要维护,互斥与同步
    // 多多:cc, pp -> 之间的互斥关系!
    pthread_t c[2], p[3];

    threaddata *td = new threaddata();
    td->name = "cthread-1";
    td->rq = rq;
    pthread_create(c, nullptr, consumer, td);

    threaddata *td2 = new threaddata();
    td2->name = "cthread-2";
    td2->rq = rq;
    pthread_create(c + 1, nullptr, consumer, td2);

    threaddata *td3 = new threaddata();
    td3->name = "pthread-3";
    td3->rq = rq;
    pthread_create(p, nullptr, productor, td3);

    threaddata *td4 = new threaddata();
    td4->name = "pthread-4";
    td4->rq = rq;
    pthread_create(p + 1, nullptr, productor, td4);

    threaddata *td5 = new threaddata();
    td5->name = "pthread-5";
    td5->rq = rq;
    pthread_create(p + 2, nullptr, productor, td5);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    pthread_join(p[2], nullptr);

    return 0;
}

测试结果:

多生产多消费打印出来的现象的参考价值不大,因为生产消费的时候,因为打印是处理的,不属于生产或消费,打印出来错乱是很正常的,等后面我们将日志带进来就好了。

不过至少证明:我们的多生产多消费模型是可以跑的。

信号量另一个本质就是把临界资源是否存在?就绪?等的条件,以原子性的形式呈现在访问临界资源之前就判断了(经过对比上一篇的阻塞队列,上一篇的Equeue是有判断的)。

所以,我们通过环形队列就可以实现多资源分配了,当N=1时,也就是转换成二元信号量,和阻塞队列一致的效果,都是访问一个整体资源了。

所以:

信号量和互斥锁在管理不同类型资源时各有特点:

  • 互斥锁:主要用于保护单个共享资源,确保同一时刻只有一个线程可以访问该资源。它是一种特殊的二进制信号量,其计数器只有0和1两个值。互斥锁的操作通常包括加锁和解锁,且必须由同一线程完成。它适用于需要严格互斥访问的场景,如保护临界区或共享数据结构,以防止数据竞争和不一致问题。

  • 信号量:不仅可以用于互斥,还可以用于控制对多个同类资源的访问,允许多个线程同时访问资源,但数量受限于信号量的计数器值。信号量的计数器可以是任意非负整数,表示可用资源的数量。它适用于需要控制多个资源并发访问的场景,如资源池管理或线程间的同步。

总的来说,互斥锁更适合管理单个资源的互斥访问,而信号量则更适合管理多个资源的并发访问和线程同步。


阻塞队列和环形队列,都是"队列",为什么一个被当作整体资源整体看待,一个被看成整体资源分块看待? 

从底层实现的角度来看,两个代码分别使用了 std::queuestd::vector,导致资源管理方式不同,原因如下:

阻塞队列(BlockQueue)使用 std::queue

底层实现std::queue 是一个容器适配器,默认底层使用 std::deque 实现。std::deque 是一个双端队列,内部由多个小块连续空间组成,整体逻辑上连续。

整体资源看待的原因

  • 操作限制std::queue 只提供了队列的基本操作(如 pushpop 等),无法直接访问队列中的元素,只能通过队列的整体状态(如是否为空、是否已满)来控制操作。因此,在阻塞队列中,需要将整个队列作为一个整体资源进行保护,以确保线程安全。

  • 线程同步:阻塞队列通过条件变量(_full_cond_empty_cond)来协调生产者和消费者。当队列满或空时,需要对整个队列的状态进行判断和同步,而不是单独操作队列中的某个元素。

环形队列(RingQueue)使用 std::vector

底层实现std::vector 是一个动态数组,内部使用连续的内存空间存储元素。它提供了随机访问的能力,并且可以在尾部高效地插入和删除元素。

整体资源分块看待的原因

  • 随机访问与分块管理std::vector 的连续内存空间使得可以通过索引直接访问任意元素。环形队列利用这一点,将队列空间分为“空闲空间”和“已占用空间”两部分,分别用信号量 _blank_sem_data_sem 表示。生产者和消费者分别关注空闲空间和已占用空间,而不是整个队列。

  • 独立控制:信号量机制允许生产者和消费者独立操作。生产者在有空闲空间时(通过 _blank_sem 判断)进行生产,消费者在有数据时(通过 _data_sem 判断)进行消费。这种独立性使得资源管理更加灵活,避免了对整个队列加锁的开销。