目录
一.线程互斥
1.1相关概念
• 临界资源:多线程执行流共享的资源就叫做临界资源
• 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
• 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起 保护作用
• 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成, 要么未完成
1.2互斥量
• 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量 归属单个线程,其他线程无法获得这种变量。
• 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完 成线程之间的交互。
• 多个线程并发的操作共享变量,会带来一些问题。
直接看代码:
#include<iostream>
#include<pthread.h>
#include<string>
#include<unistd.h>
int ticket = 1000;
void *routine(void* args)
{
char* id = (char*)args;
while(true)
{
if(ticket>0)
{
usleep(1000);
printf("%s sells ticket:%d\n", id, ticket);
ticket--;
}
else
{
break;
}
}
return nullptr;
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_t tid4;
pthread_create(&tid1,nullptr,routine,(void*)"thread-1 ");
pthread_create(&tid2,nullptr,routine,(void*)"thread-2 ");
pthread_create(&tid3,nullptr,routine,(void*)"thread-3 ");
pthread_create(&tid4,nullptr,routine,(void*)"thread-4 ");
pthread_join(tid1,nullptr);
pthread_join(tid2,nullptr);
pthread_join(tid3,nullptr);
pthread_join(tid4,nullptr);
return 0;
}
明明ticket大于0,才会进行减一,这里竟然出现了负数。
为什么会出现负数??
• if 语句判断条件为真以后,代码可以并发的切换到其他线程
• usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码 段
• --ticket 操作本身就不是一个原子操作
-- 操作并不是原子操作,而是对应三条汇编指令:
• load :将共享变量ticket从内存加载到寄存器中
• update :更新寄存器里面的值,执行-1操作
• store :将新值,从寄存器写回共享变量ticket的内存地址
从内存加载到寄存器里,再去进行减一操作,最后放回内存。假如现在ticket被减为1了,那么四个线程都可以进入到临界区,到时候线程1进行减一操作,做三件事情,最后会把减后的0放回到内存。之后,线程2来拿的就是0,减为-1.
• 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
• 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程 进入该临界区。
• 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。
互斥量的接口
销毁互斥量:
• 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
• 不要销毁一个已经加锁的互斥量
• 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
加锁和解锁:
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
两种方式,一种是全局的静态分配:
另一种:
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
class ThreadDate
{
public:
ThreadDate(const std::string n, pthread_mutex_t &lock)
: name(n),
lockp(&lock)
{
}
~ThreadDate() {}
std::string name;
pthread_mutex_t *lockp;
};
int ticket = 1000;
void *routine(void *args)
{
ThreadDate* td = static_cast<ThreadDate*>(args);
while (true)
{
// pthread_mutex_lock(&glock);
pthread_mutex_lock(td->lockp);
if (ticket > 0)
{
usleep(1000);
printf("%s sells ticket:%d\n", td->name.c_str(), ticket);
ticket--;
// pthread_mutex_unlock(&glock);
pthread_mutex_unlock(td->lockp);
}
else
{
// pthread_mutex_unlock(&glock);
pthread_mutex_unlock(td->lockp);
break;
}
}
return nullptr;
}
int main()
{
pthread_mutex_t lock;
pthread_mutex_init(&lock,nullptr);
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_t tid4;
ThreadDate* td1 = new ThreadDate("thread 1",lock);
pthread_create(&tid1, nullptr, routine, td1);
ThreadDate* td2 = new ThreadDate("thread 2",lock);
pthread_create(&tid2, nullptr, routine, td2);
ThreadDate* td3 = new ThreadDate("thread 3",lock);
pthread_create(&tid3, nullptr, routine, td3);
ThreadDate* td4 = new ThreadDate("thread 4",lock);
pthread_create(&tid4, nullptr, routine, td4);
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
pthread_mutex_destroy(&lock);
return 0;
}
问题:
加锁之后,子啊临界区内部,允许线程切换吗(允许)。
因为我在当前的进程没有进行释放锁的操作,我是持有锁被切换的,相当于拿着钥匙,钥匙不给其他线程,其他线程也只能干等着,等我回来释放锁,其他线程才能展开锁的竞争,进入临界区。
所有线程都必须遵守这个规则
1.3互斥量实现原理探究
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和 内存单元的数据相交换,由于只有⼀条指令,保证了原子性,即使是多处理器平台,访问内存的总线周 期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
1.4互斥量封装
testMutex.cc
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
#include "Mutex.hpp"
// pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
using namespace MutexModule;
class ThreadDate
{
public:
ThreadDate(const std::string n, Mutex &lock)
: name(n),
lockp(&lock)
{
}
~ThreadDate() {}
std::string name;
Mutex *lockp;
};
int ticket = 1000;
void *routine(void *args)
{
ThreadDate *td = static_cast<ThreadDate *>(args);
while (true)
{
{
MutexGuard guard(td->lockp);
if (ticket > 0)
{
usleep(1000);
printf("%s sells ticket:%d\n", td->name.c_str(), ticket);
ticket--;
}
else
{
break;
}
}
}
return nullptr;
}
int main()
{
Mutex lock;
// pthread_mutex_init(&lock,nullptr);
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_t tid4;
ThreadDate *td1 = new ThreadDate("thread 1", lock);
pthread_create(&tid1, nullptr, routine, td1);
ThreadDate *td2 = new ThreadDate("thread 2", lock);
pthread_create(&tid2, nullptr, routine, td2);
ThreadDate *td3 = new ThreadDate("thread 3", lock);
pthread_create(&tid3, nullptr, routine, td3);
ThreadDate *td4 = new ThreadDate("thread 4", lock);
pthread_create(&tid4, nullptr, routine, td4);
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
// pthread_mutex_destroy(&lock);
return 0;
}
Mutex.hpp
#ifndef __MUTEX_H_
#define __MUTEX_H_
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
namespace MutexModule
{
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_mutex,nullptr);
}
void Lock()
{
int n = pthread_mutex_lock(&_mutex);
if(n!=0)
{
std::cout<<"lock error"<<std::endl;
exit(1);
}
}
void Unlock()
{
int n = pthread_mutex_unlock(&_mutex);
if(n!=0)
{
std::cout<<"unlock error"<<std::endl;
exit(1);
}
}
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
private:
pthread_mutex_t _mutex;
};
class MutexGuard
{
public:
MutexGuard(Mutex *mutex)
:_mutex(mutex)
{
_mutex->Lock();
}
~MutexGuard()
{
_mutex->Unlock();
}
private:
Mutex *_mutex;
};
}
#endif
仅仅通过MutexGuard的对象,只要我们创建好,我们就实现了RAII代码风格的封装。
二.线程同步
2.1条件变量
• 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
• 例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列 中。这种情况就需要用到条件变量。
2.2同步概念与竞态条件
• 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免 饥饿问题,叫做同步
• 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也 不难理解
2.3接口
#include<iostream>
#include<pthread.h>
#include<string>
#include<vector>
#include<unistd.h>
#define NUM 5
int cnt = 1000;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
void* routine(void* args)
{
std::string name = static_cast<const char*>(args);
while (true)
{
pthread_mutex_lock(&glock);
pthread_cond_wait(&gcond, &glock); // glock在pthread_cond_wait之前,会被自动释放掉
std::cout << name << " 计算: " << cnt << std::endl;
cnt++;
pthread_mutex_unlock(&glock);
}
}
int main()
{
std::vector<pthread_t> threads;
for(int i=0;i<NUM;i++)
{
pthread_t tid;
char id[64];
snprintf(id,sizeof(id),"thread-%d",i);
int n = pthread_create(&tid,nullptr,routine,id);
if(n!=0)
continue;
threads.push_back(tid);
sleep(1);
}
while(true)
{
std::cout << "唤醒所有线程... " << std::endl;
pthread_cond_broadcast(&gcond);
// std::cout << "唤醒一个线程... " << std::endl;
// pthread_cond_signal(&gcond);
sleep(1);
}
for(int i=0;i<NUM;i++)
{
pthread_join(threads[i],nullptr);
}
return 0;
}
2.4生产者消费者模型
生产者之间:竞争关系,互斥关系
消费者之间的关系:互斥关系
生产者和消费者之间的关系:互斥关系,同步关系
两种角色:生产者和消费者(这里由线程承担)
一个交易场所:以特定结构构成的内存空间。
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区, 平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
优点
解耦。支持忙闲不均,提高效率。
2.5基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
代码实现
BlockQueue.hpp
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include "Task.hpp"
const int defaultcap = 5;
template <class T>
class BlockQueue
{
bool IsFull() { return _q.size() >= _cap; }
bool IsEmpty() { return _q.empty(); }
public:
BlockQueue(int cap = defaultcap)
: _cap(cap),
_csleep_num(0),
_psleep_num(0)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_full_cond, nullptr);
pthread_cond_init(&_empty_cond, nullptr);
}
void Equeue(const T &in)
{
// 队列是临界资源
pthread_mutex_lock(&_mutex);
while (IsFull())
{
// 满了就应该让生产者进行等待
// 1.pthread_cond_wait调用成功,挂起当前的线程之前,要先把锁自动释放(所以满等待的时候,不影响消费的过程)
// 这也是需要传入锁的原因
// 2.当线程被唤醒的时候,默认在临界区内唤醒,从pthread_cond_wait成功返回(是在临界区被唤醒的)
// ,需要当前的进程重新申请mutex锁
// 3.被唤醒之后,没抢到锁??会在锁上组阻塞等待
_psleep_num++;
//pthread_cond_wait失败了怎么搞??会立即返回
//可能会被伪唤醒
std::cout<<"生产者进入休眠"<<std::endl;
pthread_cond_wait(&_full_cond, &_mutex);
_psleep_num--;
}
_q.push(in);
//到这里队列里一定会有数据,如果消费者在休眠就唤醒它
if(_csleep_num>0)
{
pthread_cond_signal(&_empty_cond);
std::cout<<"唤醒消费者"<<std::endl;
}
pthread_mutex_unlock(&_mutex);
}
T Pop()
{
pthread_mutex_lock(&_mutex);
while (IsEmpty())
{
_csleep_num++;
pthread_cond_wait(&_empty_cond, &_mutex);
_csleep_num--;
}
T data = _q.front();
_q.pop();
if(_psleep_num>0)
{
pthread_cond_signal(&_full_cond);
std::cout<<"唤醒生产者"<<std::endl;
}
pthread_mutex_unlock(&_mutex);
return data;
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_full_cond);
pthread_cond_destroy(&_empty_cond);
}
private:
std::queue<T> _q; // 临界资源
int _cap;
pthread_mutex_t _mutex;
pthread_cond_t _full_cond; // 生产者生产满了,就在这里等待
pthread_cond_t _empty_cond; // 消费者消费完了,在这里等待
int _csleep_num;
int _psleep_num;
};
main.cc
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<functional>
#include"BlockQueue.hpp"
#include"Task.hpp"
void *consumer(void* args)
{
BlockQueue<task_t>* bq = static_cast<BlockQueue<task_t>*>(args);
while(true)
{
sleep(10);
//int data = bq->Pop();
task_t t = bq->Pop();
t();
}
}
void *productor(void* args)
{
BlockQueue<task_t>* bq = static_cast<BlockQueue<task_t>*>(args);
while (true)
{
//sleep(10);
std::cout<<"生产了一个任务:"<<std::endl;
//Task t(x,y);
bq->Equeue(Download);
}
}
int main()
{
//申请阻塞队列
BlockQueue<task_t>* bq = new BlockQueue<task_t>();
//线程构建生产者和消费者
pthread_t c[2],p[3];
pthread_create(c,nullptr,consumer,bq);
pthread_create(c+1,nullptr,consumer,bq);
pthread_create(p,nullptr,productor,bq);
pthread_create(p+1,nullptr,productor,bq);
pthread_create(p+2,nullptr,productor,bq);
pthread_join(c[0],nullptr);
pthread_join(c[1],nullptr);
pthread_join(p[0],nullptr);
pthread_join(p[1],nullptr);
pthread_join(p[2],nullptr);
return 0;
}
Task.hpp
#pragma once
#include<functional>
#include<iostream>
using task_t = std::function<void()>;
void Download()
{
std::cout << "我是一个下载任务..." << std::endl;
sleep(3); // 假设处理任务比较耗时
}
class Task
{
public:
Task(){}
Task(int x,int y):_x(x),_y(y)
{
}
void Execute()
{
_sum=_x+_y;
}
int sum()
{
return _sum;
}
private:
int _x;
int _y;
int _sum;
};
这里是精华:
提高效率??
不是体现在入交易场所和出交易场所,而是在未来获取任务和处理任务的时候是并发的。
t()是在临界资源外部。
条件变量的封装
Cond.hpp
#pragma once
#include<iostream>
#include<pthread.h>
#include"Mutex.hpp"
using namespace MutexModule;
namespace CondModule
{
class Cond
{
public:
Cond()
{
pthread_cond_init(&_cond,nullptr);
}
void Wait(Mutex& mutex)
{
pthread_cond_wait(&_cond,mutex.Get());
}
void Signal()
{
pthread_cond_signal(&_cond);
}
void Broadcast()
{
pthread_cond_broadcast(&_cond);
}
~Cond()
{
pthread_cond_destroy(&_cond);
}
private:
pthread_cond_t _cond;
};
}
Mutex.hpp
#ifndef __MUTEX_H_
#define __MUTEX_H_
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
namespace MutexModule
{
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_mutex,nullptr);
}
void Lock()
{
int n = pthread_mutex_lock(&_mutex);
if(n!=0)
{
std::cout<<"lock error"<<std::endl;
exit(1);
}
}
void Unlock()
{
int n = pthread_mutex_unlock(&_mutex);
if(n!=0)
{
std::cout<<"unlock error"<<std::endl;
exit(1);
}
}
pthread_mutex_t *Get()
{
return &_mutex;
}
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
private:
pthread_mutex_t _mutex;
};
class MutexGuard
{
public:
MutexGuard(Mutex *mutex)
:_mutex(mutex)
{
_mutex->Lock();
}
~MutexGuard()
{
_mutex->Unlock();
}
private:
Mutex *_mutex;
};
}
#endif
Task.hpp
#pragma once
#include<functional>
#include<iostream>
using task_t = std::function<void()>;
void Download()
{
std::cout << "我是一个下载任务..." << std::endl;
sleep(3); // 假设处理任务比较耗时
}
class Task
{
public:
Task(){}
Task(int x,int y):_x(x),_y(y)
{
}
void Execute()
{
_sum=_x+_y;
}
int sum()
{
return _sum;
}
private:
int _x;
int _y;
int _sum;
};
BlockQueue.hpp的代码只需要稍微修改一下就行了:
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include "Task.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
using namespace CondModule;
using namespace MutexModule;
const int defaultcap = 5;
template <class T>
class BlockQueue
{
bool IsFull() { return _q.size() >= _cap; }
bool IsEmpty() { return _q.empty(); }
public:
BlockQueue(int cap = defaultcap)
: _cap(cap),
_csleep_num(0),
_psleep_num(0)
{
}
void Equeue(const T &in)
{
MutexGuard mutexguard(&_mutex);
while (IsFull())
{
// 满了就应该让生产者进行等待
// 1.pthread_cond_wait调用成功,挂起当前的线程之前,要先把锁自动释放(所以满等待的时候,不影响消费的过程)
// 这也是需要传入锁的原因
// 2.当线程被唤醒的时候,默认在临界区内唤醒,从pthread_cond_wait成功返回(是在临界区被唤醒的)
// ,需要当前的进程重新申请mutex锁
// 3.被唤醒之后,没抢到锁??会在锁上组阻塞等待
_psleep_num++;
//pthread_cond_wait失败了怎么搞??会立即返回
//可能会被伪唤醒
std::cout<<"生产者进入休眠"<<std::endl;
_full_cond.Wait(_mutex);
_psleep_num--;
}
_q.push(in);
//到这里队列里一定会有数据,如果消费者在休眠就唤醒它
if(_csleep_num>0)
{
_empty_cond.Signal();
std::cout<<"唤醒消费者"<<std::endl;
}
}
T Pop()
{
T data;
MutexGuard mutexguard(&_mutex);
while (IsEmpty())
{
_csleep_num++;
_full_cond.Wait(_mutex);
_csleep_num--;
}
data = _q.front();
_q.pop();
if(_psleep_num>0)
{
_full_cond.Signal();
std::cout<<"唤醒生产者"<<std::endl;
}
return data;
}
~BlockQueue()
{
}
private:
std::queue<T> _q; // 临界资源
int _cap;
Mutex _mutex;
Cond _full_cond; // 生产者生产满了,就在这里等待
Cond _empty_cond; // 消费者消费完了,在这里等待
int _csleep_num;
int _psleep_num;
};
最后是Main.cc
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<functional>
#include"BlockQueue.hpp"
#include"Task.hpp"
void *consumer(void* args)
{
BlockQueue<task_t>* bq = static_cast<BlockQueue<task_t>*>(args);
while(true)
{
sleep(3);
//int data = bq->Pop();
task_t t = bq->Pop();
t();
}
}
void *productor(void* args)
{
BlockQueue<task_t>* bq = static_cast<BlockQueue<task_t>*>(args);
while (true)
{
//sleep(10);
std::cout<<"生产了一个任务:"<<std::endl;
//Task t(x,y);
bq->Equeue(Download);
}
}
int main()
{
//申请阻塞队列
BlockQueue<task_t>* bq = new BlockQueue<task_t>();
//线程构建生产者和消费者
pthread_t c,p;
pthread_create(&c,nullptr,consumer,bq);
// pthread_create(c+1,nullptr,consumer,bq);
pthread_create(&p,nullptr,productor,bq);
// pthread_create(p+1,nullptr,productor,bq);
// pthread_create(p+2,nullptr,productor,bq);
pthread_join(c,nullptr);
// pthread_join(c[1],nullptr);
pthread_join(p,nullptr);
// pthread_join(p[1],nullptr);
// pthread_join(p[2],nullptr);
return 0;
}
2.6POSIX信号量
RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"
static const int gcap = 5;
using namespace SemModule;
using namespace MutexModule;
template <class T>
class RingQueue
{
public:
RingQueue(int cap = gcap)
: _rq(cap),
_cap(cap),
_blank_sem(cap),
_p_step(0),
_data_sem(0),
_c_step(0)
{
}
void Equeue(const T &in)
{
// _pmutex.Lock();//在这里加锁??
//生产者
//1、申请信号量,空位置的信号量
_blank_sem.P();
_pmutex.Lock();//还是在这里加,都可以,但是在这里加的话效率更高。因为所有的线程都可以去申请信号量,
//申请完毕再去竞争锁。反之除了申请到锁的线程,其他线程都要在锁里等待,它们都没有信号量
//下次再来的时候还要申请锁。就比如排队去电影院,是先每个人买票去排队,还是先排队再去一个一个买票
//2、生产
_rq[_p_step] = in;
//3、更新下标
_p_step++;
_p_step%=_cap;
_pmutex.Unlock();
_data_sem.V();
}
void Pop(T *out)
{
//消费者
//1、申请信号量,数据信号量
_data_sem.P();
_cmutex.Lock();
//2、消费
*out = _rq[_c_step];
//3、更新下标
++_c_step;
_c_step %= _cap;
_cmutex.Unlock();
_blank_sem.V();
}
~RingQueue() {}
private:
std::vector<T> _rq;
int _cap;
// 生产者
Sem _blank_sem; // 空位置
int _p_step;
// 消费者
Sem _data_sem; // 数据
int _c_step;
//用锁来维护多生产多消费
Mutex _cmutex;
Mutex _pmutex;
};
Sem.hpp
#pragma once
#include<iostream>
#include<pthread.h>
#include<semaphore.h>
//const int defaultvalue = 1;
namespace SemModule
{
const int defaultvalue = 1;
class Sem
{
public:
Sem(unsigned int sem_value = defaultvalue)
{
sem_init(&_sem,0,sem_value);
}
void P()
{
sem_wait(&_sem);
}
void V()
{
sem_post(&_sem);
}
~Sem()
{
sem_destroy(&_sem);
}
private:
sem_t _sem;
};
}
Mutex.hpp
#ifndef __MUTEX_H_
#define __MUTEX_H_
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
namespace MutexModule
{
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_mutex,nullptr);
}
void Lock()
{
int n = pthread_mutex_lock(&_mutex);
if(n!=0)
{
std::cout<<"lock error"<<std::endl;
exit(1);
}
}
void Unlock()
{
int n = pthread_mutex_unlock(&_mutex);
if(n!=0)
{
std::cout<<"unlock error"<<std::endl;
exit(1);
}
}
pthread_mutex_t *Get()
{
return &_mutex;
}
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
private:
pthread_mutex_t _mutex;
};
class MutexGuard
{
public:
MutexGuard(Mutex *mutex)
:_mutex(mutex)
{
_mutex->Lock();
}
~MutexGuard()
{
_mutex->Unlock();
}
private:
Mutex *_mutex;
};
}
#endif
Main.cc
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<functional>
#include"Sem.hpp"
#include"RingQueue.hpp"
void *consumer(void* args)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
while(true)
{
//sleep(3);
//int data = bq->Pop();
int t = 0;
rq->Pop(&t);
std::cout<<"消费者拿到了一个数据"<<t<<std::endl;
}
}
void *productor(void* args)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
int data = 1;
while (true)
{
//sleep(1);
std::cout<<"生产了一个任务:"<<data<<std::endl;
//Task t(x,y);
rq->Equeue(data++);
}
}
int main()
{
//申请环队列
RingQueue<int>* rq = new RingQueue<int>();
//线程构建生产者和消费者
pthread_t c[2],p[3];
pthread_create(c,nullptr,consumer,rq);
pthread_create(c+1,nullptr,consumer,rq);
pthread_create(p,nullptr,productor,rq);
pthread_create(p+1,nullptr,productor,rq);
pthread_create(p+2,nullptr,productor,rq);
pthread_join(c[0],nullptr);
pthread_join(c[1],nullptr);
pthread_join(p[0],nullptr);
pthread_join(p[1],nullptr);
pthread_join(p[2],nullptr);
return 0;
}
注意这里的加锁顺序:
三.线程池
3.1自己实现日志
最终格式:
刷新策略类:
class LogStrategy
{
public:
~LogStrategy() = default;
virtual void SyncLog(const std::string &message) = 0;
};
因为可以刷新到显示器和文件,所以使用继承的方式来实现:
显示器打印日志的策略:
// 显示器打印日志的策略
class ConsoleLogstrategy : public LogStrategy
{
public:
ConsoleLogstrategy() {}
virtual void SyncLog(const std::string &message) override
{
MutexGuard lockguard(&_mutex);
std::cout << message << gsep;
}
~ConsoleLogstrategy() {}
private:
Mutex _mutex;
};
文件打印日志的策略:
// 文件打印日志的策略
const std::string defaultpath = "./log";
const std::string defaultfile = "my.log";
class FileLogStrategy : public LogStrategy
{
public:
FileLogStrategy(const std::string &path = defaultpath, const std::string &file = defaultfile)
: _path(path),
_file(file)
{
if (std::filesystem::exists(_path)) // 如果_path存在
{
return;
}
try
{
std::filesystem::create_directories(_path); // 如果创建失败了,try catch捕捉一下
}
catch (const std::filesystem::filesystem_error &e)
{
std::cerr << e.what() << '\n';
}
}
virtual void SyncLog(const std::string &message) override
{
MutexGuard lockguard(&_mutex);
std::string filename = _path + (_path.back() == '/' ? "" : "/") + _file;
std::ofstream out(filename, std::ios::app); // 追加写入的方式打开
if (!out.is_open())
{
return;
}
out << message << gsep;
out.close();
}
~FileLogStrategy() {}
private:
std::string _path; // 需要写的日志的路径
std::string _file; // 需要写的文件
Mutex _mutex;
};
路径存在返回,不存在就创建。这里用的是filesystem。
刷新方式,刷新方式转换字符串,获取当前实现
// 选择不同的刷新方式
enum class LogLevel
{
DEBUG,
INFO,
WARING,
ERROR,
FATAL,
};
std::string LevelToString(LogLevel level)
{
switch (level)
{
case LogLevel::DEBUG:
return "DEBUG";
case LogLevel::INFO:
return "INFO";
case LogLevel::WARING:
return "WARING";
case LogLevel::ERROR:
return "ERROR";
case LogLevel::FATAL:
return "FATAL";
default:
return "UNKNOWN";
}
}
std::string GetCurTime()
{
time_t cur = time(nullptr);
struct tm cur_tm;
localtime_r(&cur, &cur_tm);
char buffertime[64];
snprintf(buffertime, sizeof(buffertime), "%4d-%02d-%d %d:%d:%d",
cur_tm.tm_year + 1900,
cur_tm.tm_mon + 1,
cur_tm.tm_mday,
cur_tm.tm_hour,
cur_tm.tm_min,
cur_tm.tm_sec);
return buffertime;
}
这里用的localtime_r:
最后就是未来的一条日志:
// 内部类
// 表示的是未来的一条日志
class LogMessage
{
public:
// 左半部分
LogMessage(LogLevel &level, std::string &src_name, int line_number, Logger &logger)
: _cur_time(GetCurTime()),
_level(level),
_pid(getpid()),
_src_name(src_name),
_line_number(line_number),
_logger(logger)
{
std::stringstream ss;
ss << "[" << _cur_time << "] "
<< "[" << LevelToString(_level) << "] " // 因为是自定义的类,无法转换,这里要转化一下
<< "[" << _pid << "] "
<< "[" << _src_name << "] "
<< "[" << _line_number << "] "
<< "- ";
_loginfo = ss.str();
}
// 右半部分
template <class T>
LogMessage &operator<<(const T &info)
{
std::stringstream ss;
ss << info;
_loginfo += ss.str();
return *this;
}
~LogMessage()
{
// 内部类可以访问外部类的私有成员
if (_logger._fflush_strategy) // 刷新策略不为空
{
_logger._fflush_strategy->SyncLog(_loginfo);
}
}
private:
std::string _cur_time;
LogLevel _level;
pid_t _pid;
std::string _src_name;
int _line_number;
std::string _loginfo;
Logger &_logger;
};
// 故意写成返回临时对象
// 为了实现LogMessage << 1 << 23.12 << "hello"
// 一直到运算符结束才会调用析构
LogMessage operator()(LogLevel level, std::string name, int number)
{
return LogMessage(level, name, number, *this);
}
~Logger() {}
private:
std::unique_ptr<LogStrategy> _fflush_strategy;
};
这里return *this就是为了防止连续传入的情况,可以理解为a=b=c=d,这样的连续赋值操作。 在析构里去刷新。
在传参的时候会把左半部分传进来,之后返回内部类的临时对象。因为重载了<<。日志输出成功。
Logger log;
log(LogLevel::DEBUG,main.cc,10)<<"hello world";
但是这样写不太好,可以用宏替换:
// 全局日志对象
Logger logger;
// log(LogLevel::DEBUG,main.cc,10)<<"hello world";
// 用宏来简化用户操作,获取文件名和行号
#define LOG(level) logger(level, __FILE__, __LINE__)
#define Enable_Console_Log_Strtegy() logger.EnableConsoleLogStrtegy()
#define Enable_File_Log_Strategy() logger.EnableFileLogStrategy()
在main函数中就可以这样写:
Enable_Console_Log_Strtegy();
LOG(LogLevel::DEBUG) <<"hello world"<<3.14;
LOG(LogLevel::DEBUG) <<"hello world"<<3.14;
LOG(LogLevel::DEBUG) <<"hello world"<<3.14;
LOG(LogLevel::DEBUG) <<"hello world"<<3.14;
这样就成功输出:
3.2线程池设计
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
里面的一些任务什么的可以自己设置
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include<queue>
#include"Log.hpp"
#include"Thread.hpp"
#include"Cond.hpp"
namespace ThreadPoolModule
{
using namespace ThreadModlue;
using namespace LogModule;
using namespace CondModule;
static const int gnum = 5;
template<class T>
class ThreadPool
{
private:
void WakeUpAllThread()
{
MutexGuard mutexguard(&_mutex);
if(_sleepernum>0)
_cond.Broadcast();
LOG(LogLevel::INFO) << "唤醒所有的线程";
}
void WakeUpOneThread()
{
_cond.Signal();
LOG(LogLevel::INFO) << "唤醒一个线程";
}
public:
ThreadPool(int num = gnum):_num(num),_isruning(false),_sleepernum(0)
{
for(int i= 0;i<_num;i++)
{
_threads.emplace_back(
[this](){
HandlerTask();
}
);
}
}
void Start()
{
if(_isruning)
return;
_isruning = true;
for(auto &thread:_threads)
{
thread.Start();
LOG(LogLevel::INFO) << "start new thread success"<<thread.GetName();
}
}
void stop()
{
if(!_isruning) return;
_isruning=false;
//因为要确保所有的任务处理完毕
//唤醒所有的线程
WakeUpAllThread();
}
void Join()
{
for(auto &thread : _threads)
{
thread.Join();
}
}
void HandlerTask()
{
char name[128];
pthread_getname_np(pthread_self(),name,sizeof(name));
while(true)
{
T t;
{
MutexGuard mutexguard(&_mutex);
//一定要遵循 任务队列的任务被处理完且线程池退出
//当任务队列不为空,即使_isruning == false 还有继续运行任务
while(_taskq.empty() && _isruning)
{
_sleepernum++;
_cond.Wait(_mutex);
_sleepernum--;
}
//内部线程被唤醒
//只有线程池被退出了或者任务队列为空
if(!_isruning && _taskq.empty())
{
LOG(LogLevel::INFO) << name << "退出了 线程池退出&&任务队列为空";
break;
}
t = _taskq.front();
_taskq.pop();
}
t();
// sleep(1);
// LOG(LogLevel::DEBUG) << name <<" is ruing";
}
}
bool Equeue(const T& in)
{
//假如我已经关闭了线程池,外部就不能再去入任务了
if(_isruning)
{
MutexGuard mutexguard(&_mutex);
_taskq.push(in);
if(_threads.size() == _sleepernum)
{
WakeUpOneThread();
}
return true;
}
return false;
}
~ThreadPool()
{
}
private:
std::vector<Thread> _threads;
int _num;
std::queue<T> _taskq;
Cond _cond;
Mutex _mutex;
bool _isruning;
int _sleepernum;
};
}
3.3线程安全的单例模式
3.3.1单例模式
单例模式是一种设计模式,用于限制某个类仅能够创建一个实例,并提供一个全局访问点。在单例模式中,类的实例化过程只能发生一次,之后的每次调用都返回相同的实例。这样可以确保系统中某个类只有一个实例对象,节省系统资源并且方便对该实例对象的管理和控制。
3.3.2饿汉实现方式和懒汉实现方式
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.
3.3.3饿汉模式实现单例模式
只要通过Singleton这个包装类来使用T对象,则一个进程中只有一个T对象的实例.
3.3.4懒汉模式实现单例模式
存在⼀个严重的问题,线程不安全。
第一次调用GetInstance的时候,如果两个线程同时调用,可能会创建出两份T对象的实例.但是后续再次调用,就没有问题了.
3.3.5懒汉方式实现单例模式的线程池代码
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Log.hpp"
#include "Thread.hpp"
#include "Cond.hpp"
namespace ThreadPoolModule
{
using namespace ThreadModlue;
using namespace LogModule;
using namespace CondModule;
static const int gnum = 5;
template <class T>
class ThreadPool
{
private:
void WakeUpAllThread()
{
MutexGuard mutexguard(&_mutex);
if (_sleepernum > 0)
_cond.Broadcast();
LOG(LogLevel::INFO) << "唤醒所有的线程";
}
void WakeUpOneThread()
{
_cond.Signal();
LOG(LogLevel::INFO) << "唤醒一个线程";
}
ThreadPool(int num = gnum) : _num(num), _isruning(false), _sleepernum(0)
{
for (int i = 0; i < _num; i++)
{
_threads.emplace_back(
[this]()
{
HandlerTask();
});
}
}
void Start()
{
if (_isruning)
return;
_isruning = true;
for (auto &thread : _threads)
{
thread.Start();
LOG(LogLevel::INFO) << "start new thread success" << thread.GetName();
}
}
// 禁用拷贝构造和赋值
ThreadPool(const ThreadPool<T> &) = delete;
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
public:
//如果没有static,本来就属于类内的成员方法,那么在调用的时候就必须要创建对象(别忘了这个函数的作用)
//所以如果要实现懒汉方式下的单例模式就必须加上static
static ThreadPool<T> *GetInstance() // 确保只有一次创建对象
{
//要加锁就必须有对象(因为锁在成员变量里),为了防止,用static
if(inc==nullptr)//双重if判定,避免不必要的锁竞争,不满足就直接返回
{
MutexGuard mutexguard(_lock);
LOG(LogLevel::INFO)<<"获取单例";
if (inc == nullptr)
{
LOG(LogLevel::INFO)<<"首次使用单例,创建";
inc = new ThreadPool<T>();
inc->Start();
}
}
return inc;
}
void stop()
{
if (!_isruning)
return;
_isruning = false;
// 因为要确保所有的任务处理完毕
// 唤醒所有的线程
WakeUpAllThread();
}
void Join()
{
for (auto &thread : _threads)
{
thread.Join();
}
}
void HandlerTask()
{
char name[128];
pthread_getname_np(pthread_self(), name, sizeof(name));
while (true)
{
T t;
{
MutexGuard mutexguard(&_mutex);
// 一定要遵循 任务队列的任务被处理完且线程池退出
// 当任务队列不为空,即使_isruning == false 还有继续运行任务
while (_taskq.empty() && _isruning)
{
_sleepernum++;
_cond.Wait(_mutex);
_sleepernum--;
}
// 内部线程被唤醒
// 只有线程池被退出了或者任务队列为空
if (!_isruning && _taskq.empty())
{
LOG(LogLevel::INFO) << name << "退出了 线程池退出&&任务队列为空";
break;
}
t = _taskq.front();
_taskq.pop();
}
t();
// sleep(1);
// LOG(LogLevel::DEBUG) << name <<" is ruing";
}
}
bool Equeue(const T &in)
{
// 假如我已经关闭了线程池,外部就不能再去入任务了
if (_isruning)
{
MutexGuard mutexguard(&_mutex);
_taskq.push(in);
if (_threads.size() == _sleepernum)
{
WakeUpOneThread();
}
return true;
}
return false;
}
~ThreadPool()
{
}
private:
std::vector<Thread> _threads;
int _num;
std::queue<T> _taskq;
Cond _cond;
Mutex _mutex;
bool _isruning;
int _sleepernum;
static ThreadPool<T> *inc;
static Mutex _lock;//必须要加static
};
//静态成员变量在类外初始化
template<class T>
ThreadPool<T>* ThreadPool<T>::inc = nullptr;
template<class T>
Mutex ThreadPool<T>::_lock;
}
1.禁止构造和赋值
2.静态成员变量inc,静态成员函数GetInstance。目的是不让重复的创建对象,根据静态成员函数和静态成员函数的特性,静态函数只能访问静态的(没有this指针),而且调用时不用创建对象。
3.双重if,是因为可能会有多个线程调用这个静态成员函数去创建,这里依然会有线程安全的问题(具体为什么看1.2的负数问题),所以就要多加一把不用创建对象的锁(static)。而这会有新的问题,多个线程会同时竞争这把锁,则我们多加一层判断来阻断那么多线程对锁的进程,直接返回第一个线程创建对象就行了。