线程池的实现和网络应用
文章目录
在阅读本文的同时,思考以下问题
- 为什么使用线程池
- 线程池的构成
- 消息队列如何设计(结合线程池网络编程的实例)。
1、线程池的概念
1.1、为什么要使用线程池
对于某些特别耗时的任务,严重影响了该线程处理其他任务,需要将这些任务交给其他线程异步处理,但是频繁创建和销毁线程也会给系统带来大量开销。线程池的出现可以很好解决上述问题,线程池中的线程数量需要我们在线程资源的开销和 cpu 核心间进行平衡选择,要做到既能减少线程创建和销毁的开销,又能最大限度的提高 cpu 利用率。
1.2、线程池的作用
- 复用线程资源,减少线程创建和销毁的开销
- 可异步处理生产者线程的任务。
1.3、线程池的构成
线程池其实是一个生产者和消费者模型,生产者线程发布任务,将任务放入队列中。线程池中的空闲消费者线程从队列中取出任务,并执行任务。
如图,线程池有三个核心组件
- 生产者线程:发布任务
- 队列:存储任务结点,其中包括任务的上下文、任务的执行函数等
- 线程池(消费者):取出任务,执行任务。此过程涉及到线程调度的问题,即从无到有唤醒,从有到无休眠。
1.4、线程池线程数量的平衡选择
线程池线程数量的经验公式:(io等待时间 + cpu运算时间)* 核心数 / cpu运算时间
- io 密集型:2 * cpu 核心数 + 有效磁盘数(2)
- cpu 密集型:与 cpu 核心数一致
2、线程池的实现
2.1、线程池的组件
生产者线程:主线程
任务队列
// 任务结点 typedef struct task_s { handler_pt func; // 任务的执行函数 void * arg; // 任务的上下文 } task_t; // 任务队列 typedef struct task_queue_s { uint32_t head; // 队列的头指针 uint32_t tail; // 队列的尾指针 uint32_t count; // 队列中的任务结点数量 task_t *queue; // 队列数组 } task_queue_t;
线程池
struct thread_pool_t { pthread_mutex_t mutex; pthread_cond_t condition; pthread_t *threads; task_queue_t task_queue; int closed; // 退出标记 int started; // 当前运行的线程数 int thrd_count; // 线程的数量 int queue_size; // 队列的长度,设置数组,一次性分配内存,不使用内存池 };
2.2、线程池的接口
2.2.1、创建线程池
- 线程池的线程数量:根据经验公式计算得出
- 队列的长度:线程的栈空间固定大小(默认8M),由线程数量和线程栈空间大小共同决定
thread_pool_t* thread_pool_create(int thrd_count, int queue_size) {
thread_pool_t *pool;
if (thrd_count <= 0 || queue_size <= 0) {
return NULL;
}
// 线程池分配内存
pool = (thread_pool_t*) malloc(sizeof(*pool));
if (pool == NULL) {
return NULL;
}
// 初始化线程池
// 为什么不直接用thrd_count赋值?而是选择从0开始计数
// 每当成功创建1个线程,计数+1,避免线程创建失败造成计数混乱
pool->thrd_count = 0;
pool->queue_size = queue_size;
pool->task_queue.head = 0;
pool->task_queue.tail = 0;
pool->task_queue.count = 0;
pool->started = pool->closed = 0;
// 创建任务队列
pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size);
if (pool->task_queue.queue == NULL) {
// TODO: free pool
return NULL;
}
// 创建线程
pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count);
if (pool->threads == NULL) {
// TODO: free pool
return NULL;
}
// 依次创建好线程
for (int i = 0; i < thrd_count; ++i) {
if (pthread_create(&(pool->threads[i]), NULL, thread_worker, (void*)pool) != 0) {
// TODO: free pool
return NULL;
}
pool->thrd_count++;
pool->started++;
}
return pool;
}
// 线程池中的线程(消费者)该干的事儿
static void* thread_worker(void *thrd_pool) {
thread_pool_t *pool = (thread_pool_t*)thrd_pool;
task_queue_t *que;
task_t task;
for (;;) {
pthread_mutex_lock(&(pool->mutex));
que = &pool->task_queue;
// 虚假唤醒问题
// while 判断:没有任务而且线程池没有关闭
while (que->count == 0 && pool->closed == 0) {
// pthread_mutex_unlock(&(pool->mutex))
// 阻塞在 condition
// ===================================
// 解除阻塞
// pthread_mutex_lock(&(pool->mutex));
pthread_cond_wait(&(pool->condition), &(pool->mutex));
}
// 线程池关闭
if (pool->closed == 1) break;
// 获取任务
task = que->queue[que->head];
que->head = (que->head + 1) % pool->queue_size;
que->count--;
pthread_mutex_unlock(&(pool->mutex));
// 执行任务
(*(task.func))(task.arg);
}
// 销毁该线程
pool->started--;
pthread_mutex_unlock(&(pool->mutex));
pthread_exit(NULL);
return NULL;
}
虚假唤醒
上述代码注释中提到了虚假唤醒,那么,什么是虚假唤醒?
虚假唤醒指的是在多线程环境下,多个线程等待在同一个条件上。当条件满足时,可能唤醒多个线程。如果这个资源只能由一个线程获得,剩余线程无法获得该资源。对于无法获得资源的线程来说,这种唤醒是无意义的,这种现象称为虚假唤醒。
虚假唤醒产生的原因
- 信号中断导致的问题(Linux2.6以后已解决)
pthread_cond_signal
,至少唤醒1个线程,即可能同时唤醒多个线程。
为避免虚假唤醒的发生,每个被唤醒的线程都需要再检查一次条件是否满足。如果不满足,应该继续睡眠;只有满足了才能往下执行。因此,需要把条件变量的判断从 if 判断换成 while 判断,此时,一次只能唤醒一个线程。
2.2.2、销毁线程池
- 标记线程池退出
- 通知所有线程
int thread_pool_destroy(thread_pool_t *pool) {
if (pool == NULL) {
return -1;
}
// 阻止产生新的任务
if (pthread_mutex_lock(&(pool->mutex)) != 0) {
return -2;
}
// 判断是否已经退出,防止重复释放空间
if (pool->closed) {
thread_pool_free(pool);
return -3;
}
// 标记线程池退出
pool->closed = 1;
// 唤醒所有阻塞在cond上的线程,并释放互斥锁
if (pthread_cond_broadcast(&(pool->condition)) != 0 ||
pthread_mutex_unlock(&(pool->mutex)) != 0) {
thread_pool_free(pool);
return -4;
}
// 等待所有线程退出
wait_all_done(pool);
// 释放线程池空间
thread_pool_free(pool);
return 0;
}
2.2.3、生产者抛出任务
- 构造任务
- 放入队列
- 唤醒线程
int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) {
if (pool == NULL || func == NULL) {
return -1;
}
task_queue_t *task_queue = &(pool->task_queue);
if (pthread_mutex_lock(&(pool->mutex)) != 0) {
return -2;
}
// 判断线程池是否关闭
if (pool->closed) {
pthread_mutex_unlock(&(pool->mutex));
return -3;
}
// 判断任务队列是否已满
if (task_queue->count == pool->queue_size) {
pthread_mutex_unlock(&(pool->mutex));
return -4;
}
// 1、主线程(生产者线程)构造任务,放入任务队列
// 队列的操作,使用自旋锁
task_queue->queue[task_queue->tail].func = func;
task_queue->queue[task_queue->tail].arg = arg;
task_queue->tail = (task_queue->tail + 1) % pool->queue_size;
task_queue->count++;
// 2、唤醒线程池中的线程
if (pthread_cond_signal(&(pool->condition)) != 0) {
pthread_mutex_unlock(&(pool->mutex));
return -5;
}
pthread_mutex_unlock(&(pool->mutex));
return 0;
}
3、线程池网络编程
3.1、reactor 的问题
在一个事件循环中,可以处理多个就绪事件。这些就绪事件在 reactor 模型中是串行执行的,一个事件处理若耗时较长,会延迟其他同时触发的事件的处理。
void eventloop_once(reactor_t * r, int timeout) {
int n = epoll_wait(r->epfd, r->fire, MAX_EVENT_NUM, timeout);
for (int i = 0; i < n; ++i) {
struct epoll_event *e = &r->fire[i];
int mask = e->events;
// 用 io 函数捕获具体的错误信息
if (e->events & EPOLLERR) mask |= EPOLLIN | EPOLLOUT;
// 用 io 函数捕获断开的具体信息
if (e->events & EPOLLHUP) mask |= EPOLLIN | EPOLLOUT;
event_t *et = (event_t*) e->data.ptr;
// 处理读事件
if (mask & EPOLLIN) {
if (et->read_fn) {
et->read_fn(et->fd, EPOLLIN, et);
}
}
// 处理写事件
if (mask & EPOLLOUT) {
if (et->write_fn) {
et->write_fn(et->fd, EPOLLOUT, et);
}
else {
uint8_t * buf = buffer_write_atmost(&et->out);
event_buffer_write(et, buf, buffer_len(&et->out));
}
}
}
}
接下来,介绍 nginx, redis, skynet 线程池实现形式,都是对 reactor 模型的优化。
3.2、nginx 线程池
为什么使用线程池?
nginx 是磁盘 io 密集型。业务逻辑是处理文件缓冲,需要磁盘 io 操作,耗费大量的时间。
什么情况下使用线程池?
nginx 线程池作用 compute 阶段,将业务逻辑交给线程池处理。
怎样使用线程池?
nginx 的线程池有两个消息队列,当线程池处理完任务后,将处理的结果放入完成消息队列,以事件的方式通知主线程,主线程从完成消息队列中取出结果发送给客户端。
值得注意的是,nginx 不推荐线程池的方式,可以采用替代的操作 sendfile
, directio
解决磁盘 io 的问题。
3.3、redis 线程池
为什么使用线程池?
redis 是网络 io 密集型,需要同时处理多条并发请求,存在读写 io 的问题(请求大量数据,写日志业务等)。
什么情况下使用线程池?
redis 线程池作用读写 io 阶段,即 read, decode 和 encode, send 阶段。主线程处理业务逻辑,之所以用单线程处理业务,是因为 redis 采用高效的数据结构,其业务逻辑处理较快。
怎样使用线程池?
主线程拥有两个全局队列clients_pending_read
和clients_pending_write
,每个 io 线程(主线程同时也是 io 线程)拥有一个专属队列 io_threads_list[id]
。主线程既作为生产者,产生任务;又作为消费者,获取任务执行。
首先,主线程将一次循环的所有就绪的读事件收集到自己的全局任务队列clients_pending_read
中,再把每个事件负载均衡地分配到每个 io 线程的专属任务队列中。一次事件循环中不会出现同名 fd,不同的 fd 分配到每个 io 线程各自的队列中,避免了多个 io 线程同时从全局队列中取数据,因此,不需要加锁操作。
接下来,io 线程从自己的专属队列中取出任务,(除主线程外)并发执行 read 和 decode 操作。主线程将解析后的任务做 compute 操作。最后,io 线程(包括主线程)并发执行 encode 和 send 操作。
源码实现细节,见我的另一篇文章,redis 源码解析:IO 线程池。
3.4、skynet 线程池
为什么使用线程池?
skynet 是 cpu 密集型。处理读写 io 、数据包解压缩、业务逻辑处理等。特别地,当同一个 io 在多个线程处理时,将写 io 转由网络线程处理。
什么情况下使用线程池?
nginx 线程池作用于除 send 外的所有阶段。
怎样使用线程池?
skynet 的网络线程收集事件,将事件插入到对应的 actor 的专属消息队列中。全局消息队列存储有消息的 actor,即描述活跃的 actor,线程池负责检查该全局队列。