线程池的概念
线程池(threadpool
)是一种线程使用模式。
线程过多会带来调度开销,进而影响缓存局部和整体性能,而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。
线程池的优点
- 线程池避免了在处理短时间任务时创建与销毁线程的代价。
- 线程池不仅能够保证内核充分利用,还能防止过分调度。
注意: 线程池中可用线程的数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景
线程池常见的应用场景如下:
- 需要大量的线程来完成任务,且完成任务的时间比较短。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。
相关解释:
- 像Web服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。
- 对于长时间的任务,比如Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 突发性大量客户请求,在没有线程池的情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,但短时间内产生大量线程可能使内存到达极限,出现错误。
线程池的实现
实现图解
下面我们实现一个简单的线程池,线程池中提供了一个任务队列,以及若干个线程(多线程)。
- 线程池中的多个线程负责从任务队列当中拿任务,并将拿到的任务进行处理。
- 线程池对外提供一个Push接口,用于让外部线程能够将任务Push到任务队列当中。
ThreadPool.hpp
#pragma once
#include<iostream>
#include<vector>
#include<string>
#include<queue>
#include<pthread.h>
#include<unistd.h>
using namespace std;
class ThreadInfo
{
public:
pthread_t tid_;
string name_;
};
static const int defaultnum = 5;//这里默认设置了5个
template<class T>
class ThreadPool
{
public:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void Wakeup()
{
pthread_cond_signal(&cond_);
}
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsQueueEmpty()
{
return tasks_.empty();
}
string GetThreadName(pthread_t tid)
{
for(const auto &ti : threads_)
{
//std::cout << tid << ":" << ti.tid_ << std::endl;
if(ti.tid_ == tid)
{
return ti.name_;
}
}
return "None!!!"; // 这里
}
public:
static ThreadPool<T> *GetInstance()
{
// 这里使用了两个 if判断语句的技术称为双检锁;好处是,只有判断指针为空的时候才加锁,
// 避免每次调用 GetInstance的方法都加锁,锁的开销毕竟还是有点大的。
if(nullptr == tp_)
{
pthread_mutex_lock(&lock_);
if(nullptr == tp_)
{
cout << "log: singleton has creadted done first!!!" << endl;
tp_ = new ThreadPool<T>();
}
pthread_mutex_unlock(&lock_);
}
return tp_;
}
void Start()
{
int num = threads_.size();
for(int i = 0; i < num; i++)
{
threads_[i].name_ = "thread-" + to_string(i+1);
pthread_create(&(threads_[i].tid_), nullptr, HandlerTask, this);
cout << threads_[i].name_ << " has created" << ", tid is " << threads_[i].tid_ << endl;//我加的测试,数据tid能打印,所以存进去了
}
}
static void *HandlerTask(void *arg)
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(arg);
string name = tp->GetThreadName(pthread_self());
while(true)
{
tp->Lock();
while(tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->Pop();
tp->Unlock();
t();//执行run(),这里对()进行了运算符重载
cout << name << " run, " << "result: " << t.GetResult() << endl;//这里打印出来,显示 对应的thread!!!
}
}
T Pop()
{
T t = tasks_.front();
tasks_.pop();
return t;
}
void Push(const T &t)
{
Lock();
tasks_.push(t);
Wakeup();
Unlock();
}
private:
ThreadPool(int num = defaultnum):threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
ThreadPool(const ThreadPool<T> &) = delete;
const ThreadPool<T> operator=(const ThreadPool<T> &) = delete;
private:
vector<ThreadInfo> threads_;
queue<T> tasks_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
static ThreadPool<T> *tp_;
static pthread_mutex_t lock_;//创建的时候用
};
template<class T>
ThreadPool<T> *ThreadPool<T>::tp_ = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
1.用加锁的懒汉式单例创建线程池
单例模式指在整个系统生命周期里,保证一个类只能产生一个实例,确保该类的唯一性。我们在实际应用中不可能在服务器上创建多个线程池,往往创建一个线程池即可,否则会占用大量资源。
而加锁的懒汉式单例是线程安全的,加锁后能保证线程池的构造函数只会调用一次,创建一个线程池。为了确保实例的唯一性,我们会做如下操作:
- 会将构造函数、析构函数、拷贝构造和赋值运算符重载都变为私有
- 在类里留出获取实例的静态函数,可以全局访问
- 留出
static ThreadPool<T> *GetInstance()
的接口进行构造和获取线程池的全局指针,构造的时候会加锁
在 static ThreadPool *GetInstance() 的内部会使用 双检锁。好处是,在判断指针为空的时候才加锁,毕竟加锁也是一笔资源开销。
详细的关于线程安全的单例模式内容这里不多赘述,可看如下文章:
2.为什么线程池中需要有互斥锁和条件变量?
线程池中的任务队列是会被多个执行流同时访问的临界资源,因此我们需要引入互斥锁对任务队列进行保护。
线程池当中的线程要从任务队列里拿任务,前提条件是任务队列中必须要有任务,因此线程池当中的线程在拿任务之前,需要先判断任务队列当中是否有任务,若此时任务队列为空,那么该线程应该进行等待,直到任务队列中有任务时再将其唤醒,因此我们需要引入条件变量。
当外部线程向任务队列中Push一个任务后,此时可能有线程正处于等待状态,因此在新增任务后需要唤醒在条件变量下等待的线程。
注意:
- 当某线程被唤醒时,其可能是被异常或是伪唤醒,或者是一些广播类的唤醒线程操作而导致所有线程被唤醒,使得在被唤醒的若干线程中,只有个别线程能拿到任务。此时应该让被唤醒的线程再次判断是否满足被唤醒条件,所以在判断任务队列是否为空时,应该使用while进行判断,而不是if。
pthread_cond_broadcast
函数的作用是唤醒条件变量下的所有线程,而外部可能只Push了一个任务,我们却把全部在等待的线程都唤醒了,此时这些线程就都会去任务队列获取任务,但最终只有一个线程能得到任务。一瞬间唤醒大量的线程可能会导致系统震荡,这叫做惊群效应。因此在唤醒线程时最好使用pthread_cond_signal
函数唤醒正在等待的一个线程即可。- 当线程从任务队列中拿到任务后,该任务就已经属于当前线程了,与其他线程已经没有关系了,因此应该在解锁之后再进行处理任务,而不是在解锁之前进行。因为处理任务的过程可能会耗费一定的时间,所以我们不要将其放到临界区当中。
- 如果将处理任务的过程放到临界区当中,那么当某一线程从任务队列拿到任务后,其他线程还需要等待该线程将任务处理完后,才有机会进入临界区。此时虽然是线程池,但最终我们可能并没有让多线程并行的执行起来。
3.为什么线程池中的线程执行例程需要设置为静态方法?
使用 pthread_create
函数创建线程时,需要为创建的线程传入一个 Routine
(执行例程),该Routine只有一个参数类型为 void*
的参数,以及返回类型为 void*
的返回值。
而此时 Routine
作为类的成员函数,该函数的第一个参数是隐藏的 this
指针,因此这里的 Routine 函数,虽然看起来只有一个参数,而实际上它有两个参数,此时直接将该Routine函数作为创建线程时的执行例程是不行的,无法通过编译。
静态成员函数属于类,而不属于某个对象,也就是说静态成员函数是没有隐藏的this指针的,因此我们需要将Routine设置为静态方法,此时Routine函数才真正只有一个参数类型为void*的参数。
但是在静态成员函数内部无法调用非静态成员函数,而我们需要在Routine函数当中调用该类的某些非静态成员函数,比如Pop。因此我们需要在创建线程时,向Routine函数传入的当前对象的this指针,此时我们就能够通过该this指针在Routine函数内部调用非静态成员函数了。
Task.hpp
我们将线程池进行了模板化,因此线程池当中存储的任务类型可以是任意的,但无论该任务是什么类型的,在该任务类当中都必须包含一个Run方法,当我们处理该类型的任务时只需调用该Run方法即可。
例如,下面我们实现一个计算任务类:
#pragma once
#include <iostream>
#include <string>
using namespace std;
string opers = "+-*/&";
enum
{
DivZero = 1,
ModZero,
Unknown
};
class Task
{
public:
Task()
{
}
Task(int x, int y, char op)
: data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
{
}
~Task()
{
}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if (data2_ == 0)
exitcode_ = DivZero;
else
result_ = data1_ / data2_;
break;
}
case '%':
{
if (data2_ == 0)
exitcode_ = ModZero;
else
result_ = data1_ % data2_;
break;
}
default:
exitcode_ = Unknown;
break;
}
}
void operator()()
{
run();
}
string GetResult()
{
string r = to_string(data1_);
r += oper_;
r = to_string(data2_);
r += "=";
r = to_string(result_);
r += "[exitcode=";
r += to_string(exitcode_);
r += "]";
return r;
}
string GetTask()
{
string r = to_string(data1_);
r += oper_;
r += to_string(data2_);
r += "=?";
return r;
}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
此时线程池内的线程不断从任务队列拿出任务进行处理,而它们并不需要关心这些任务是哪来的,它们只需要拿到任务后执行对应的Run方法即可。
Main.cc
主线程就负责不断向任务队列当中Push任务就行了,此后线程池当中的线程会从任务队列当中获取到这些任务并进行处理。
#include<iostream>
#include<ctime>
#include"Task.hpp"
#include"ThreadPool.hpp"
using namespace std;
int main()
{
cout << "process run..." << endl;
sleep(3);
ThreadPool<Task> *tp = ThreadPool<Task>::GetInstance();
tp->Start();
srand(time(nullptr)^getpid());
while(true)
{
// 1.创建任务
int x = rand() % 10 + 1;
usleep(10);
int y = rand() % 5;
char op = opers[rand() % opers.size()];
Task t(x, y, op);
tp->Push(t);
// 2.处理任务
cout << "main thread has made task: " << t.GetTask() << endl;
sleep(1);
}
return 0;
}
运行结果
运行代码后就有5个线程,他们是线程池内处理任务的线程。
并且我们会发现这五个线程在处理时会呈现出一定的顺序性,因为主线程是每次Push一个任务,这五个线程只会有一个线程获取到该任务,其他线程都会在等待队列中进行等待。当该线程处理完任务后就会因为任务队列为空而排到等待队列的最后,当主线程再次Push一个任务后会唤醒等待队列首部的一个线程,这个线程处理完任务后又会排到等待队列的最后,因此这五个线程在处理任务时会呈现出一定的顺序性。
注意: 此后我们如果想让线程池处理其他不同的任务请求时,我们只需要提供一个任务类,在该任务类当中提供对应的任务处理方法就行了。