目录
1. Linux线程概念
线程:是进程内的一个执行分支。线程的执行拉度)要比进程要细。
线程分为主线程和副线程,线程共享绝大部分的进程地址空间内容。
Linux中的线程也被叫做轻量化进程。
如何理解进程和线程:
线程是操作系统调度的基本单位!
进程是承担分配系统资源的基本实体,进程内包括了线程
为什么线程比进程要更轻量化?:
- 创建和释放更加轻量化
- 切换更加轻量化
线程共享进程数据,但也拥有自己的一部分数据:
- 一组寄存器
- 栈
- 信号屏蔽字
各线程共享以下进程资源和环境:
- 文件描述符表
- 每种信号的处理方式(SIG_ IGN、SIG_ DFL或者自定义的信号处理函数)
2. 重谈进程地址空间—页表
2.1 如何由虚拟地址转化为物理地址的
下面以32位虚拟地址(虚拟地址为4字节)为例:
虚拟地址为4字节,32个比特位,分为:10+10+12
3. pthread库调用接口
内核中有没有很明确的线程的概念呢?没有的。线程就是轻量级进程的概念。并不会给我直接给我们提供轻量级进程的系统调用!
所以在pthread线程库将轻量级进程接口进行了封装----pthread为用户提供直接线程的接口,pthread是在应用层。Linux中编写多线程代码需要使用第三方pthread库!所以编译时必须加:-lpthread
3.1 线程的创建—pthread_create
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *
(*start_routine)(void*), void *arg);
thread:输出形参数,线程id
attr:线程的属性。可默认,输入nullptr
start_routine:线程的执行函数
arg:输入形参数,arg为线程的执行函数中的参数
返回值:
成功为0,失败放回错误码。
3.2 线程等待—pthread_join
int pthread_join(pthread_t thread, void **value_ptr);
- thread:线程ID
- value_ptr:它指向一个指针,后者指向线程执行函数(即start_routine对应的函数)的返回值。
如果start_routine对应的函数的返回值为退出码或退出数据,则pthread_join就可以得到退出码或退出数据。 - 返回值:成功返回0;失败返回错误码
注意事项:
- 主线程创建了副线程,就承担了副线程的管理工作,意味着要在副线程退出之后,调用 pthread_join等待函数,才能退出,如果先推,则会造成内存泄露。
- 主线程的线程id和进程的id相同,副线程则不同。
3.3 线程的退出
- 从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit。exit:结束进程
- 线程可以自己调用pthread_ exit终止自己。
- 一个线程可以调用pthread_ cancel终止同一进程中的另一个线程。
void pthread_exit(void *value_ptr);
无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)
value_ptr 是线程退出时传递的返回值指针,可自己设置
int pthread_cancel(pthread_t thread); //thread:线程ID
当线程被别的线程调用pthread_ cancel终止时,返回值为(void*)1
返回值:成功返回0;失败返回错误码
3.4 分离线程
- 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
- 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。
- 该函数可以是线程组内其他线程对目标线程进行分离,也可以是线程自己分离
- joinable和分离是冲突的,一个线程不能既是joinable又是分离的。
int pthread_detach(pthread_t thread);
4. 线程库
- 一个线程控制块对应一个线程,线程控制块里有指针指向回调函数和独立栈。
- 线程库给我们维护线程概念,通过先描述后组织。
- 执行后,线程库会被加载到内存中。
5. 线程ID
每一个线程的库级别的tcb的起始地址,叫做线程的tid!
除了主线程,所有其他线程的独立栈,都在共享区。具体来讲是在pthread库中,tid指向的用户tcb中!
pthread_ create函数会产生一个线程ID,就是图中的tid,存放在第一个参数指向的地址中。
pthread_t 到底是什么类型呢?取决于实现。对于Linux目前实现的NPTL实现而言,pthread_t类型的线程ID,本质就是一个进程地址空间上的一个地址。
线程库NPTL提供了pthread_ self函数,可以获得线程自身的ID:
pthread_t pthread_self(void);
6. Linux线程互斥
- 临界资源:多线程执行流共享的资源就叫做临界资源
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
- 原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
6.1 锁
当多个线程访问全局变量ticket,并进行ticket–; 时,会出现下面的情况:
怎么解决?
对共享数据的任何访问,保证任何时候只有一个执行流访问!——互斥!!——用锁
6.2 锁的接口
6.2.1 互斥量的初始化
互斥量=锁
方法1,静态分配(互斥锁是全局变量或静态变量):
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
方法2,动态分配
pthread_mutex_t mutex;
int pthread_mutex_init(pthread_mutex_t *restrict_mutex,
const pthread_mutexattr_t *restrict_attr);
参数:
mutex:要初始化的互斥量
attr:NULL
6.2.2 销毁互斥量
销毁互斥量需要注意:
- 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
- 不要销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
6.2.3 互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
调用 pthread_ lock 时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
6.3 锁的原理
lock:
movb $0, %al ; AL = 0(尝试获取锁)
xchgb %al, mutex ; 原子交换 AL 和 mutex 的值
if (%al > 0) { ; 如果 AL > 0(获取锁成功)
return 0; ; 进入临界区
} else {
挂起等待; ; 锁被占用,自旋等待
}
goto lock; ; 继续尝试获取锁
unlock:
movb $1, mutex ; 释放锁(mutex = 1)
唤醒等待Mutex的线程; ; 通知其他线程
return 0;
关键点
xchgb %al, mutex
- 这是一个 原子交换 操作,保证
mutex
的修改不会被其他线程干扰。 - 如果
mutex
初始为1
(可用),交换后:AL = 1
(成功获取锁)mutex = 0
(锁被占用)
- 如果
mutex
为0
(被占用),交换后:AL = 0
(获取锁失败)mutex
仍然是0
(锁仍被占用)
if (AL > 0)
- 如果
AL == 1
,说明锁获取成功,可以进入临界区。 - 如果
AL == 0
,说明锁被占用,需要 自旋等待(忙等待或让出 CPU)。
unlock
部分
movb $1, mutex
直接释放锁(mutex = 1
)。
交换的本质:
把内存中的数据,交换到CPU的寄存器中,成为某一线程的上下文
6.4 条件变量
条件变量的作用:使满足条件的队列沉睡,等待条件满足后唤醒。使线程对于资源的竞争有序,防止饥饿情况。
条件变量函数 初始化:
int pthread_cond_init(pthread_cond_t *restrict_cond,
const pthread_condattr_t *restrict_attr);
参数:
restrict_cond:要初始化的条件变量
restrict_attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足:
int pthread_cond_wait(pthread_cond_t *restrict_cond,
pthread_mutex_t *restrict_mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量
唤醒等待:
int pthread_cond_broadcast(pthread_cond_t *cond); 将队列所在的线程全部唤醒
int pthread_cond_signal(pthread_cond_t *cond); 只唤醒队首线程
7. POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
信号量的本质是一把记数器,且信号量的增减操作都是原子的。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem:信号量指针
pshared:0 表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:申请资源,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
如果信号量的值 > 0,则直接减 1,并继续执行。
如果信号量的值 ≤ 0,则当前线程/进程会被阻塞,直到信号量变为正数(由其他线程调用sem_post() 释放资源)。
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
8. 生产者消费者模型
为何要使用生产者消费者模型:
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型优点:
1.解耦
2.支持并发
3.支持忙闲不均
8.1 基于阻塞队列(BlockQueue)的生产消费模型(含实现代码)
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
C++ queue模拟阻塞队列的生产消费模型:
BlockQueue.hpp:
#pragma once
#include<iostream>
#include<string>
#include<queue>
#include<pthread.h>
using namespace std;
template <class T>
class BlockQueue
{
static const int defaultnum=5;
public:
BlockQueue(int maxcap=defaultnum):maxcap_(maxcap)
{
maxcap_=maxcap;
pthread_mutex_init(&mutex_,NULL);
pthread_cond_init(&p_cond_,NULL);
pthread_cond_init(&c_cond_,NULL);
}
void push(const T &in) //生产者
{
pthread_mutex_lock(&mutex_);
while(q_.size()==maxcap_) //满了,不要生产
{
pthread_cond_wait(&p_cond_,&mutex_);
}
q_.push(in);
//if(q_.size()<low_water_) pthread_cond_signal(&c_cond_);
pthread_cond_signal(&c_cond_);
pthread_mutex_unlock(&mutex_);
}
T pop() //消费者
{
pthread_mutex_lock(&mutex_);
while(q_.empty()) //空了,不要去消费,在这排队等
{
pthread_cond_wait(&c_cond_,&mutex_);
}
T out=q_.front();
q_.pop();
//if(q_.size()>high_water_) pthread_cond_signal(&p_cond_);
pthread_cond_signal(&p_cond_);
pthread_mutex_unlock(&mutex_);
return out;
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&p_cond_);
pthread_cond_destroy(&c_cond_);
}
private:
queue<T> q_; //仓库队列
int maxcap_; //仓库容量/阻塞队列大小
pthread_mutex_t mutex_; //整个阻塞队列中只能有一个生产者或消费者,生产者和消费者也不能同时在队列中,因为无法判断生产者消费者何时访问队列中的同一个T
pthread_cond_t p_cond_;
pthread_cond_t c_cond_;
// int low_water_;
// int high_water_;
};
Task.hpp:
#include<string>
#include<iostream>
string opers="+-*/%";
enum{
DivZero=1,
ModZero,
UnKnown
};
using namespace std;
class Task
{
public:
Task(int data1,int data2,char oper)
{
data1_=data1;
data2_=data2;
oper_=oper;
}
void run()
{
switch(oper_)
{
case '+':
result_=data1_+data2_;
break;
case '-':
result_=data1_-data2_;
break;
case '*':
result_=data1_*data2_;
break;
case '/':
if(data2_==0) exitcode_=DivZero;
else result_=data1_/data2_;
break;
case '%':
if(data2_==0) exitcode_=ModZero;
else result_=data1_%data2_;
break;
default:
exitcode_=UnKnown;
}
}
void operator()()
{
run();
}
string GetTask()
{
string s;
s+=to_string(data1_);
s+=oper_;
s+=to_string(data2_);
s+="=?";
return s;
}
string GetResult()
{
string s;
s+=to_string(data1_);
s+=oper_;
s+=to_string(data2_);
s+="=";
s+=to_string(result_);
s+=", exitcode_ = ";
s+=to_string(exitcode_);
return s;
}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
main.cc:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <unistd.h>
void *Consumer(void *args)
{
BlockQueue<Task> *q = static_cast< BlockQueue<Task> *>(args);
while (true)
{
Task t = q->pop();
t();
//usleep(1);
cout<<"处理了一个任务:"<<t.GetResult()<<", thread id :"<<pthread_self()<<endl;
//sleep(1);
}
return nullptr;
}
void *Productor(void *args)
{
BlockQueue<Task> *q = static_cast< BlockQueue<Task> *>(args);
while (true)
{
int data1 = rand() % 10 + 1;
int data2 = rand() % 10;
char oper = opers[rand() % opers.size()];
usleep(1);
Task t(data1, data2, oper);
q->push(t);
cout<<"发送了一个任务 : "<<t.GetTask()<<", thread id :"<<pthread_self()<<endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand(time(nullptr) ^ getpid());
BlockQueue<Task> q;
pthread_t c[3], p[5];
for (int i = 0; i < 3; i++)
{
pthread_create(c + i, nullptr, Consumer, &q);
}
for (int i = 0; i < 5; i++)
{
pthread_create(p + i, nullptr, Productor, &q);
}
for (auto x : c)
{
pthread_join(x,nullptr);
}
for (auto x : p)
{
pthread_join(x,nullptr);
}
return 0;
}
8.2 基于环形队列(RingQueue)的生产消费模型(含实现代码)
BlockQueue:
锁 :互斥
条件变量:同步
基于阻塞队列的生产消费模型,一段时间内,只允许一个角色在队列中,即要么生产者,要么消费者,这是通过生成者和消费者使用同一个锁实现的。为什么要这样呢?这是为了防止同时访问同一份资源
RingQueue:
POSIX信号量的PV操作:是原子的
POSIX信号量的PV操作:同步
锁:互斥
在基于环形队列(RingQueue)的生产消费模型中,当生产者和消费者访问同一个空间时,要么是队列为空,要么队列为满。
当为空时,消费者的信号量为空,被阻塞,只有生产者可以访问该空间。
当为满时,生产者的信号量为空,被阻塞,只有消费者者可以访问该空间。
所以不用担心同一个空间被生产者和消费者同时访问。所以生产者和消费者可以同时在队列中进行生产和消费。所以生产者和消费者可以分别使用一把锁。
- 环形队列采用数组模拟,用模运算来模拟环状特性。
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以一般可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态(如下图)。但是在我们这个生产消费模型中,不需要这样做,可以直接让环形结构起始状态和结束状态都是一样的,同时也需要判断空和满,因为会有POSIX信号量来维持同步关系。
代码gittee链接:
基于环形队列(RingQueue)的生产消费模型
RingQueue.hpp:
#include<iostream>
#include<vector>
#include<semaphore.h>
#include"Task.hpp"
#include<pthread.h>
using namespace std;
const int DefaultNum=10;
template<class T>
class RingQueue
{
private:
void P(sem_t& sem) //-
{
sem_wait(&sem);
}
void V(sem_t& sem) //+
{
sem_post(&sem);
}
public:
RingQueue(int cap=DefaultNum)
{
cap_=cap;
q.resize(cap_);
pthread_mutex_init(&c_mutex,nullptr);
pthread_mutex_init(&p_mutex,nullptr);
sem_init(&cdata_sem_,0,0);
sem_init(&pspace_sem_,0, cap_);
}
void Push(const T& in) //生产者
{
P(pspace_sem_); //减,只能先减,不能先加
pthread_mutex_lock(&p_mutex);
q[p_step_]=in;
p_step_=(p_step_+1)%cap_;
pthread_mutex_unlock(&p_mutex);
V(cdata_sem_); //加
}
void Pop(T& out) //消费者
{
P(cdata_sem_);
pthread_mutex_lock(&c_mutex);
out=q[c_step_];
c_step_=(c_step_+1)%cap_;
pthread_mutex_unlock(&c_mutex);
V(pspace_sem_);
}
~RingQueue()
{
pthread_mutex_destroy(&c_mutex);
pthread_mutex_destroy(&p_mutex);
sem_destroy(&cdata_sem_);
sem_destroy(&pspace_sem_);
}
private:
vector<T> q;
int cap_;
pthread_mutex_t c_mutex;
pthread_mutex_t p_mutex;
sem_t cdata_sem_;
sem_t pspace_sem_;
int c_step_=0;
int p_step_=0;
};
Task.hpp:
#pragma once
#include <iostream>
#include <string>
std::string opers="+-*/%";
enum{
DivZero=1,
ModZero,
Unknown
};
class Task
{
public:
Task()
{}
Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
{
}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = DivZero;
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = ModZero;
else result_ = data1_ % data2_;
} break;
default:
exitcode_ = Unknown;
break;
}
}
void operator ()()
{
run();
}
std::string GetResult() //r: a+b=c[code:1]
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=";
r += std::to_string(result_);
r += "[code: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
std::string GetTask() //r: a+b=?
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=?";
return r;
}
~Task()
{
}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
main.cc:
#include<iostream>
#include"RingQueue.hpp"
#include"Task.hpp"
#include<pthread.h>
#include<time.h>
#include<unistd.h>
#include<string>
using namespace std;
struct ThreadData
{
RingQueue<Task> *rq;
string ThreadName;
};
//string opers="+-*/%";
void *Producer(void* args)
{
ThreadData* td=static_cast<ThreadData*>(args);
RingQueue<Task> *rq=td->rq;
string ThreadName=td->ThreadName;
while(true)
{
int data1=rand()%100;
int data2=rand()%100;
char oper=opers[rand()%opers.size()];
Task t(data1,data2,oper);
rq->Push(t);
cout<<ThreadName<<" send a task: "<<t.GetTask()<<endl;
sleep(1);
}
delete td;
return nullptr;
}
void *Consumer(void* args)
{
ThreadData* td=static_cast<ThreadData*>(args);
RingQueue<Task> *rq=td->rq;
string ThreadName=td->ThreadName;
while(true)
{
Task t;
rq->Pop(t);
t.run();
cout<<ThreadName<<" handle a task: "<<t.GetResult()<<endl;
}
delete td;
return nullptr;
}
int main()
{
srand(time(nullptr)^getpid());
RingQueue<Task> *rq=new RingQueue<Task>(50);
pthread_t p[5],c[3];
for(int i=0;i<5;i++)
{
ThreadData* td=new ThreadData;
td->ThreadName="Producer_"+to_string(i);
td->rq=rq;
pthread_create(p+i,nullptr,Producer,(void*)td);
}
for(int i=0;i<3;i++)
{
ThreadData* td=new ThreadData;
td->ThreadName="Consumer_"+to_string(i);
td->rq=rq;
pthread_create(c+i,nullptr,Consumer,(void*)td);
}
for(auto x:p)
{
pthread_join(x,nullptr);
}
for(auto x:c)
{
pthread_join(x,nullptr);
}
delete rq;
return 0;
}
9. 自己封装线程、std::thread的基本使用
9.1 线程的封装
gittee链接如下:线程的封装
Thread.hpp:
#include<iostream>
#include<string>
#include<unistd.h>
#include <pthread.h>
#include<vector>
using namespace std;
static int num=1;
typedef void(*callback_t)(); //void func() 形式的函数的函数指针,callback_t包含在线程的执行函数里,是回调函数
//对线程进行封装
class Thread
{
public:
static void* Routine(void* args) //args:cb_ 为什么要加static,不加的话,Routin参数里就会有一个 this 指针
{
Thread* self=static_cast<Thread*>(args);
self->cb_();
return nullptr;
}
void Join()
{
if(isrunnig_)
{
pthread_join(id_,nullptr);
isrunnig_=false;
}
}
public:
Thread(callback_t cb):ThreadName_(""), id_(0), isrunnig_(false),cb_(cb)
{}
void Run()
{
//开始创建线程
isrunnig_=true;
ThreadName_="Thread"+to_string(num++);
start_timestamp_=time(nullptr); //time(nullptr): 获取当前系统时间的时间戳
pthread_create(&id_,nullptr,Routine,(void*)this); //这里的this指针不能改成cb_,因为函数指针的转换存在风险
}
~Thread()
{
Join();
}
private:
string ThreadName_;
pthread_t id_;
bool isrunnig_; //可表示为线程是否被创建过
uint64_t start_timestamp_; //可以计算出线程运行时间
callback_t cb_;
};
Main.cc:
#include"Thread.hpp"
using namespace std;
void Print()
{
cout<<"我是一个封装的线程"<<endl;
}
int main()
{
vector<Thread> threads;
for(int i=0;i<5;i++)
{
threads.push_back(Thread(Print));
}
for(auto& t:threads)
{
t.Run();
}
for(auto& t:threads)
{
t.Join();
}
return 0;
}
9.2 std::thread的基本使用
C++ std::thread 简单用法示例:
#include <iostream>
#include <thread>
#include <vector>
// 1. 基本线程函数
void hello() {
std::cout << "Hello from thread!\n";
}
// 2. 带参数的线程函数
void print_numbers(int start, int end) {
for (int i = start; i <= end; ++i) {
std::cout << i << " ";
}
std::cout << "\n";
}
int main() {
// 1. 创建单个线程
std::thread t1(hello);
t1.join(); // 等待线程结束
// 2. 带参数的线程
std::thread t2(print_numbers, 1, 5);
t2.join();
// 3. 使用lambda表达式创建线程
std::thread t3([]() {
std::cout << "Lambda thread running\n";
});
t3.join();
// 4. 创建多个线程
std::vector<std::thread> threads;
for (int i = 0; i < 3; ++i) {
threads.emplace_back([i]() {
std::cout << "Thread " << i << " running\n";
});
}
// 等待所有线程完成
for (auto& t : threads) {
t.join();
}
std::cout << "All threads finished\n";
return 0;
}
10. 线程池
gittee链接: 线程池
ThreadPool.hpp :
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include<cassert>
const int defaultnum=5;
class ThreadInfo
{
public:
std::string threadname;
pthread_t tid;
};
template <class T>
class ThreadPool
{
private:
//有关锁和条件变量的函数的封装
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void UnLock()
{
pthread_mutex_unlock(&mutex_);
}
void Wakeup()
{
pthread_cond_signal(&cond_);
}
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_); //唤醒的两个条件:有锁,有人唤醒我(有人调用pthread_cond_signal函数)
}
std::string GetThreadName(pthread_t tid)
{
for(const auto& ti:threads_)
{
if(ti.tid==tid)
{
return ti.threadname;
}
}
return "None";
}
bool IsTaskEmpty()
{
return tasks_.empty();
}
public:
static void* HandlerTask(void* args)
{
ThreadPool<T>* tp=static_cast<ThreadPool<T>*>(args);
pthread_t tid=pthread_self();
std::string threadname=tp->GetThreadName(tid);
pthread_detach(tid);
//开始执行任务
while(true)
{
tp->Lock();
while (tp->IsTaskEmpty())
{
tp->ThreadSleep(); // 默认为阻塞,while防止虚假唤醒 //唤醒的两个条件:有锁,有push任务
}
T t = tp->PopTask();
tp->UnLock();
t();
std::cout << threadname << " handler a task: " << t.GetResult() << std::endl;
}
return nullptr;
}
void PushTask(const T& t)
{
Lock();
tasks_.push(t);
Wakeup(); //有任务了,唤醒线程执行任务
UnLock();
}
T PopTask() //PopTask不需要加锁,因为在HandlerTask内,PopTask在加锁区域内,并且也不能加锁,加了会导致死锁
{
assert(pthread_mutex_trylock(&mutex_) != 0);
T t=tasks_.front();
tasks_.pop();
return t;
}
static ThreadPool<T>* GetInstance() //目的:得到 ThreadPool对象,new一个
{
if(nullptr==tp_)
{
pthread_mutex_lock(&lock_); //为什么加锁,防止多个线程执行单例模式函数
std::cout << "log: singleton create done first!" << std::endl; //单例模式(singleton)的创建已完成
tp_=new ThreadPool<T>();
pthread_mutex_unlock(&lock_);
}
return tp_;
}
void Start()
{
int nums=threads_.size();
for(int i=0;i<nums;i++)
{
//讲线程的信息也放入了threads数组中
threads_[i].threadname=std::to_string(i)+"_Thread";
pthread_create(&threads_[i].tid,nullptr,HandlerTask,this);
}
}
void Destroy()
{
if(tp_)
{
delete tp_; //调用析构函数,不可以直接~ThreadPool();
tp_ = nullptr;
}
}
private:
ThreadPool(int num = defaultnum) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool() //一般含单例模式的类的析构函数都是private,因为防止外部销毁
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
ThreadPool(const ThreadPool<T> &) = delete; //拷贝构造
const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // a=b,拷贝赋值
private:
std::vector<ThreadInfo> threads_; //放线程
std::queue<T> tasks_; //放任务
static ThreadPool<T>* tp_; //配合单例模式,且必须要用static
static pthread_mutex_t lock_; //只搭配单例模式的锁,使用两把锁,使代码清晰
pthread_mutex_t mutex_;
pthread_cond_t cond_;
};
template <class T>
ThreadPool<T>* ThreadPool<T>::tp_=nullptr; //tp_的初始化,必须加ThreadPool<T>::
//为什么必须要设置为static,而不能是非static,并在Start中初始化,因为ThreadPool<Task>::GetInstance()->Start(); GetInstance在Start前面
template <class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER; //lock_的初始化
Main.cc:
#include <iostream>
#include <ctime>
#include "ThreadPool.hpp"
#include "Task.hpp"
int main()
{
std::cout<<"Process running..."<<std::endl;
ThreadPool<Task>::GetInstance()->Start(); //线程已准备,等待着任务
srand(getpid()^time(nullptr));
while(true)
{
//创建任务
int a=rand()%10;
int b=rand()%10;
int len=opers.size();
char oper=opers[rand()%len];
usleep(10);
Task t(a,b,oper);
std::cout<<"main thread make task: "<<t.GetTask()<<std::endl;
//放入线程池中,线程池会自动处理任务,因为线程已开启,并且一直在循环,不会退出
ThreadPool<Task>::GetInstance()->PushTask(t);
sleep(1);
}
ThreadPool<Task>::GetInstance()->Destroy();
return 0;
}
Makefile文件和Task.hpp文件:略,可在gittee链接 (上文有) 中查看