线程池的实现和网络应用

发布于:2022-12-16 ⋅ 阅读:(720) ⋅ 点赞:(0)

线程池的实现和网络应用

在阅读本文的同时,思考以下问题

  • 为什么使用线程池
  • 线程池的构成
  • 消息队列如何设计(结合线程池网络编程的实例)。

1、线程池的概念

1.1、为什么要使用线程池

对于某些特别耗时的任务,严重影响了该线程处理其他任务,需要将这些任务交给其他线程异步处理,但是频繁创建和销毁线程也会给系统带来大量开销。线程池的出现可以很好解决上述问题,线程池中的线程数量需要我们在线程资源的开销和 cpu 核心间进行平衡选择,要做到既能减少线程创建和销毁的开销,又能最大限度的提高 cpu 利用率。

1.2、线程池的作用

  • 复用线程资源,减少线程创建和销毁的开销
  • 可异步处理生产者线程的任务。

1.3、线程池的构成

线程池其实是一个生产者和消费者模型,生产者线程发布任务,将任务放入队列中。线程池中的空闲消费者线程从队列中取出任务,并执行任务。

在这里插入图片描述

如图,线程池有三个核心组件

  • 生产者线程:发布任务
  • 队列:存储任务结点,其中包括任务的上下文、任务的执行函数等
  • 线程池(消费者):取出任务,执行任务。此过程涉及到线程调度的问题,即从无到有唤醒,从有到无休眠。

1.4、线程池线程数量的平衡选择

线程池线程数量的经验公式:(io等待时间 + cpu运算时间)* 核心数 / cpu运算时间

  • io 密集型:2 * cpu 核心数 + 有效磁盘数(2)
  • cpu 密集型:与 cpu 核心数一致

2、线程池的实现

2.1、线程池的组件

  • 生产者线程:主线程

  • 任务队列

    // 任务结点
    typedef struct task_s {
        handler_pt func; // 任务的执行函数
        void * arg;      // 任务的上下文
    } task_t;
    
    // 任务队列
    typedef struct task_queue_s {
        uint32_t head;  // 队列的头指针
        uint32_t tail;  // 队列的尾指针
        uint32_t count; // 队列中的任务结点数量
        task_t *queue;  // 队列数组
    } task_queue_t;
    
  • 线程池

    struct thread_pool_t {
        pthread_mutex_t mutex;
        pthread_cond_t condition;
        pthread_t *threads;
        task_queue_t task_queue;
    
        int closed;     // 退出标记
        int started;    // 当前运行的线程数
    
        int thrd_count; // 线程的数量
        int queue_size; // 队列的长度,设置数组,一次性分配内存,不使用内存池
    };
    

2.2、线程池的接口

2.2.1、创建线程池

  • 线程池的线程数量:根据经验公式计算得出
  • 队列的长度:线程的栈空间固定大小(默认8M),由线程数量和线程栈空间大小共同决定
thread_pool_t* thread_pool_create(int thrd_count, int queue_size) {
    thread_pool_t *pool;

    if (thrd_count <= 0 || queue_size <= 0) {
        return NULL;
    }

    // 线程池分配内存
    pool = (thread_pool_t*) malloc(sizeof(*pool));
    if (pool == NULL) {
        return NULL;
    }

    // 初始化线程池
    // 为什么不直接用thrd_count赋值?而是选择从0开始计数
    // 每当成功创建1个线程,计数+1,避免线程创建失败造成计数混乱
    pool->thrd_count = 0;  
    pool->queue_size = queue_size;
    pool->task_queue.head = 0;
    pool->task_queue.tail = 0;
    pool->task_queue.count = 0;
    pool->started = pool->closed = 0;

    // 创建任务队列
    pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size);
    if (pool->task_queue.queue == NULL) {
        // TODO: free pool
        return NULL;
    }

    // 创建线程
    pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count);
    if (pool->threads == NULL) {
        // TODO: free pool
        return NULL;
    }
	// 依次创建好线程
    for (int i = 0; i < thrd_count; ++i) {
        if (pthread_create(&(pool->threads[i]), NULL, thread_worker, (void*)pool) != 0) {
            // TODO: free pool
            return NULL;
        }
        pool->thrd_count++;
        pool->started++;
    }
    return pool;
}

