【Linux】POSIX信号量与基于环形队列的生产消费者模型

发布于:2025-03-27 ⋅ 阅读:(35) ⋅ 点赞:(0)

目录

一、POSIX信号量:

接口:

二、基于环形队列的生产消费者模型

环形队列:

单生产单消费实现代码:

RingQueue.hpp:

main.cc:

多生产多消费实现代码:

RingQueue.hpp:

main.cc:


一、POSIX信号量:

在实现线程的同步,互斥不仅仅只有条件变量和锁,还有POSIX信号量,这里学习的POSIX信号量和之前学习的SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX可以用于线程间同步

引入信号量:

对于共享资源,为了保证其并发性,将其分成了几份资源,就允许几个线程进入共享资源访问,此时就引入了信号量来对其进行保护

信号量的本质是一个计数器

这把计数器用来描述临界资源中资源数目的多少,实际上是对资源的预加载机制(这就像在电影院买票,买票的本质就是对电影院座位的预加载机制,当你买到票了,就一定会有位置给你,并且别人即使有票也是坐别人的座位,也不会抢了你的座位)

虽然信号量的本质是一个计数器,当一个线程申请资源成功就将计数器--,当一个线程申请资源失败就将计数器++,但是不能用一个简简单单的普通变量代替信号量,因为变量的--和++操作不是原子的,所以,我们就要使用一个支持PV操作的原子的计数器------信号量

那么什么是PV操作呢?

P:代表申请资源,计数器--

V:代表释放资源,计数器++

当将共享资源分为N份,此时信号量也就是N,这个时候就能够申请资源,再将信号量--,当信号量为0的时候,线程就不能够申请资源了,只能阻塞等待

如上,这是一个多元信号量sem,我们之前学习的锁被叫做二元信号量

在使用多元信号量访问资源的时候,要先申请信号量,只有申请成功了,才能访问资源否则就需要进入阻塞队列等待

接口:

初始化信号量

参数1:需要初始化信号量的地址

参数2:表示的是线程共享还是进程共享,默认为零,线程共享,非零表示进程共享

参数3:设定的信号量的初始值

返回值:初始化成功返回0,失败返回-1,并设置错误码

其中sem_t实际上是一个联合体

销毁信号量

参数:就是需要销毁信号量的地址

返回值:初始化成功返回0,失败返回-1,并设置错误码

申请信号量:

其中下面用的是sem_wait,其功能就是成功将信号量-1,也就是P操作

参数:就是需要销毁信号量的地址

返回值:初始化成功返回0,失败返回-1,并设置错误码

sem_trywait:尝试申请,如果没有申请到资源,就会放弃申请

sem_timedwait:每隔一段时间进行申请

释放信号量(发布信号量)

参数:就是需要销毁信号量的地址

返回值:初始化成功返回0,失败返回-1,并设置错误码

其表示资源使用完毕,归还资源,成功将信号量+1,也就是V操作

二、基于环形队列的生产消费者模型

环形队列:

在实现生产消费者的模型中,不仅仅只有共享队列,还有环形队列,什么是环形队列呢?

虽然它叫环形队列,但是它不是队列,而是用数组实现的,
其中head作为头指针,当申请资源成功的时候就向后移动一位,
tail作为尾指针,当释放资源成功的时候向后移动一位,

首先,如何让数组成环呢?

在每次head++后都进行一次取模操作,这样保证head的大小不会超过这个环形队列的大小

特殊的是,当为空或者为满的时候,头指针和尾指针都指向同一个位置,那么如何证明此时是空还是满呢?

这里有两种方法:

方法一:添加一个计数器,当计数器的值为0的时候,表示当前为空,当计数器的值为容器大小的时候,表示该环形队列为满

判空条件:count == 0
判满条件:count == size

方法二:牺牲一个空间的大小,通过预留一个空位,避免head和tail重合时无法区分空和满。此时队列最大容量为size-1
判空条件:head== tail
判满条件:(head+ 1) % size == tail

