前言:
大家好啊,我们上一篇文章已经讲解了关于线程同步的一种办法:运用条件变量cond。
今天,我们就来学习一下线程同步的另外一种方法,信号量!!
信号量呢有System V 信号量与POSIX 信号量,我们这里主要还是讲POSIX信号量,System V主要还是用于进程间通信的比较多一些。
一、生产者消费者模型再理解
我们在上篇文章讲的生产者消费者模型大家还记得吗?
生产者申请锁,开始生产,如果此时为满,就通知消费者的条件变量,让消费者开始动起来。
这整体过程不是串行的吗?那我们说生产者消费者模型的高效性?从何谈起呢?
我们不能单一的去看这个过程,这个效率问题要从整体上来看待。为什么说他高效?关键在“并行化重叠” 。
大家注意,生产者生产数据,那么这个数据从哪里来呢?
是不是也需要有人传给生产者?此时生产者是不需要锁进入临界区的。那么这个时候消费者在干什么呢?消费者是不是就可以持有锁,读取临界区资源啊!!那么消费者读取完了还会干什么?他会把锁解锁,然后对拿到的资源进行处理。此时生产者就可以对临界区进行生产了。
生产者和消费者,不就并行起来了吗?
所以,我们这里有个缓冲队列,就是为了解耦生产者和消费者的速度差异,允许两者并行执行。
二、 什么是信号量Sem
信号量是一种用于协调多线程/多进程对共享资源访问的同步机制,由计算机科学家 Edsger Dijkstra 在1965年提出。它的核心是一个计数器 + 等待队列,通过原子操作控制资源分配,解决并发编程中的竞态条件和死锁问题。
我们该如何理解信号量的概念呢?
在面对需要对临界区资源整体的使用时,我们提出来互斥锁的概念。
如果我们对临界区资源不是整体使用的时候呢?我们还是使用锁吗?
为了方便大家理解,我们可以举一个电影院的例子。
这个电影院的每一个房间的座位是不是固定的啊?
就相当于有固定的这么多份的资源。信号量,就相当于是电影票。
你申请信号量,就是购买电影票,无论你今天是否到了现场去看电影,这个座位都只会是你的,而不能被被人使用。
这就是信号量保护不是整体使用临界区资源的效果。
信号量可以看作是一个计数器,申请信号量本质上是对资源的预定机制。我们有这么多的座位,就可以放这么多数量的票。
三、信号量接口与原子性问题
我们接下来介绍一下信号量的相关接口,system信号量是系统级接口,所以使用十分复杂。
但我们的Sem信号量是用户级的,所以接口的使用就像之前的条件变量,锁一样简单,主要常用的几个接口如下:
int sem_init(sem_t *sem, int pshared, unsigned int value);
这是用来初始化一个信号量的函数,就跟我们之前的锁与条件变量一样,也是需要先定义一个sem_t类型的变量,随后调用该函数对他进行初始化。
sem
:信号量变量指针。
pshared
:为0,
线程间共享(同一进程内)。非0
:进程间共享(需放在共享内存中)。
value
:信号量初始值(如资源初始数量,就跟电影票的数量一样)。
int sem_destroy(sem_t *sem);
负责信号量的销毁。
注意:
必须确保信号量未被任何线程/进程等待,且无名信号量(sem_init
创建)必须显式销毁。
int sem_wait(sem_t *sem); // 阻塞等待 int sem_trywait(sem_t *sem); // 非阻塞尝试 int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout); // 超时等待
等待一个信号量,我们通常使用第一个。这个等待操作我们后面统称为P操作。
int sem_post(sem_t *sem);
释放一个信号量,让他的值加一,并唤醒一个等待的信号量(如果有)。
int sem_getvalue(sem_t *sem, int *sval);
获取当前信号量的值,就是查看还剩多少票。注意,这个sval是一个输出型参数,结果就是剩余的票。
这里我们要特别说明一下,我们之前说过,申请一个信号量就相当于让 信号量的值减一,释放一个信号量就相当于让一个信号量的值加一,这里我们得特别注意,这里的加一减一的操作不可能是--或者++。
为什么呢?
我们之前说过,++与--并不是原子性的操作,我们信号量是保证临界区资源安全的操作,所以大家都会看到信号量,所以信号量本身跟锁一样,也是一个共享资源,他是共享资源,想要保护别人,谁来保护她??
所以,他自己本身就必须具备原子性,否则连自己的安全都顾不了,怎么顾及别人。
所以他的加一减一是通过 硬件级原子指令(如CAS、TSL)实现的。
四、基于环形队列的消费者生产者模型
我们现在就来实现一下这个环形队列的消费者生产者模型,因为这个很贴近我们刚刚讲的信号量内容,他满足里面的数据时是分隔开可以错开被访问的。
之前讲的队列存在许多弊端,比如高强度的pop与push的内存释放,以及激烈的锁竞争关系。
而我们的环形队列是怎么解决的呢?
我们选择底层使用数组来实现环形(这个方法在C++中应该用过吧,通过控制数组的下标来实现环形结构),这样我们只需要更改数据就行了。
我们保存的最大数据容量是数组的大小,使用了信号量来记录消费者与生产者的位置,所以我们可以知道是否满了,从而防止新增数据时替换我们还没被消费者使用的数据。
总的来说,对于这个模型而言,一共有如下几种情况:
1、访问数据时生产者与消费者访问到了同一个位置。虽然我们说整个临界区可以错开使用,但是如果此时你没有错开,是不是就要串行使用了啊?
没错,此时我们有两种情况:
1、访问到同一个地方是因为数据为空了(我们默认消费者是按照生产者生产的顺序访问的数据)。既然为空,那我们的串行是不是要保证生产者先运行(你此时都没数据消费者怎么消费呢?),那么具体要执行那些动作呢?
是不是要先对生产者的信号量进行P操作(申请信号量,就是申请门票),随后我们进行数据的填充,之后再让我们的下标++,此时有数据了,我们就可以对消费者的信号量进行V操作,唤醒等待的消费者。
2、访问到同一个地方,是因为数据满了。既然满了,那我们是不是先让消费者消费数据?所以我们就对消费者的信号量值进行P操作(减一,注意,生产者的信号量值表示剩余的空间,消费者的信号量值表示剩余没使用的数据),之后对数据进行提取操作,随后让下标移动,最后唤醒我们等待的消费者。
2、 访问数据时,生产者与消费者没有访问到了同一个位置。这个时候就简单了,此时他们没有访问同一个资源,自然就可以并发的执行了。
大家可以这样理解以上过程:
我们有一个大小为20的环状箱子。
生产者负责先放苹果,消费者负责拿苹果。=,二者都是放(拿)一个苹果,才能移动到下一格。
如果此时里面一个苹果都没有,消费者是不是就走不动路,所以此时只能生成者放完苹果消费者才能动。
如果全是苹果,生产者就走不动了,只能等自己下一格的(环状)消费者拿起来一个苹果后才能移动。
相当于一个追逐游戏,一个放一个拿
我们永远无法给对方套圈(跑步时超了别人一圈叫做套圈),如果我们访问同一个位置,就串行,如果不是,就可以并发运行。
我们基于代码实现一下上面的环形队列,在此之前,我们可以先封装一下我们的信号量接口,就跟封装条件变量一样:
#pragma once
#include <semaphore.h>
namespace SemMudule
{
const int defalutsemval = 1;
class sem
{
public:
sem(int semval = defalutsemval):_init_value(semval)
{
::sem_init(&_sem,0,_init_value);
}
void P()
{
::sem_wait(&_sem);
}
void V()
{
::sem_post(&_sem);
}
~sem()
{
::sem_destroy(&_sem);
}
private:
sem_t _sem;
int _init_value;
};
}
所以这里我就不多说了。
而我们的环形队列需要哪些变量呢?
首先,我们的底层是数组,所以需要一个数组,还需要一个整数来记录数组大小。
另外我们的生产者与消费者的下标位置,也各自需要一个整形来记录,还有生产者与消费者各自的信号量,所以代码可以先写成这样:
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
#include "mutex.hpp"
namespace RingBufferModule
{
using namespace SemMudule;
template<typename T>
class ringbuffer
{
public:
ringbuffer()
{
}
~ringbuffer()
{
}
private:
std::vector<T> _buffer;//环形缓冲区
size_t _size;//缓冲区大小
sem _psem; //生产者信号量
sem _csem; //消费者信号量
size_t _p_step; //生产者下标
size_t _c_step; //消费者下标
}
}
那我们接下来要做的就是完善我们的构造函数,并实现pop与equeue接口:
在构造函数中,我们需要对各个变量进行初始化:
ringbuffer(int cap)//cap为外界传进来的环形队列的大小
:_size(cap),
_buffer(cap),
_psem(cap),
_csem(0),
_p_step(0),
_c_step(0)
{
}
然后我们要实现我们的插入与取出数据,在前面我们已经说过细节,就是在插入新数据时,是生产者的动作,所以生产者对自己的信号量进行申请,使其减一(代表剩余空间减一),随后进行替换数据操作,最后移动下标并唤醒阻塞的消费者(如果有阻塞的消费者)。
取出也是同理:
void Equeue(const T&in)//采取引用传参,减少拷贝
{
_psem.P();
_buffer[_p_step] = in;
_p_step++;
_p_step %= _size;
_csem.V();
}
void Pop(T*out)
{
_csem.P();
*out = _buffer[_c_step];
_c_step++;
_c_step %= _size;
_psem.V();
}
这里为什么不需要用if,while等条件判断了呢?
因为信号量的值是数量,剩余空间为0和剩余数据为0会分别自动阻塞生产者与消费者。所以相当于自带一个判断。本身就是表示资源数目的,只要成功,就一定有,不需要判断!
所以总的代码:
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
#include "mutex.hpp"
namespace RingBufferModule
{
using namespace SemMudule;
template<typename T>
class ringbuffer
{
public:
ringbuffer(int cap)//cap为外界传进来的环形队列的大小
:_size(cap),
_buffer(cap),
_psem(cap),
_csem(0),
_p_step(0),
_c_step(0)
{
}
void Equeue(const T&in)//采取引用传参,减少拷贝
{
_psem.P();
_buffer[_p_step] = in;
_p_step++;
_p_step %= _size;
_csem.V();
}
void Pop(T*out)
{
_csem.P();
*out = _buffer[_c_step];
_c_step++;
_c_step %= _size;
_psem.V();
}
~ringbuffer()
{
}
private:
std::vector<T> _buffer;//环形缓冲区
size_t _size;//缓冲区大小
sem _psem; //生产者信号量,代表的是剩余空间
sem _csem; //消费者信号量,代表的是剩余数据
size_t _p_step; //生产者下标
size_t _c_step; //消费者下标
}
}
我们可以写个测试用例:
#include "RingBuffer.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
using namespace RingBufferModule;
void *Consumer(void *args)
{
ringbuffer<int> *ring_buffer = static_cast<ringbuffer<int> *>(args);
while (true)
{
sleep(1);
// sleep(1);
// 1. 消费数据
int data;
ring_buffer->Pop(&data);
// 2. 处理:花时间
std::cout << "消费了一个数据: " << data << std::endl;
}
}
void *Productor(void *args)
{
ringbuffer<int> *ring_buffer = static_cast<ringbuffer<int> *>(args);
int data = 0;
while (true)
{
// 1. 获取数据:花时间
// sleep(1);
// 2. 生产数据
ring_buffer->Equeue(data);
std::cout << "生产了一个数据: " << data << std::endl;
data++;
}
}
int main()
{
ringbuffer<int> *ring_buffer = new ringbuffer<int>(5); // 共享资源 -> 临界资源
// 单生产,单消费
pthread_t c1, p1;
pthread_create(&c1, nullptr, Consumer, ring_buffer);
pthread_create(&p1, nullptr, Productor, ring_buffer);
pthread_join(c1, nullptr);
delete ring_buffer;
return 0;
}
可以看见,生产者与消费者是按照顺序进行生产与历史数据的取出的,当他们不访问同一个空间,就会并发运行。
可是,这只是单消费者单生产者,如果有多个生产者与消费者呢?
其实这些问题,都可以转化为单个消费者与单个生产者的关系:我们只需要让多个生产者先竞争谁来生产,让消费者竞争出一个消费者来消费不就又构成单个消费者与单个生产者的关系了吗?
由于这些生产者想要访问同一个信号量,消费者想访问同一个消费者信号量,这就变成了对同一个资源的互斥问题,所以我们需要用锁来保护我们的信号量:申请成功的线程进入下一个环节,没申请成功的就进行等待。
所以面对多个消费者生产者,我们可以先定义两个锁,随后更改我们代码:
void Equeue(const T&in)//采取引用传参,减少拷贝
{
LockGuard lockguard(_p_lock);//_p_lock是我们对生产者的锁
_psem.P();
_buffer[_p_step] = in;
_p_step++;
_p_step %= _size;
_csem.V();
}
我们的上锁是在信号量P操作的前面还是后面呢?
实际上都可以,但我们想一下怎么做才能操作高。
我们之前说申请信号量就像是买票,而申请锁就是排队进入。
请问是你先一个一个排好队了,等轮到你了才进行买票效率高?还是先买好票了,再进行排队,轮到你了直接进去效率高?
答案肯定是后者:
所以应该为:
void Equeue(const T&in)//采取引用传参,减少拷贝
{
_psem.P();
LockGuard lockguard(_p_lock);
_buffer[_p_step] = in;
_p_step++;
_p_step %= _size;
_csem.V();
}
void Pop(T*out)
{
_csem.P();
LockGuard lockguard(_c_lock);
*out = _buffer[_c_step];
_c_step++;
_c_step %= _size;
_psem.V();
}
甚至于,为了显式控制我们的lockguard的生命周期,我们可以加个大括号包裹起来:
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
#include "mutex.hpp"
namespace RingBufferModule
{
using namespace SemMudule;
using namespace MutexModule;
template <typename T>
class ringbuffer
{
public:
ringbuffer(int cap) // cap为外界传进来的环形队列的大小
: _size(cap),
_buffer(cap),
_psem(cap),
_csem(0),
_p_step(0),
_c_step(0)
{
}
void Equeue(const T &in) // 采取引用传参,减少拷贝
{
_psem.P();
{
LockGuard lockguard(_p_lock);
_buffer[_p_step] = in;
_p_step++;
_p_step %= _size;
}
_csem.V();
}
void Pop(T *out)
{
_csem.P();
{
LockGuard lockguard(_c_lock);
*out = _buffer[_c_step];
_c_step++;
_c_step %= _size;
}
_psem.V();
}
~ringbuffer()
{
}
private:
std::vector<T> _buffer; // 环形缓冲区
size_t _size; // 缓冲区大小
sem _psem; // 生产者信号量,代表的是剩余空间
sem _csem; // 消费者信号量,代表的是剩余数据
size_t _p_step; // 生产者下标
size_t _c_step; // 消费者下标
Mutex _p_lock;
Mutex _c_lock;
};
}
总结:
这就是我们今天关于信号量sem的内容了,希望大家能够有所收获!