手写线程池|C语言版(二)|定义线程池的结构、创建线程池实例

发布于:2024-05-01 ⋅ 阅读:(87) ⋅ 点赞:(0)

本文中,我们将创建线程池的结构体,该结构体中包含了线程池所需要的全部数据类型,并且我们实现了创建线程池函数

前文回顾:

手写线程池|C语言版(一)|线程池的定义和运行逻辑

定义线程池结构

下列结构体定义在文件threadpool.c中

任务结构体

typedef struct Task
{
    void (*function)(void* arg);//这是一个函数指针和函数指针的参数
    void* arg;//表示可以接受任何类型的参数
}Task;

任务结构体中包含了函数指针和函数指针的参数。后续我们需要定义添加任务的函数,通过该函数实现任务结构体的初始化。

定义线程池结构体

struct ThreadPool
{
    //任务队列
    Task* taskQueue;	//任务队列
    int queueCapacity; //容量
    int queueSize;     //当前任务个数
    int queueFront;    //队头->取数据
    int queueTail;     //队尾->放数据

    // 管理者线程
    pthread_t managerID; //管理者线程ID
    pthread_t *threadIDs; //工作的线程ID
    int minNums;    //最小线程数
    int maxNums;    //最大线程数
    int busyNums;    //忙的线程数
    int liveNums;    //存活的线程
    int exitNums;   // 要杀死的线程个数

    //锁
    pthread_mutex_t mutexPool;  //锁整个的线程池
    pthread_mutex_t mutexBusy;  //锁busyNums,因为其变化比较频繁

    //条件变量
    pthread_cond_t notFull;     //任务队列是不是满了
    pthread_cond_t notEmpty;    //任务队列是不是空了

    int shutdown; //是不是要销毁线程池,销毁为1,不销毁为0
};

组织头文件

我们需要在头文件中写入线程池各个操作的函数原型

#ifndef _THREADPOOL_H
#define _THREADPOOL_H
//创建线程池并初始化

//销毁线程池

//给线程池添加任务

//获取线程池中工作的线程的个数

//获取线程池中活着的线程的个数

#endif //_THREADPOOL_H

创建线程池实例

在本节中,我们的目标是创建一个线程池实例,首先需要在头文件中定义创建线程池的函数原型

函数原型定义

我们先分析一下,初始化一个线程池需要什么参数呢?

首先我们初始化的线程池中有需要维护的最小进程,然后我们需要让线程池知道它的最大进程数是多少,进程池中有一个任务队列来存储任务,构建任务队列我们必须要知道任务队列的尺寸

综上所述:我们需要参数minNumThread, maxNumberThread, queueSize

既然是为了创建一个线程池实例,我们的返回值当然就是该线程池的地址ThreadPool*

#ifndef _THREADPOOL_H
#define _THREADPOOL_H

typedef struct ThreadPool ThreadPool;
//创建线程池
ThreadPool* threadPoolCreate(int min, int max, int queueSize)

#endif //_THREADPOOL_H

线程池创建函数实现

初始化线程池结构体指针

我们首先需要初始化一个ThreadPool* pool出来,因为pool在后续代码中,不仅要初始化其中的各类数据结构,还需要将pool返回给调用者,完成线程池的创建。

ThreadPool *threadPoolCreate(int min, int max, int queueSize)
{   
    //如果初始化操作没问题,就应该返回该线程池的结构体指针
    ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    if (pool == NULL){
            printf("malloc threadpool fail...\n");
            return NULL;
    }
	...
	return pool;
}

初始化线程池结构体的各类参数

  • 为工作线程分配空间
    由于我们需要通过管理者线程来对工作线程进行增加或者减少的操作,所以我们为工作线程分配堆空间,方便管理。
		pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t)* max);
        if (pool->threadIDs == NULL){
            printf("malloc threadIDs fail...\n");
            return NULL;
        }
        memset(pool->threadIDs, 0, sizeof(pthread_t)* max);
  • 初始化线程池的各个重要参数
        pool->minNums = minNumThread;
        pool->maxNums = maxNumThread;
        pool->busyNums = 0;
        pool->liveNums = minNumThread; //刚开始和最小个数相等
        pool->exitNums = 0;
  • 初始化锁和条件变量
        if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
            pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
            pthread_cond_init(&pool->notEmpty, NULL) != 0||
            pthread_cond_init(&pool->notFull, NULL) != 0){
                printf("mutex or cond failed ...\n");
                return NULL;
        }
  • 初始化任务队列
        //任务队列
        pool->taskQ = (Task*)malloc(sizeof(Task)* queueSize);
        pool->queueCapacity = queueSize;
        pool->queueSize = 0;
        pool->queueFront = 0;
        pool->queueTail = 0;

        pool->shutdown = 0;
  • 创建管理者线程和工作线程