在下面实现的时候采用计数器,毕竟信号量是一个天然的计数器

当数据不为空或者满的时候,此时head指针和tail指针必定不指向同一个位置,

此时就能够进行生产者和消费者的同时访问,

为空的时候,只能生产者访问,生产者只关注还剩多少空间

为满的时候,只能消费者访问,消费者只关注还剩多少数据

所以在使用信号量标识资源的情况下,生产者和消费者关注的资源不一样,所以就需要两个信号量来进行计数:

生产者的信号量:表示当前有多少可用空间

消费者的信号量:表示当前有多少可消费数据

所以以下在实现的时候,定义两个信号量,spacesem = N 和datasem = 0

对于生产者的PV操作:P(spacesem)将空间资源-1,V(datasem)将数据资源+1

对于消费者的PV操作:P(datasem)将数据资源-1,V(spacesem)将空间资源+1

单生产单消费实现代码:

RingQueue.hpp:

首先创建一个实现环形队列的文件:

#pragma once
#include <vector>
#include <iostream>
#include <semaphore.h>

template <class T>
class RingQueue
{
private:
    std::vector<T> _ringqueue; // 用vector模拟环形队列
    int _maxcap;               // 环形队列的最大容量

    int _p_step; // 生产者下标
    int _c_step; // 消费者下标

    sem_t _pspace_sem; // 生产者关注的空间资源
    sem_t _cdata_sem;  // 消费者关注的数据资源
};

接着依次实现其中的接口:

构造与析构

    RingQueue(int maxcap = 5)
        : _maxcap(maxcap), _ringqueue(maxcap), _p_step(0), _c_step(0)
    {
        sem_init(&_pspace_sem,0,maxcap);
        sem_init(&_cdata_sem,0,0);

        pthread_mutex_init(&_p_mutex,nullptr);
        pthread_mutex_init(&_c_mutex,nullptr);
    }
    ~RingQueue()
    {
        sem_destroy(&_pspace_sem);
        sem_destroy(&_cdata_sem);

        pthread_mutex_destroy(&_p_mutex);
        pthread_mutex_destroy(&_c_mutex);
    }

其中,构造函数的主要作用就是初始化各种变量,析构函数的主要作用就是释放这些变量

push与pop

push的作用是从交易场所中放入数据,pop的作用是从交易场所中拿到数据

    void push(const T& in)
    {
        //生产数据先要申请信号量来预定资源
        P(_pspace_sem);

        _ringqueue[_p_step] = in;//将所对应的数据放入到环形队列中
        _p_step++;//将生产者对应的下标向后移动一位
        _p_step %= _maxcap;//保证生产者不会超过环形队列的大小

        V(_cdata_sem);
    }

    void pop(T *out)
    {
        P(_cdata_sem);

        pthread_mutex_lock(&_c_mutex);
        *out = _ringqueue[_c_step];//将该位置的数据交给out作为输出型参数带出去
        _c_step++;//将消费者对应的下标向后移动一位
        _c_step %= _maxcap;//保证消费者下标不会超过环形队列的大小
        pthread_mutex_unlock(&_c_mutex);

        V(_pspace_sem);
    }

生产者push后,证明环形队列中一定有数据,所以就需要在V后传入消费者关心的信号量,也就是需要传递_cdata_sem

消费者pop后,证明环形队列中一定有空间,所以就需要在V后传入生产者关心的信号量,也就是需要传递_pspace_sem

PV操作:

    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }

    void V(sem_t &sem)
    {
        sem_post(&sem);
    }

P操作代表申请资源,也就是semwait这个函数

V操作就是释放了资源,比如生产者就是释放了一个数据

这里要保证数据在为空的时候只能生产者运行,在数据为满的时候只能消费者去运行,

所以wait是为了保持顺序同步,保证即使消费者先调用,但是没有数据,就将消费者申请资源所关注的数据信号量送去等待队列里去等待