// 线程池中的线程(消费者)该干的事儿
static void* thread_worker(void *thrd_pool) {
    thread_pool_t *pool = (thread_pool_t*)thrd_pool;
    task_queue_t *que;
    task_t task;

    for (;;) {
        pthread_mutex_lock(&(pool->mutex));
        que = &pool->task_queue;
        
        // 虚假唤醒问题
        // while 判断:没有任务而且线程池没有关闭
        while (que->count == 0 && pool->closed == 0) {
            // pthread_mutex_unlock(&(pool->mutex))
            // 阻塞在 condition
            // ===================================
            // 解除阻塞
            // pthread_mutex_lock(&(pool->mutex));
            pthread_cond_wait(&(pool->condition), &(pool->mutex));
        }
        // 线程池关闭
        if (pool->closed == 1) break;
        
        // 获取任务
        task = que->queue[que->head];
        que->head = (que->head + 1) % pool->queue_size;
        que->count--;
        pthread_mutex_unlock(&(pool->mutex));
        
        // 执行任务
        (*(task.func))(task.arg);
    }

    // 销毁该线程
    pool->started--;
    pthread_mutex_unlock(&(pool->mutex));
    pthread_exit(NULL);
    return NULL;
}
虚假唤醒

上述代码注释中提到了虚假唤醒,那么,什么是虚假唤醒?

虚假唤醒指的是在多线程环境下,多个线程等待在同一个条件上。当条件满足时,可能唤醒多个线程。如果这个资源只能由一个线程获得,剩余线程无法获得该资源。对于无法获得资源的线程来说,这种唤醒是无意义的,这种现象称为虚假唤醒。

虚假唤醒产生的原因

  • 信号中断导致的问题(Linux2.6以后已解决)
  • pthread_cond_signal,至少唤醒1个线程,即可能同时唤醒多个线程。

为避免虚假唤醒的发生,每个被唤醒的线程都需要再检查一次条件是否满足。如果不满足,应该继续睡眠;只有满足了才能往下执行。因此,需要把条件变量的判断从 if 判断换成 while 判断,此时,一次只能唤醒一个线程。

2.2.2、销毁线程池

  • 标记线程池退出
  • 通知所有线程
int thread_pool_destroy(thread_pool_t *pool) {
    if (pool == NULL) {
        return -1;
    }

    // 阻止产生新的任务
    if (pthread_mutex_lock(&(pool->mutex)) != 0) {
        return -2;
    }

    // 判断是否已经退出,防止重复释放空间
    if (pool->closed) {
        thread_pool_free(pool);
        return -3;
    }

    // 标记线程池退出
    pool->closed = 1;

    // 唤醒所有阻塞在cond上的线程,并释放互斥锁
    if (pthread_cond_broadcast(&(pool->condition)) != 0 || 
            pthread_mutex_unlock(&(pool->mutex)) != 0) {
        thread_pool_free(pool);
        return -4;
    }

    // 等待所有线程退出
    wait_all_done(pool);
	// 释放线程池空间
    thread_pool_free(pool);
    return 0;
}

2.2.3、生产者抛出任务

  • 构造任务
  • 放入队列
  • 唤醒线程
int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) {
    if (pool == NULL || func == NULL) {
        return -1;
    }

    task_queue_t *task_queue = &(pool->task_queue);

    if (pthread_mutex_lock(&(pool->mutex)) != 0) {
        return -2;
    }
    
    // 判断线程池是否关闭
    if (pool->closed) {
        pthread_mutex_unlock(&(pool->mutex));
        return -3;
    }

    // 判断任务队列是否已满
    if (task_queue->count == pool->queue_size) {
        pthread_mutex_unlock(&(pool->mutex));
        return -4;
    }

    // 1、主线程(生产者线程)构造任务,放入任务队列
    // 队列的操作,使用自旋锁
    task_queue->queue[task_queue->tail].func = func;
    task_queue->queue[task_queue->tail].arg = arg;
    task_queue->tail = (task_queue->tail + 1) % pool->queue_size;
    task_queue->count++;

    // 2、唤醒线程池中的线程
    if (pthread_cond_signal(&(pool->condition)) != 0) {
        pthread_mutex_unlock(&(pool->mutex));
        return -5;
    }
    pthread_mutex_unlock(&(pool->mutex));
    return 0;
}

