目录
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。本质就是一个计数器,是对特定资源的预定机制,本身相比于基本类型,POSIX信号是原子的,POSIX可以用于线程间同步,信号量把对临界资源是否存在,就绪等的条件,以原子性的形式呈现在临界资源前就判断了。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表⽰线程间共享,⾮零表⽰进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
//申请成功执行后面的代码,申请失败阻塞挂起
发布信号量
功能:发布信号量,表⽰资源使⽤完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者
标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
设计思路
- 约定一:空,生产者先行
- 约定二:满,消费者先行
- 约定三:生产者不能快过消费者,不能套一个圈以上,防止生产满了还生产
- 约定四:消费者不能超过生产者,防止没有生产好就消费
总结:只有不是同一个位置就能并发进行,用数组来模拟环形队列,开始没有东西,生产者先生产,消费者等待,等生产者生产了数据,唤醒消费者生产,此时开始并发,假如生产者快过消费者,只有判断下一个生产节点是否和消费者位置一致就进行等待,等消费者消费完唤醒,假如消费者快过生产者,只有消费者下一个节点是生产者处的位置时,此时没有数据就进入等待队列,等待唤醒,数据为空为满都是互斥
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。
代码实现
单生产,单消费
Sem.hpp
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
const int defaultvalue = 5;
namespace SemModule
{
class Sem
{
public:
Sem(unsigned int sem_value = defaultvalue)
{
sem_init(&_sem, 0, sem_value);
}
~Sem()
{
sem_destroy(&_sem);
}
void V()
{
int n=sem_post(&_sem);
}
void P()
{
int n=sem_wait(&_sem);
}
private:
sem_t _sem;
};
}
Main.cc
#include<iostream>
#include <iostream>
#include <pthread.h>
#include<unistd.h>
#include"RingQueue.hpp"
//消费
void *consumer(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);
while(true)
{
sleep(2);
int t =0;
rq->Pop(&t);
std::cout<<"消费拿到一个数据:"<<t<<std::endl;
}
}
//生产
void *productor(void *args)
{
int data=1;
RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);
while(true)
{
std::cout<<"生产了一个任务"<<std::endl;
rq->Equeue(data++);
}
}
int main()
{
// 申请阻塞队列
RingQueue<int> *rq = new RingQueue<int>();
// 创建消费则模型
pthread_t c[1], p[1];
pthread_create(c, nullptr, consumer, rq);
pthread_create(p, nullptr, productor, rq);
pthread_join(c[0],nullptr);
pthread_join(p[1],nullptr);
return 0;
}
RingQueue.hpp
#include <vector>
#include <iostream>
#include "Sem.hpp"
using namespace SemModule;
static const int gcap = 5;
template <typename T>
class RingQueue
{
public:
RingQueue(int cap = gcap)
: _rq(cap), _cap(cap), _blank_sem(cap), _p_step(0), _data_sem(0), _c_step(0)
{
}
~RingQueue() {}
// 生产者调用
void Equeue(const T &in)
{
// 1.申请信号量,空位置信号量
_blank_sem.P();
//2.生产
_rq[_p_step]=in;
//3.更新下标,并维持环形特性
_p_step++;
_p_step%=_cap;
//4.通知
_data_sem.V();
}
// 消费者调用
void Pop(T *out)
{
//1.申请信号量,数据信号量
_data_sem.P();
//2.消费
*out=_rq[_c_step];
//3.更新下标,并维持环形特性
_c_step++;
_c_step%=_cap;
_blank_sem.V();
}
private:
std::vector<T> _rq; // 环形队列
int _cap; // 容量
Sem _blank_sem; // 生产者 ->空位置
int _p_step; // 生产者下标位置
Sem _data_sem; // 消费者 ->空数据
int _c_step; //消费者下标位置
};
多生产,多消费
Main.cc
#include<iostream>
#include <iostream>
#include <pthread.h>
#include<unistd.h>
#include"RingQueue.hpp"
//消费
struct threaddata
{
RingQueue<int>*rq;
std::string name;
};
void *consumer(void *args)
{
threaddata *td=static_cast<threaddata*>(args);
while(true)
{
sleep(2);
int t =0;
td->rq->Pop(&t);
std::cout<<td->name<<"消费拿到一个数据:"<<t<<std::endl;
}
}
//生产
void *productor(void *args)
{
threaddata *td=static_cast<threaddata*>(args);
int data=1;
while(true)
{ sleep(1);
std::cout<<td->name<<"生产了一个任务"<<data<<std::endl;
td->rq->Equeue(data++);
}
}
int main()
{ RingQueue<int> *rq = new RingQueue<int>();
// 创建消费则模型
pthread_t c[2], p[3];
threaddata *td1=new threaddata();
td1->name="cthread-1";
td1->rq=rq;
pthread_create(c, nullptr, consumer, td1);
threaddata *td2=new threaddata();
td2->name="cthread-2";
td2->rq=rq;
pthread_create(c+1, nullptr, consumer, td2);
threaddata *td3=new threaddata();
td3->name="pthread-3";
td3->rq=rq;
pthread_create(p, nullptr, productor, td3);
threaddata *td4=new threaddata();
td4->name="pthread-4";
td4->rq=rq;
pthread_create(p+1, nullptr, productor, td4);
threaddata *td5=new threaddata();
td5->name="pthread-5";
td5->rq=rq;
pthread_create(p+2, nullptr, productor, td5);
pthread_join(c[0],nullptr);
pthread_join(c[1],nullptr);
pthread_join(p[0],nullptr);
pthread_join(p[1],nullptr);
pthread_join(p[2],nullptr);
return 0;
}
RInfQueue.hpp
#include <vector>
#include <iostream>
#include "Sem.hpp"
#include "Mutex.hpp"
using namespace SemModule;
using namespace MutexModule;
static const int gcap = 5;
template <typename T>
class RingQueue
{
public:
RingQueue(int cap = gcap)
: _rq(cap), _cap(cap), _blank_sem(cap), _p_step(0), _data_sem(0), _c_step(0)
{
}
~RingQueue() {}
// 生产者调用
void Equeue(const T &in)
{
// 1.申请信号量,空位置信号量
_blank_sem.P(); // 先申请信号量在申请锁效率高一点,先把信号量顺序依次分好,然后分到的再去申请锁,在申请锁的同时别的线程也可以去申请信号量,如果先
// 申请锁在申请信号量就慢了
{
LockGuard lockguard(_pmutex);
// 2.生产
_rq[_p_step] = in;
// 3.更新下标,并维持环形特性
_p_step++;
_p_step %= _cap;
}
// 4.通知
_data_sem.V();
}
// 消费者调用
void Pop(T *out)
{
// 1.申请信号量,数据信号量
_data_sem.P();
{
LockGuard lockguard(_cmutex);
// 2.消费
*out = _rq[_c_step];
// 3.更新下标,并维持环形特性
_c_step++;
_c_step %= _cap;
}
_blank_sem.V();
}
private:
std::vector<T> _rq; // 环形队列
int _cap; // 容量
Sem _blank_sem; // 生产者 ->空位置
int _p_step; // 生产者下标位置
Sem _data_sem; // 消费者 ->空数据
int _c_step; // 消费者下标位置
// 维护多生产,多消费
Mutex _cmutex;
Mutex _pmutex;
};
Mutex.cc
#include <pthread.h>
#include <iostream>
namespace MutexModule
{
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_mutex, nullptr);
}
void Lock()
{
int n = pthread_mutex_lock(&_mutex);
(void)n;
}
void Unlock()
{
int n = pthread_mutex_unlock(&_mutex);
(void)n;
}
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
pthread_mutex_t *get()
{
return &_mutex;
}
private:
pthread_mutex_t _mutex;
};
class LockGuard
{
public:
LockGuard(Mutex &mutex):_mutex(mutex)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex &_mutex;
};
}
Sem.cpp
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
const int defaultvalue = 5;
namespace SemModule
{
class Sem
{
public:
Sem(unsigned int sem_value = defaultvalue)
{
sem_init(&_sem, 0, sem_value);
}
~Sem()
{
sem_destroy(&_sem);
}
void V()
{
int n=sem_post(&_sem);
}
void P()
{
int n=sem_wait(&_sem);
}
private:
sem_t _sem;
};
}