在封装V操作中,post就是释放资源,对于生产者就是给了个数据给消费者

对于消费者post就是释放了空间,生产者就能接着生产了

那消费者一开始调用P操作,没有数据就会阻塞,而生产者这边V了数据,消费者这边P就不会阻塞了可以拿到数据了

所以生产和消费这两者的PV操作是反的

生产者V了,消费者的p就停止阻塞了因为生产者给了消费者资源了

反之同理

main.cc:

void *Productor(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);

    while (true)
    {
        int data = rand()%10+1;
        rq->push(data);
        
        std::cout<<"Productor : data = "<< data << std::endl;

        sleep(1);
    }
    
    return nullptr;
}

void *Consumer(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);

    while (true)
    {
        int data = 0;
        rq->pop(&data);
        std::cout<<"Consumer : data = "<< data << std::endl;
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand(time(nullptr)^getpid());
    RingQueue<Task> *rq = new RingQueue<Task>();

    pthread_t c, p;

    pthread_create(&p, nullptr, Productor, rq);
    pthread_create(&c, nullptr, Consumer, rq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    
    delete rq;
    return 0;
}

或者也可以让消费者疯狂消费数据,生产者疯狂生产

多生产多消费实现代码:

RingQueue.hpp:

在多生产多消费中,需要保证生产者和生产者之间、消费者和消费者之间的互斥关系,生产者和消费者之间的互斥关系已经由信号量承担了

所以在多生产多消费的代码中要加上锁

构造与析构中也要增加初始化锁与释放锁

template <class T>
class RingQueue
{
public:
    RingQueue(int maxcap = 5)
        : _maxcap(maxcap), _ringqueue(maxcap), _p_step(0), _c_step(0)
    {
        sem_init(&_pspace_sem,0,maxcap);
        sem_init(&_cdata_sem,0,0);

        pthread_mutex_init(&_p_mutex,nullptr);
        pthread_mutex_init(&_c_mutex,nullptr);
    }
    ~RingQueue()
    {
        sem_destroy(&_pspace_sem);
        sem_destroy(&_cdata_sem);

        pthread_mutex_destroy(&_p_mutex);
        pthread_mutex_destroy(&_c_mutex);

    }

private:
    std::vector<T> _ringqueue; // 用vector模拟环形队列
    int _maxcap;               // 环形队列的最大容量

    int _p_step; // 生产者下标
    int _c_step; // 消费者下标

    sem_t _pspace_sem; // 生产者关注的空间资源
    sem_t _cdata_sem;  // 消费者关注的数据资源

    pthread_mutex_t _p_mutex;//保证生产者和生产者之间的互斥
    pthread_mutex_t _c_mutex;//保证消费者和消费者之间的互斥
};

push与pop

    void push(const T& in)
    {
        //生产数据先要申请信号量来预定资源
        P(_pspace_sem);//信号量的申请本来就是原子的,所以加锁的时候就需要在这之后

        pthread_mutex_lock(&_p_mutex);
        
        _ringqueue[_p_step] = in;//将所对应的数据放入到环形队列中
        _p_step++;//将生产者对应的下标向后移动一位
        _p_step %= _maxcap;//保证生产者下标不会超过环形队列的大小

        pthread_mutex_unlock(&_p_mutex);

        V(_cdata_sem);
    }

    void pop(T *out)
    {
        P(_cdata_sem);

        pthread_mutex_lock(&_c_mutex);
        *out = _ringqueue[_c_step];//将该位置的数据交给out作为输出型参数带出去
        _c_step++;//将消费者对应的下标向后移动一位
        _c_step %= _maxcap;//保证消费者下标不会超过环形队列的大小
        pthread_mutex_unlock(&_c_mutex);

        V(_pspace_sem);
    }

细节:

在加锁的时候要在申请信号量之后,这样能够提高并发度

如果是在申请信号量之前进行加锁,那么申请信号量的线程永远只有一个  不能够提高并发度

理解:

就像在电影院中,是先买票在进行排队的,这样能够加快进场的速度,如果排队后再买票,需要一人一人地进行操作,这相比上一种就会很慢的

申请信号量的操作是原子的,不需要加锁保护也能保证线程安全,所以并发申请信号量,串行访问临界资源能够提高并发度

main.cc:

在进行生产消费者模型中的数据问题,不仅仅是让二者看到同一份资源,更重要的是让消费者拿到资源并对资源进行处理,这里引入上一章的Task文件来进行数据处理

Task.hpp

#include <iostream>
#include <string>

std::string opers = "+-*/%";

enum
{
    Divzero = 1,
    Modzero,
    Unknow
};

class Task
{
public:
    Task()
    {}
    Task(int data1, int data2, char oper)
        : _data1(data1), _data2(data2), _oper(oper),_exitcode(0)
    {}

    void run()
    {
        switch (_oper)
        {
        case '+':
            _result = _data1 + _data2;
            break;
        case '-':
            _result = _data1 - _data2;
            break;
        case '*':
            _result = _data1 * _data2;
            break;
        case '/':
            if (_data2 == 0)
                _exitcode = Divzero;
            else
                _result = _data1 / _data2;
            break;
        case '%':
            if (_data2 == 0)
                _exitcode = Modzero;
            else
                _result = _data1 % _data2;
            break;
        default:
            _exitcode = Unknow;
            break;
        }
    }

    void operator()()
    {
        run();
    }

    std::string Getresult()
    {
        std::string ret = std::to_string(_data1);
        ret += _oper;
        ret += std::to_string(_data2);
        ret += "=";
        ret += std::to_string(_result);
        ret += "[exitcode=";
        ret += std::to_string(_exitcode);
        ret += "]";
        return ret;
    }

    std::string GetTask()
    {
        std::string ret = std::to_string(_data1);
        ret += _oper;
        ret += std::to_string(_data2);
        ret += "=?";
        return ret;
    }
    
    ~Task()
    {}

private:
    int _data1;
    int _data2;
    char _oper;

    int _exitcode;
    int _result;
};

接着在生产消费者的线程所执行的对应的方法中,基本和上一章类似

void *Productor(void *args)
{
    ThreadData *td = static_cast<ThreadData *>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;
    int len = opers.size();
    while (true)
    {
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand()%len];
        Task t(data1,data2,op);
        rq->push(t);
        std::cout<<"Productor : Task = "<< t.GetTask() << " who "<< name << std::endl;
        sleep(1);
    }
    
    return nullptr;
}