在这里管理者线程和工作线程的回调函数,我们会留到后面章节去实现,首先需要明确这两个回调函数的参数。

还记得管理者线程和工作者线程的任务吗?
手写线程池|C语言版(一)|线程池的定义和运行逻辑

很明显,其中manager需要按策略来添加、销毁进程,需要的参数显然包括我们线程池中的大部分参数,所以我们传入pool;

对于worker,我们需要它能够从任务队列中取任务,任务队列及其相关的数据我们都定义在了struct ThreadPool结构体中,所以我们仍然选择传入pool

        //创建线程
        pthread_create(&pool->managerID, NULL, manager, pool);//管理者线程
        for (int i = 0; i < minNumThread; i++){
            pthread_create(&pool->threadIDs[i], NULL, worker, pool);//工作线程
        }

初始化上述结构后,我们应当返回return pool
然而,我们忽视了一个很大的问题,由于我们malloc了好几块内存用于为threadIDspool分配堆内存,但是一旦分配失败,我们应该释放他们占用的空间,不然他们会成为野指针,可能造成内存泄漏。
但是呢,一个一个为他们free比较麻烦,这里推荐一种结构,来增强代码的健壮性。

do{	//把之前分配内存或者其他操作导致的return NULL统一改写成break
	//在函数块的最后进行后续的处理工作(包括内存释放和返回空指针)
	a = malloc();
	if (false){
		break;
	}
	b = malloc();
	if (false){
		break;
	}
	c = malloc();
	if (false){
		break;
	}
}while(0)
free(a);
free(b);
free(c);
return NULL:

定义线程池的结构C代码

typedef struct Task
{
    void (*function)(void* arg);//这是一个函数指针和函数指针的参数
    void* arg;//表示可以接受任何类型的参数
    //这里函数参数是个范型就是借鉴的pthread_creat()函数的第三个参数
}Task;

struct ThreadPool
{
    Task* taskQ;
    int queueCapacity; //容量
    int queueSize;     //当前任务个数
    int queueFront;    //队头->取数据
    int queueTail;     //队尾->放数据

    pthread_t managerID; //管理者线程ID
    pthread_t *threadIDs; //工作的线程ID
    int minNums;    //最小线程数
    int maxNums;    //最大线程数
    int busyNums;    //忙的线程数
    int liveNums;    //存活的线程
    int exitNums;   // 要杀死的线程个数

    pthread_mutex_t mutexPool;  //锁整个的线程池
    pthread_mutex_t mutexBusy;  //锁busyNums,因为其变化比较频繁

    pthread_cond_t notFull;     //任务队列是不是满了
    pthread_cond_t notEmpty;    //任务队列是不是空了

    int shutdown; //是不是要销毁线程池,销毁为1,不销毁为0
};

创建线程池总体C代码

(共50行不到)

ThreadPool *threadPoolCreate(int min, int max, int queueSize)
{   
    //如果初始化操作没问题,就应该返回该线程池的结构题
    ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));

    do{
        if (pool == NULL){
            printf("malloc threadpool fail...\n");
            break;
        }

        pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t)* max);
        if (pool->threadIDs == NULL){
            printf("malloc threadIDs fail...\n");
            break;
        }
        memset(pool->threadIDs, 0, sizeof(pthread_t)* max);
        pool->minNums = min;
        pool->maxNums = max;
        pool->busyNums = 0;
        pool->liveNums = min; //刚开始和最小个数相等
        pool->exitNums = 0;

        if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
            pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
            pthread_cond_init(&pool->notEmpty, NULL) != 0||
            pthread_cond_init(&pool->notFull, NULL) != 0){
                printf("mutex or cond failed ...\n");
                break;
        }

        //任务队列
        pool->taskQ = (Task*)malloc(sizeof(Task)* queueSize);
        pool->queueCapacity = queueSize;
        pool->queueSize = 0;
        pool->queueFront = 0;
        pool->queueTail = 0;

        pool->shutdown = 0;

        //创建线程
        pthread_create(&pool->managerID, NULL, manager, pool);//管理者线程
        for (int i = 0; i < min; i++){
            pthread_create(&pool->threadIDs[i], NULL, worker, pool);//工作线程
        }
        return pool;
        
    }while(0);

    // 释放资源
    if (pool && pool->threadIDs) free(pool->threadIDs);
    if (pool && pool->taskQ) free (pool->taskQ);
    if (pool) free(pool);

    return NULL;
    
}