3、线程池网络编程

3.1、reactor 的问题

在这里插入图片描述

在一个事件循环中,可以处理多个就绪事件。这些就绪事件在 reactor 模型中是串行执行的,一个事件处理若耗时较长,会延迟其他同时触发的事件的处理。

void eventloop_once(reactor_t * r, int timeout) {
    int n = epoll_wait(r->epfd, r->fire, MAX_EVENT_NUM, timeout);
    for (int i = 0; i < n; ++i) {
        struct epoll_event *e = &r->fire[i];
        int mask = e->events;
        // 用 io 函数捕获具体的错误信息
        if (e->events & EPOLLERR) mask |= EPOLLIN | EPOLLOUT;
        // 用 io 函数捕获断开的具体信息
        if (e->events & EPOLLHUP) mask |= EPOLLIN | EPOLLOUT;
        event_t *et = (event_t*) e->data.ptr;
        // 处理读事件
        if (mask & EPOLLIN) {
            if (et->read_fn) {
                et->read_fn(et->fd, EPOLLIN, et);
            }    
        }
        // 处理写事件
        if (mask & EPOLLOUT) {
            if (et->write_fn) {
                et->write_fn(et->fd, EPOLLOUT, et);
            }       
            else {
                uint8_t * buf = buffer_write_atmost(&et->out);
                event_buffer_write(et, buf, buffer_len(&et->out));
            }
        }
    }
}

接下来,介绍 nginx, redis, skynet 线程池实现形式,都是对 reactor 模型的优化。

3.2、nginx 线程池

为什么使用线程池?

nginx 是磁盘 io 密集型。业务逻辑是处理文件缓冲,需要磁盘 io 操作,耗费大量的时间。

什么情况下使用线程池?

nginx 线程池作用 compute 阶段,将业务逻辑交给线程池处理。

在这里插入图片描述

怎样使用线程池?

nginx 的线程池有两个消息队列,当线程池处理完任务后,将处理的结果放入完成消息队列,以事件的方式通知主线程,主线程从完成消息队列中取出结果发送给客户端。

在这里插入图片描述

值得注意的是,nginx 不推荐线程池的方式,可以采用替代的操作 sendfile, directio解决磁盘 io 的问题。

3.3、redis 线程池

为什么使用线程池?

redis 是网络 io 密集型,需要同时处理多条并发请求,存在读写 io 的问题(请求大量数据,写日志业务等)。

什么情况下使用线程池?

redis 线程池作用读写 io 阶段,即 read, decode 和 encode, send 阶段。主线程处理业务逻辑,之所以用单线程处理业务,是因为 redis 采用高效的数据结构,其业务逻辑处理较快。

在这里插入图片描述

怎样使用线程池?

主线程拥有两个全局队列clients_pending_readclients_pending_write,每个 io 线程(主线程同时也是 io 线程)拥有一个专属队列 io_threads_list[id]。主线程既作为生产者,产生任务;又作为消费者,获取任务执行。

首先,主线程将一次循环的所有就绪的读事件收集到自己的全局任务队列clients_pending_read中,再把每个事件负载均衡地分配到每个 io 线程的专属任务队列中。一次事件循环中不会出现同名 fd,不同的 fd 分配到每个 io 线程各自的队列中,避免了多个 io 线程同时从全局队列中取数据,因此,不需要加锁操作。

接下来,io 线程从自己的专属队列中取出任务,(除主线程外)并发执行 read 和 decode 操作。主线程将解析后的任务做 compute 操作。最后,io 线程(包括主线程)并发执行 encode 和 send 操作。

在这里插入图片描述

源码实现细节,见我的另一篇文章,redis 源码解析:IO 线程池。

3.4、skynet 线程池

为什么使用线程池?

skynet 是 cpu 密集型。处理读写 io 、数据包解压缩、业务逻辑处理等。特别地,当同一个 io 在多个线程处理时,将写 io 转由网络线程处理。

什么情况下使用线程池?

nginx 线程池作用于除 send 外的所有阶段。

在这里插入图片描述

怎样使用线程池?

skynet 的网络线程收集事件,将事件插入到对应的 actor 的专属消息队列中。全局消息队列存储有消息的 actor,即描述活跃的 actor,线程池负责检查该全局队列。

在这里插入图片描述

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到