目录
概念补充
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件.
条件变量
简单来说,条件变量相当于一个阻塞队列,将线程入队列(等待),满足条件就出队列(唤醒)
操作
全局条件变量
和mutex相似,使用宏初始化一个全局的条件变量
局部条件变量
attr传nullptr即可
等待
cond为条件变量,mutex为锁
唤醒
signal为唤醒一个线程
broadcast为唤醒所有线程
例:多线程抢票
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
int tickets = 100;
void* func(void* args)
{
std::string name(static_cast<char*>(args));
while(true)
{
pthread_mutex_lock(&lock);
pthread_cond_wait(&cond,&lock);
if(tickets==0) {pthread_mutex_unlock(&lock);break;}
std::cout<<name<<" "<<--tickets<<std::endl;
pthread_mutex_unlock(&lock);
}
return (void*)0;
}
int main()
{
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,func,(void*)"t1");
pthread_create(&t2,nullptr,func,(void*)"t2");
pthread_create(&t3,nullptr,func,(void*)"t3");
while(true)
{
pthread_cond_signal(&cond);
sleep(0.3);
if(tickets==0)
{
pthread_cond_broadcast(&cond);
break;
}
}
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_join(t3,nullptr);
return 0;
}
每个线程执行到wait时会阻塞住,
为什么wait要传锁,
因为wait会让线程等待,如果线程申请到了锁等待,那么其他线程就无法申请到锁,所以wait会先释放锁再等待,等被唤醒后先申请锁再向下执行,保证互斥.
封装
#pragma once
#include <iostream>
#include <pthread.h>
#include "mutex.hpp"
class Cond
{
pthread_cond_t _cond;
public:
Cond()
{
pthread_cond_init(&_cond,nullptr);
}
void Wait(LockGuard& lock)
{
int n = pthread_cond_wait(&_cond,lock.Get());
(void)n;
}
void Notifyone()
{
int n = pthread_cond_signal(&_cond);
(void)n;
}
void Notifyall()
{
int n = pthread_cond_broadcast(&_cond);
(void)n;
}
~Cond()
{
pthread_cond_destroy(&_cond);
}
};
生产者消费者模型
和生活中的超市类似,就是让不同线程作为生产产品者和消费产品者,临界区作为厂家和消费者的中转站超市.
为什么存在这样的模型:
1.减少生产和消费过程中产生的成本
2.支持生产和消费的忙闲不均
3.维护松耦合关系
减少成本问题: 比如一个方便面厂家,生产方便面都是成批生产,而消费者一般只会按个按箱购买,如果两者直接交易,厂家生产一袋方便面,消费者购买,这样厂家的成本会很高;如果厂家生产一批,消费者也买不了一批产品,消费者成本会很高.
支持忙闲不均和松耦合关系问题: 还是上面的例子,厂家可以生产一批放到超市,然后等消费者慢慢消费,生产和消费没有直接关系
生产者和消费者之间的关系
生产者之间 : 互斥关系
消费者之间 : 互斥关系
生产者与消费者之间 : 互斥+同步关系
BlockQueue(阻塞队列)
在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
//<BlockQueue.hpp>
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "mutex.hpp"
#include "cond.hpp"//上面封装的条件变量
const static u_int32_t CAP = 5;
template<typename T>
class BlockQueue
{
std::queue<T> _q;
u_int32_t _cap;
pthread_mutex_t _lock;
Cond _c_cond;//消费者
Cond _p_cond;//生产者
unsigned int _c_wait_num;//当前消费者等待的个数
unsigned int _p_wait_num;//当前生产者等待的个数 为了防止唤醒次数过多或唤醒无意义
bool isFull()
{
return _q.size()>=_cap;
}
bool isEmpty()
{
return _q.empty();
}
public:
BlockQueue(u_int32_t cap = CAP) :_cap(cap)
{}
void Enqueue(const T& data)
{
{
LockGuard lock(&_lock);
while(isFull()) //while循环防止wait调用失败和伪唤醒问题
{
_p_wait_num++;
_p_cond.Wait(lock);
_p_wait_num--;
}
_q.push(data);
if(_c_wait_num>0)
_c_cond.Notifyone();
}
}
void Pop(T* retval)
{
{
LockGuard lock(&_lock);
while(isEmpty()) //while循环防止wait调用失败和伪唤醒问题
{
_c_wait_num++;
_c_cond.Wait(lock);
_c_wait_num--;
}
*retval = _q.front();
_q.pop();
if(_p_wait_num>0)
_p_cond.Notifyone();
}
}
~BlockQueue()
{}
};
单生产单消费
#include "blockqueue.hpp"
#include <iostream>
#include <string>
#include <unistd.h>
struct Data
{
BlockQueue<int>* bq;
std::string name;
};
void *consumer(void* args)
{
Data* data = static_cast<Data*>(args);
int retval = 0;
while(true)
{
data->bq->Pop(&retval);
std::cout<<data->name<<"消费了 " <<retval<<std::endl;
//sleep(1);
}
return (void*)0;
}
void *productor(void* args)
{
Data* data = static_cast<Data*>(args);
int num = 1;
while(true)
{
data->bq->Enqueue(num);
std::cout<<data->name<<"生产了 "<<num++<<std::endl;
sleep(1);
}
return (void*)0;
}
int main()
{
BlockQueue<int>* bq = new BlockQueue<int>();
pthread_t c,p;
Data ctd = {bq,"消费者"};
pthread_create(&c,nullptr,consumer,(void*)&ctd);
Data ptd = {bq,"生产者"};
pthread_create(&p,nullptr,productor,(void*)&ptd);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
对于多生产和多消费,BlockQueue.hpp也支持
信号量
简介
信号量针对多线程并发访问一块资源中的不同部分.
本质上,就是一个描述资源数量的计数器
操作
sem_init 初始化信号量
pshared 用来表示线程使用还是进程使用,0表示线程间使用
value 信号量的初始值
sem_destroy 销毁信号量
sem_wait 信号量--
sem_post 信号量++
多生产者多消费者RingQueue(环形队列)代码
生产者关注空余空间资源,消费者关注数据资源
sem封装 <sem.hpp>
#pragma once
#include <iostream>
#include <semaphore.h>
#define NUM 5
class Sem
{
sem_t _sem;
int _initnum;
public:
Sem(int num = NUM):_initnum(num)
{
sem_init(&_sem,0,_initnum);
}
void P()
{
int n = sem_wait(&_sem);
(void)n;
}
void V()
{
int n = sem_post(&_sem);
(void)n;
}
~Sem()
{
sem_destroy(&_sem);
}
};
<mutex.hpp>
#pragma once
#include <iostream>
#include <unistd.h>
class LockGuard
{
pthread_mutex_t* _lock;
public:
LockGuard(pthread_mutex_t* lock)
:_lock(lock)
{
pthread_mutex_lock(_lock);
}
pthread_mutex_t* Get()
{
return _lock;
}
~LockGuard()
{
pthread_mutex_unlock(_lock);
}
};
<cond.hpp>
#pragma once
#include <iostream>
#include <pthread.h>
#include "mutex.hpp"
class Cond
{
pthread_cond_t _cond;
public:
Cond()
{
pthread_cond_init(&_cond,nullptr);
}
void Wait(LockGuard& lock)
{
int n = pthread_cond_wait(&_cond,lock.Get());
(void)n;
}
void Notifyone()
{
int n = pthread_cond_signal(&_cond);
(void)n;
}
void Notifyall()
{
int n = pthread_cond_broadcast(&_cond);
(void)n;
}
~Cond()
{
pthread_cond_destroy(&_cond);
}
};
<RingQueue.hpp>
#pragma once
#include <iostream>
#include "sem.hpp"
#include <vector>
#include "mutex.hpp"
#define CAP 10
template<typename T>
class RingQueue
{
std::vector<T> _rq;
int _cap;
Sem _data_sem;
Sem _space_sem;
int _c_step;
int _p_step;
pthread_mutex_t _p_lock;
pthread_mutex_t _c_lock;
public:
RingQueue(int cap = CAP)
:_cap(cap) ,_rq(cap) ,_data_sem(0) ,_space_sem(cap),_c_step(0),_p_step(0)
{
pthread_mutex_init(&_p_lock,nullptr);
pthread_mutex_init(&_c_lock,nullptr);
}
void Enqueue(const T& data)
{
_space_sem.P();
{
LockGuard lock(&_p_lock);
_rq[_c_step++] = data;
_c_step%=_cap;
}
_data_sem.V();
}
void Pop(T* data)
{
_data_sem.P();
{
LockGuard lock(&_c_lock);
*data = _rq[_p_step++];
_p_step%=_cap;
}
_space_sem.V();
}
~RingQueue()
{
pthread_mutex_destroy(&_p_lock);
pthread_mutex_destroy(&_c_lock);
}
};
<RingQueue.cpp>
#include <iostream>
#include "sem.hpp"
#include "mutex.hpp"
#include "cond.hpp"
#include <pthread.h>
#include "RingQueue.hpp"
RingQueue<int> rq(10);
void* consumer(void* args)
{
while(true)
{
int data = 0;
rq.Pop(&data);
std::cout<<"消费 "<<data<<std::endl;
}
return (void*)0;
}
void* productor(void* args)
{
int data = 1;
while(true)
{
rq.Enqueue(data);
std::cout<<"生产 "<<data++<<std::endl;
}
return (void*)0;
}
int main()
{
pthread_t c[2],p[2];
pthread_create(c,nullptr,consumer,nullptr);
pthread_create(c+1,nullptr,consumer,nullptr);
pthread_create(p,nullptr,productor,nullptr);
pthread_create(p+1,nullptr,productor,nullptr);
pthread_join(c[0],nullptr);
pthread_join(c[1],nullptr);
pthread_join(p[0],nullptr);
pthread_join(p[1],nullptr);
return 0;
}
信号量与锁
为什么信号量不需要加锁,信号量为空或为满时,信号量完成了同步和互斥动作,信号量不为空和不为满时,生产者和消费者访问不同位置,也不需要加锁
二元信号量(信号量初始值为1) :就是锁
锁:认为资源只有一份,申请锁相当于信号量P操作(--),释放锁相当于信号量V操作(++),所以锁是信号量的一种特殊情况
小知识
1.pthread开头的函数一般返回值为0表示成功
2.生产和消费还可以传递函数,用来给线程派发任务
3.生产消费模型高效在哪里:生产者生产数据和消费者处理数据是并发的,两者是松耦合关系,可以忙闲不均,这里效率高,而二者向阻塞队列中进行放入和拿出属于同步和互斥,效率不高