void *Consumer(void *args)
{
    ThreadData *td = static_cast<ThreadData *>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;
    while (true)
    {
        Task t;
        rq->pop(&t);
        //处理数据
        t();
        std::cout << "Consumer : Task = " << t.GetTask() << " who: " << name << " result: " << t.Getresult() << std::endl;
        // sleep(1);
    }
    return nullptr;
}

我们也可以创建一个结构体来存储线程名称与任务

struct ThreadData
{
    RingQueue<Task> *rq;
    std::string threadname;
};
int main()
{
    srand(time(nullptr));
    RingQueue<Task> *rq = new RingQueue<Task>();
    pthread_t c[5], p[3];
    for(int i = 0;i<3;i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Productor-" + std::to_string(i);

        pthread_create(p+i, nullptr, Productor, td);
        usleep(10);

    }
    sleep(1);
    for(int i = 0;i<5;i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Consumer-" + std::to_string(i);

        pthread_create(c+i, nullptr, Consumer, td);
        usleep(10);
    }
    for(int i = 0;i<3;i++)
    {
        pthread_join(p[i], nullptr);
    }
    for(int i = 0;i<5;i++)
    {
        pthread_join(c[i], nullptr);
    }
    return 0;
}

注意:在环形队列中允许多个生产者线程一起进行生活数据,也允许多个消费者线程一起消费数据,多个线程一起操作并非同时操作,任务开始时间有先后,但都是在进行处理的