【Linux】线程的互斥和同步
线程间的互斥
- 临界资源:多线程执行共享的资源就叫做临界资源
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源进行保护
- 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。
1. 互斥量mutex
- 大部分情况下,线程使用的数据都是局部变量,变量的地址空间在线程的栈空间内,这种情况下,变量属于单个线程,且其他线程无法访问该变量
- 但有时候,很多变量需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程间的交互
- 多个线程并发操作共享变量,会带来一些问题
实验:操作共享变量会有问题的售票系统代码
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include<iostream>
#include<pthread.h>
#include <string>
#include <unistd.h>
#include<functional>
namespace ThreadModule
{
template<typename T>
using func_t = std::function<void(T)>;
template<typename T>
class Thread
{
public:
Thread(func_t<T> func,T const data,const std::string& threadname = "none_name")//--常量引用和非常量引用的概念
:_func(func),_data(data),_threadname(threadname),_stop(true)
{}
void Excute()
{
_func(_data);
}
static void* handler(void* args)
{
Thread<T>* self = static_cast<Thread<T> *>(args);
// self->Excute();
self->_func(self->_data);
return nullptr;
}
bool Start()
{
int ret = pthread_create(&_tid,nullptr,handler,this);
if(ret == 0)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid,nullptr);
}
}
std::string name()
{
return _threadname;
}
~Thread(){}
private:
std::string _threadname;
pthread_t _tid;
T _data;
func_t<T> _func;
bool _stop;
};
}
#endif
using namespace ThreadModule;
int g_tickets = 10000; // 共享资源,没有保护的, 临界资源
class ThreadData
{
public:
ThreadData(const std::string& threadname,int& ticked)
:_threadname(threadname),_tickeds(ticked),_total(0)
{}
~ThreadData()
{}
public:
std::string _threadname;
int& _tickeds;
int _total;
};
void route(ThreadData* ptr)
{
while(true)
{
if(ptr->_tickeds > 0)
{
//模拟一次抢票逻辑
usleep(1000);
printf("%s is runing ,get ticked: %d\n",ptr->_threadname.c_str(),ptr->_tickeds);
ptr->_tickeds--;
ptr->_total++;
}
else
{
break;
}
}
return ;
}
const int num = 4;
int main()
{
//创建一批线程
std::vector<Thread<ThreadData*>> threads;
std::vector<ThreadData*> datas;
for(int i = 0;i < num;i++)
{
std::string name = "thread-" + std::to_string(i+1);
ThreadData* _ptr = new ThreadData(name,g_tickets);
threads.emplace_back(route,_ptr,name);
datas.emplace_back(_ptr);
}
//启动一批线程
for(auto& thread:threads)
{
thread.Start();
}
//等待一批线程
for(auto& thread:threads)
{
thread.Join();
std::cout<<"wait thread name: "<<thread.name()<<std::endl;
}
for(auto data: datas)
{
std::cout<<"name: "<<data->_threadname<<"total is: "<<data->_total<<std::endl;
delete data;
}
return 0;
}
实验现象:
为什么会抢到票为-1,或-2的票呢?
- if 语句判断条件为真以后,代码可以并发的切换到其他线程
- usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
- _tickeds-- 操作本身就不是一个原子操作
取出ticket--部分的汇编代码
objdump -d a.out > test.objdump
152 40064b: 8b 05 e3 04 20 00 mov 0x2004e3(%rip),%eax # 600b34 <_tickeds>
153 400651: 83 e8 01 sub $0x1,%eax
154 400654: 89 05 da 04 20 00 mov %eax,0x2004da(%rip) # 600b34 <_tickeds>
– 操作并不是原子操作,而是对应三条汇编指令:
- load :将共享变量ticket从内存加载到寄存器中
- update : 更新寄存器里面的值,执行-1操作
- store :将新值,从寄存器写回共享变量ticket的内存地址
要解决以上问题,需要做到三点:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临
界区。- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。
2. 互斥量的接口
初始化互斥量
初始化互斥量有两种方法:
- 方法1,静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
- 方法2,动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);
参数:
mutex:要初始化的互斥量
attr:NULL
销毁互斥量
销毁互斥量需要注意:
- 使用PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
- 不要销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
互斥量加锁和解锁
int ptread_mutex_lock(pthread_mutex* mutex);
int ptread_mutex_unlock(pthread_mutex* mutex);
返回值:成功返回0,失败返回错误号
调用pthread_lock
可能会出现以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但有竞争到互斥量,那么
pthread_lock
调用会陷入阻塞(执行流被挂起),等待互斥量解锁
改进上面的售票系统:
int g_tickets = 10000; // 共享资源,没有保护的, 临界资源
void route(ThreadData* ptr)
{
while(true)
{
pthread_mutex_lock(&getmutex);//加锁:本质是将并行执行--》串行执行,加锁的粒度越细越好
if(ptr->_tickeds > 0)
{
usleep(1000);
printf("%s is runing ,get ticked: %d\n",
ptr->_threadname.c_str(),ptr->_tickeds);
ptr->_tickeds--;
pthread_mutex_unlock(&getmutex);//解锁
ptr->_total++;
}
else
{
pthread_mutex_unlock(&getmutex);//解锁
break;
}
}
return ;
}
更优雅的代码:
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__
#include<pthread.h>
class LockGuard
{
public:
LockGuard(pthread_mutex_t* mutex)
:_mutex(mutex)
{
pthread_mutex_lock(_mutex);
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t* _mutex;
};
#endif
#include"Lock_Guard.hpp"
#include"Thread.hpp"
#include<iostream>
#include <iostream>
#include <vector>
#include<mutex>
using namespace ThreadModule;
//静态锁
//pthread_mutex_t getmutex = PTHREAD_MUTEX_INITIALIZER;//静态锁
class ThreadData
{
public:
ThreadData(const std::string& threadname,int& ticked,std::mutex& mutex)
:_threadname(threadname),_tickeds(ticked),_total(0),_mutex(mutex)
{}
~ThreadData()
{}
public:
std::string _threadname;
int& _tickeds;
int _total;
// pthread_mutex_t& _mutex;
std::mutex& _mutex;
};
int g_tickets = 10000; // 共享资源,没有保护的, 临界资源
void route(ThreadData* ptr)
{
while(true)
{
//加锁:本质是将并行执行--》串行执行,加锁的粒度越细越好
//线程竞争锁是自由竞争的,竞争锁的能力太强,就会导致其他线程抢不到锁,---造成其他线程的饥饿问题!!!
//pthread_mutex_lock(&ptr->_mutex); 动态锁
//pthread_mutex_lock(&getmutex); 静态锁
//LockGuard mutex(&ptr->_mutex); 自己封装的RAII锁
// std::lock_guard<std::mutex> lock(ptr->_mutex); //C++11RAII锁
ptr->_mutex.lock(); //C++11锁
//模拟一次抢票逻辑
if(ptr->_tickeds > 0)
{
usleep(1000);
printf("%s is runing ,get ticked: %d\n",
ptr->_threadname.c_str(),ptr->_tickeds);
ptr->_tickeds--;
//pthread_mutex_unlock(&ptr->_mutex);//解锁
//pthread_mutex_unlock(&getmutex);
ptr->_mutex.unlock();
ptr->_total++;
}
else
{
//pthread_mutex_unlock(&ptr->_mutex);//解锁
//pthread_mutex_unlock(&getmutex);
ptr->_mutex.unlock();
break;
}
}
return ;
}
const int num = 4;
int main()
{
//动态锁
// pthread_mutex_t mutex;
// pthread_mutex_init(&mutex,nullptr);
//C++11锁
std::mutex mutex;
//创建一批线程
std::vector<Thread<ThreadData*>> threads;
std::vector<ThreadData*> datas;
for(int i = 0;i < num;i++)
{
std::string name = "thread-" + std::to_string(i+1);
ThreadData* _ptr = new ThreadData(name,g_tickets,mutex);
threads.emplace_back(route,_ptr,name);
datas.emplace_back(_ptr);
}
//启动一批线程
for(auto& thread:threads)
{
thread.Start();
}
//等待一批线程
for(auto& thread:threads)
{
thread.Join();
std::cout<<"wait thread name: "<<thread.name()<<std::endl;
}
for(auto data: datas)
{
std::cout<<"name: "<<data->_threadname<<"total is: "<<data->_total<<std::endl;
delete data;
//pthread_mutex_destroy(&data->_mutex);
data->_mutex.~mutex();
}
return 0;
}
2. 互斥的底层实现
- 经过上面的例子,大家已经意识到单纯的i++ 或者++i 都不是原子的,有可能会有数据一致性问题
- 为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单
元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一
个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 现在我们把lock和unlock的伪代码改一下
加锁and解锁:
线程间的同步
条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情
况就需要用到条件变量。
同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问
题,叫做同步- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
1. 条件变量函数接口
初始化
- 静态初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
- 动态初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict
attr);
参数:
cond:要初始化的条件变量
attr:NULL
- 销毁
int pthread_con_destroy(pthread_con_t *cond);
- 等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释
- 唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
实验:线程同步
#include<iostream>
#include<vector>
#include<string>
#include<unistd.h>
#include<pthread.h>
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;//互斥量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//条件变量
void* Mastercore(void* args)
{
sleep(3);
std::cout<<"mastcore 开始唤醒..."<<std::endl;
std::string name = static_cast<char*>(args);
//唤醒...
while(true)
{
pthread_cond_signal(&cond);//唤醒等待队列中第一个线程
sleep(1);
}
//pthread_cond_broadcast(&cond);//唤醒等待队列中所有线程
return nullptr;
}
void* Slavercore(void* args)
{
std::string name = static_cast<char*>(args);
while(true)
{
pthread_mutex_lock(&gmutex);//加锁
pthread_cond_wait(&cond,&gmutex);// 等待条件变量,
std::cout<<name<<"被唤醒..."<<std::endl;
//TODO
pthread_mutex_unlock(&gmutex);//解锁
}
}
void StartMaster(std::vector<pthread_t>* tidptr)
{
pthread_t tid;
int n = pthread_create(&tid,nullptr,Mastercore,(void*)"master thread");
if(n == 0)
{
std::cout<<"master thread create success ..."<<std::endl;
}
tidptr->emplace_back(tid);
}
void StartSlaver(std::vector<pthread_t>* tidptr,int threadnum)
{
for(int i = 0;i< threadnum;i++)
{
pthread_t tid;
char* name = new char[64];
snprintf(name,64,"thread-%d",i+1);
int n = pthread_create(&tid,nullptr,Slavercore,name);
if(n == 0)
{
std::cout<<name<<" create success ..."<<std::endl;
}
tidptr->emplace_back(tid);
}
}
void WaitThread(const std::vector<pthread_t>& tids)
{
for(auto tid:tids)
{
pthread_join(tid,nullptr);
}
}
int main()
{
std::vector<pthread_t> tids;
StartMaster(&tids);
StartSlaver(&tids,5);
WaitThread(tids);
return 0;
}
结果:
生产者与消费者模型
- 解耦
- 支持并发
- 支持忙闲不均:是指在一个系统中,不同组件或线程之间工作负载分配不均匀的现象。
实验:生产者与消费者模型基础版
阻塞队列:
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <pthread.h>
#include <queue>
template <typename T>
class BlockQueue
{
public:
bool is_full()
{
return _BlockQueue.size() == _cap;
}
bool is_empty()
{
return _BlockQueue.empty();
}
public:
BlockQueue(int cap)
: _cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_productor_cond, nullptr);
pthread_cond_init(&_consumer_cond, nullptr);
}
void enqueue(T &in) // 生产者使用接口
{
pthread_mutex_lock(&_mutex);
if (is_full())
{
pthread_cond_wait(&_productor_cond, &_mutex); // 如果满了,生产者入等待队列,解锁--唤醒--出等待队列,锁定
}
_BlockQueue.push(in);
// 通知消费者来买
// std::cout << "通知消费者来买" << std::endl;
pthread_cond_signal(&_consumer_cond);
// std::cout << "通知完成" << std::endl;
pthread_mutex_unlock(&_mutex);
}
void pop(T *out)
{
pthread_mutex_lock(&_mutex);
if (is_empty())
{
// std::cout << "消费者入等待队列" << std::endl;
pthread_cond_wait(&_consumer_cond, &_mutex); // 如果空了,消费者入等待队列,解锁,---被唤醒--出等待队列,锁定
}
*out = _BlockQueue.front();
_BlockQueue.pop();
// 通知生产者来卖
pthread_cond_signal(&_productor_cond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_productor_cond);
pthread_cond_destroy(&_consumer_cond);
}
private:
std::queue<T> _BlockQueue;
int _cap; // 阻塞队列上限
pthread_mutex_t _mutex;
pthread_cond_t _productor_cond; // 生产者等待队列
pthread_cond_t _consumer_cond; // 消费者等待队列
};
#endif
主函数:
#include <iostream>
#include "BlockQueue.hpp"
#include "Thread.hpp"
using namespace ThreadModule;
void producer(BlockQueue<int> &bq)
{
int cnt = 3;
while (true)
{
bq.enqueue(cnt);
std::cout << "producer is sell :" << cnt << std::endl;
cnt++;
}
}
void consumer(BlockQueue<int> &bq)
{
while (true)
{
sleep(3);
int data;
bq.pop(&data); // 为什么传地址,通过地址修改cnt
std::cout << "consumer is buy :" << data << std::endl;
}
}
void Start_Com(std::vector<Thread<BlockQueue<int>>> *threads_ptr, BlockQueue<int> &bq, int num, func_t<BlockQueue<int>> func)
{
for (int i = 0; i < num; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
threads_ptr->emplace_back(func, bq, name);
threads_ptr->back().Start();
}
}
void StartProducer(std::vector<Thread<BlockQueue<int>>> *threads_ptr, BlockQueue<int> &bq, int num)
{
Start_Com(threads_ptr, bq, num, producer);
}
void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads_ptr, BlockQueue<int> &bq, int num)
{
Start_Com(threads_ptr, bq, num, consumer);
}
void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads)
{
for (auto thread : threads)
{
thread.Join();
}
}
int main()
{
BlockQueue<int> *ptr = new BlockQueue<int>(5);
std::vector<Thread<BlockQueue<int>>> threads;
StartProducer(&threads, *ptr, 1);
StartConsumer(&threads, *ptr, 1);
WaitAllThread(threads);
return 0;
}
升级版:传递任务1. 0
#pragma once
#include <iostream>
class Task
{
public:
Task()
{}
Task(int a, int b)
: _a(a), _b(b)
{}
void Excute()
{
_result = _a+_b;
}
std::string ResultToString()
{
return std::to_string(_a)+" + "+std::to_string(_b)+" = "+std::to_string(_result);
}
std::string DebugToString()
{
return std::to_string(_a)+" + "+std::to_string(_b)+" = ?";
}
private:
int _a;
int _b;
int _result;
};
#include <iostream>
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
using namespace ThreadModule;
void producer(BlockQueue<Task> &bq)
{
srand((unsigned)time(NULL));
int cnt = 3;
while (true)
{
sleep(2);
int first = rand() % 100;
usleep(1234);
int second = rand() % 100;
Task tk(first, second);
bq.enqueue(tk);
std::cout << tk.DebugToString() << std::endl;
}
}
void consumer(BlockQueue<Task> &bq)
{
while (true)
{
Task td;
bq.pop(&td); // 为什么传地址,通过地址修改cnt
td.Excute();
std::cout << td.ResultToString() << std::endl;
}
}
void Start_Com(std::vector<Thread<BlockQueue<Task>>> *threads_ptr, BlockQueue<Task> &bq, int num, func_t<BlockQueue<Task>> func)
{
for (int i = 0; i < num; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
threads_ptr->emplace_back(func, bq, name);
threads_ptr->back().Start();
}
}
void StartProducer(std::vector<Thread<BlockQueue<Task>>> *threads_ptr, BlockQueue<Task> &bq, int num)
{
Start_Com(threads_ptr, bq, num, producer);
}
void StartConsumer(std::vector<Thread<BlockQueue<Task>>> *threads_ptr, BlockQueue<Task> &bq, int num)
{
Start_Com(threads_ptr, bq, num, consumer);
}
void WaitAllThread(std::vector<Thread<BlockQueue<Task>>> &threads)
{
for (auto thread : threads)
{
thread.Join();
}
}
int main()
{
BlockQueue<Task> *ptr = new BlockQueue<Task>(5);
std::vector<Thread<BlockQueue<Task>>> threads;
StartProducer(&threads, *ptr, 1);
StartConsumer(&threads, *ptr, 3);
WaitAllThread(threads);
return 0;
}
传递任务2.0
#include <iostream>
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
using namespace ThreadModule;
using Task = std::function<void()>;
using bock_queue_t = BlockQueue<Task>;
void printdata()
{
std::cout << "hell word" << std::endl;
}
void producer(bock_queue_t &bq)
{
while (true)
{
sleep(1);
Task t = printdata;
bq.enqueue(t);
}
}
void consumer(bock_queue_t &bq)
{
while (true)
{
Task tk;
bq.pop(&tk); // 为什么传地址,通过地址修改cnt
tk();
}
}
void Start_Com(std::vector<Thread<BlockQueue<Task>>> *threads_ptr, bock_queue_t &bq, int num, func_t<bock_queue_t> func)
{
for (int i = 0; i < num; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
threads_ptr->emplace_back(func, bq, name);
threads_ptr->back().Start();//创建线程
}
}
void StartProducer(std::vector<Thread<bock_queue_t>> *threads_ptr, bock_queue_t &bq, int num)
{
Start_Com(threads_ptr, bq, num, producer);
}
void StartConsumer(std::vector<Thread<bock_queue_t>> *threads_ptr, bock_queue_t &bq, int num)
{
Start_Com(threads_ptr, bq, num, consumer);
}
void WaitAllThread(std::vector<Thread<bock_queue_t>> &threads)
{
for (auto thread : threads)
{
thread.Join();
}
}
int main()
{
bock_queue_t *ptr = new bock_queue_t(5);
std::vector<Thread<bock_queue_t>> threads;
StartProducer(&threads, *ptr, 1);
StartConsumer(&threads, *ptr, 3);
WaitAllThread(threads);
return 0;
}
POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
- 初始化信号量
#include<semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value)
参数:
pshared: 0表示线程间共享,非零表示进程间共享
value:表示信号量初始值
- 销毁信号量
int sem_destroy(sem_t* sem);
- 等待信号量
功能:等待信号量,会将信号量值减一
int sem_wait(sem_t * sem);//P()
- 发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了,将信号量的值叫加一
int sem_post(sem_t * sem);//V()
上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序
(POSIX信号量):
基于环形队列的生产消费模型
- 环形队列采用数组模拟,用模运算来模拟环状特性
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来
判断满或者空。另外也可以预留一个空的位置,作为满的状态
- 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
#ifndef __RING_QUEUE_HPP__
#define __RING_QUEUE_HPP__
#include <iostream>
#include <pthread.h>
#include <queue>
#include <semaphore.h>
#include <vector>
template <typename T>
class RingQueue
{
public:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap)
: _ring_queue(cap), _cap(cap), _productor_index(0), _consumer_index(0)
{
sem_init(&_room_sem, 0, _cap);//信号量初始化,空间资源初始为_ring_queue的容量
sem_init(&_data_sem, 0, 0);//数据资源初始为 0
pthread_mutex_init(&_productor_mutex, nullptr);//互斥量初始化
pthread_mutex_init(&_consumer_mutex, nullptr);
}
void emquue(T &in)
{
// 生产行为
//P操作用来减少信号量的值(通常是减1)。
//如果_room_sem信号量的值大于0,执行P操作后,信号量的值减1,进程继续执行。
//如果信号量的值为0,执行P操作后,进程会被阻塞,直到信号量的值变为大于0,这时进程才会被唤醒并继续执行。
P(_room_sem);
Lock(_productor_mutex);//加锁,维护生产者与生产者的竞争
_ring_queue[_productor_index++] = in;//生产数据
_productor_index %= _ring_queue.size();
Unlock(_productor_mutex);
V(_data_sem);//当_data_sem信号量的值增加后,如果有进程因为执行P(_data_sem)操作而被阻塞在该信号量上,
//那么系统会选择一个或多个进程解除其阻塞状态,允许它们继续执行
}
void pop(T *out)
{
// 消费行为
P(_data_sem);//_data
Lock(_consumer_mutex);
*out = _ring_queue[_consumer_index++];
_consumer_index %= _ring_queue.size();
Unlock(_consumer_mutex);
V(_room_sem);
}
~RingQueue()
{
sem_destroy(&_room_sem);//销毁信号量
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_productor_mutex);//销毁互斥量
pthread_mutex_destroy(&_consumer_mutex);
}
private:
// 环形队列
std::vector<T> _ring_queue;
int _cap;
// 生产者与消费者下标
int _productor_index;
int _consumer_index;
// 信号量
sem_t _room_sem;//空间信号量
sem_t _data_sem;//数据信号量
// 互斥量
pthread_mutex_t _productor_mutex;//生产者互斥量
pthread_mutex_t _consumer_mutex;//消费者互斥量
};
#endif
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
#include <functional>
namespace ThreadModule
{
template <typename T>
using func_t = std::function<void(T &,std::string)>;
template <typename T>
class Thread
{
public:
Thread(func_t<T> func, T& data, const std::string threadname = "none_name") // 为什么--常量引用和非常量引用的概念
: _func(func), _data(data), _threadname(threadname), _stop(true)
{}
void Excute()
{
_func(_data,_threadname);
}
static void *handler(void *args)
{
Thread<T> *self = static_cast<Thread<T> *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int ret = pthread_create(&_tid, nullptr, handler, this);
if (ret == 0)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Join()
{
if (!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
~Thread() {}
private:
std::string _threadname;
pthread_t _tid;
T& _data;
func_t<T> _func;
bool _stop;
};
}
#endif
#include <iostream>
#include "RingQueue.hpp"
#include "Thread.hpp"
using namespace ThreadModule;
using Task = std::function<void()>;
using ring_queue_t = RingQueue<Task>;
void printdata()
{
std::cout << "hell word" << std::endl;
}
void producer(ring_queue_t &bq, std::string name)
{
int cnt = 10;
while (true)
{
sleep(2);
Task t = printdata;
bq.emquue(t); // 传递任务
// std::cout<< name << " in: " << cnt << std::endl;
// cnt++;
}
}
void consumer(ring_queue_t &bq, std::string name)
{
while (true)
{
int cnt;
Task tk;
bq.pop(&tk);
tk(); // 执行执行任务
std::cout << name << " is run : task " << std::endl;
}
}
void Init_Com(std::vector<Thread<ring_queue_t>> *threads_ptr, ring_queue_t &rq, int num, func_t<ring_queue_t> func, const std::string &who)
{
for (int i = 0; i < num; i++)
{
std::string _name = "thread- " + std::to_string(i + 1) + " " + who;
threads_ptr->emplace_back(func, rq, _name);
// threads_ptr->back().Start();
}
}
void InitProducer(std::vector<Thread<ring_queue_t>> *threads_ptr, ring_queue_t &rq, int num)
{
Init_Com(threads_ptr, rq, num, producer, "producer");
}
void InitConsumer(std::vector<Thread<ring_queue_t>> *threads_ptr, ring_queue_t &rq, int num)
{
Init_Com(threads_ptr, rq, num, consumer, "consumer");
}
void StartAllThread(std::vector<Thread<ring_queue_t>> &threads)
{
for (auto &thread : threads)
{
thread.Start();
}
}
void WaitAllThread(std::vector<Thread<ring_queue_t>> &threads)
{
for (auto &thread : threads)
{
thread.Join();
}
}
int main()
{
ring_queue_t *ptr = new ring_queue_t(10);
std::vector<Thread<ring_queue_t>> threads; // 所有副线程共享ring_queue
InitProducer(&threads, *ptr, 1); // 生产者初始化
InitConsumer(&threads, *ptr, 3); // 消费者初始化
StartAllThread(threads); // 启动所有副线程
WaitAllThread(threads); // 等待所有副线程
return 0;
}
[!NOTE]
代码一定要多敲,才能明白里面的细节,加油👍👍👍
【Linux】互斥和同步—完结。下一章【Linux】线程池