C / C++ 线程同步和线程池

发布于:2025-05-21 ⋅ 阅读:(22) ⋅ 点赞:(0)

一、线程同步

假设有4个线程A、B、C、D,当前一个线程A对内存中的共享资源进行访问的时候,其他线程B, C, D都不可以对这块内存进行操作,直到线程A对这块内存访问完毕为止,B,C,D中的一个才能访问这块内存,剩余的两个需要继续阻塞等待,以此类推,直至所有的线程都对这块内存操作完毕。 线程对内存的这种访问方式就称之为线程同步,我们可以了解到所谓的同步并不是多个线程同时对内存进行访问,而是按照先后顺序依次串行处理的。

而并行是指系统同时执行多个任务,每个任务在不同的处理器核心上执行,因此它们真正同时进行。(前提是有多核)

1. 不同步示例

示例代码:实现两个线程交替数数(每个线程数50个数,交替数到100)

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>


#define MAX 50

int number = 0;

void *funcA_num(void *arg)
{
	for (int i = 0; i < MAX; i++)
	{
		int cur = number;
		cur++;
		usleep(10);
		number = cur;
		printf("Thread A, id = %lu, number = %d\n", pthread_self(), number);
	}

	return NULL;
}

void *funcB_num(void *arg)
{
	for (int i = 0; i < MAX; i++)
	{
		int cur = number;
		cur++;
		usleep(5);
		number = cur;
		printf("Thread B, id = %lu, number = %d\n", pthread_self(), number);
	}
	
	return NULL;
}


int main()
{
	pthread_t p1, p2;

	pthread_create(&p1, NULL, funcA_num, NULL);
	pthread_create(&p2, NULL, funcB_num, NULL);

	pthread_join(p1, NULL);
	pthread_join(p2, NULL);

	return 0;
}

编译运行结果:
在这里插入图片描述
通过对上面例子的测试,可以看出虽然每个线程内部循环了50次每次数一个数,但是最终没有数到100,通过输出的结果可以看到,有些数字被重复数了多次,其原因就是没有对线程进行同步处理,造成了数据的混乱。

两个线程在数数的时候需要分时复用CPU时间片,并且测试程序中调用了 usleep() 导致线程的CPU时间片没用完就被迫挂起了,这样就能让CPU的上下文切换(保存当前状态, 下一次继续运行的时候需要加载保存的状态)更加频繁,更容易再现数据混乱的这个现象。

2. 同步方式

对于多个线程访问共享资源出现数据混乱的问题,需要进行线程同步。常用的线程同步方式有四种:互斥锁、读写锁、条件变量、信号量。所谓的共享资源就是多个线程共同访问的变量,这些变量通常为全局数据区变量或者堆区变量,这些变量对应的共享资源也被称之为临界资源。

二、互斥锁

互斥锁是线程同步最常用的一种方式,通过互斥锁可以锁定一个代码块, 被锁定的这个代码块, 所有的线程只能顺序执行(不能并行处理),这样多线程访问共享资源数据混乱的问题就可以被解决了,需要付出的代价就是执行效率的降低,因为默认临界区多个线程是可以并行处理的,现在只能串行处理。

互斥锁创建流程:
在这里插入图片描述

示例:用互斥锁修改上面代码

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>


#define MAX 50

int number = 0;

// 创建一把互斥锁
// 全局变量, 多个线程共享
pthread_mutex_t mutex;

void *funcA_num(void *arg)
{
	for (int i = 0; i < MAX; i++)
	{
		pthread_mutex_lock(&mutex);
		int cur = number;
		cur++;
		usleep(10);
		number = cur;
		pthread_mutex_unlock(&mutex);
		printf("Thread A, id = %lu, number = %d\n", pthread_self(), number);
	}

	return NULL;
}

void *funcB_num(void *arg)
{
	for (int i = 0; i < MAX; i++)
	{
		pthread_mutex_lock(&mutex);
		int cur = number;
		cur++;
		usleep(5);
		number = cur;
		pthread_mutex_unlock(&mutex);
		printf("Thread B, id = %lu, number = %d\n", pthread_self(), number);
	}
	
	return NULL;
}

int main()
{
	pthread_t p1, p2;

	// 初始化互斥锁
	pthread_mutex_init(&mutex, NULL);
	
	pthread_create(&p1, NULL, funcA_num, NULL);
	pthread_create(&p2, NULL, funcB_num, NULL);

	pthread_join(p1, NULL);
	pthread_join(p2, NULL);

	// 线程销毁之后, 再去释放互斥锁
	pthread_mutex_destroy(&mutex);

	return 0;
}

三、死锁

当多个线程访问共享资源, 需要加锁, 如果锁使用不当, 就会造成死锁这种现象。

如果线程死锁造成的后果是:所有的线程都被阻塞,并且线程的阻塞是无法解开的(因为可以解锁的线程也被阻塞了)。

造成死锁的场景有如下几种:

  • 加锁之后忘记解锁
  • 重复加锁, 造成死锁(递归锁才可以多次加锁,但解锁次数也要一致
  • 在程序中有多个共享资源, 因此有很多把锁,随意加锁,导致相互被阻塞

例如有两个共享资源:X, Y,X对应锁A, Y对应锁B

  • 线程A访问资源X, 加锁A
  • 线程B访问资源Y, 加锁B
  • 线程A要访问资源Y, 线程B要访问资源X,因为资源X和Y已经被对应的锁锁住了,因此这个两个线程被阻塞

避免死锁:

  • 避免多次锁定, 多检查
  • 对共享资源访问完毕之后, 一定要解锁,或者在加锁的使用 trylock
  • 如果程序中有多把锁, 可以控制对锁的访问顺序(顺序访问共享资源,但在有些情况下是做不到的),另外也可以在对其他互斥锁做加锁操作之前,先释放当前线程拥有的互斥锁
  • 项目程序中可以引入一些专门用于死锁检测的模块

四、读写锁

读写锁是一种允许多个线程同时读取共享资源,但写操作需要互斥的锁机制。它分为读锁和写锁。读锁允许多个线程同时持有读锁,但不允许写锁持有。写锁是互斥的,同一时间只能有一个线程持有写锁,且写锁持有期间不允许其他线程持有读锁。

读写锁可以看作是互斥锁的升级版, 在做读操作的时候可以提高程序的执行效率,如果所有的线程都是做读操作, 那么读是并行的,但是使用互斥锁,读操作也是串行的。

定义:

#include <pthread.h>
pthread_rwlock_t rwlock;

// 初始化读写锁
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);

// 释放读写锁占用的系统资源
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

特点:

  • 使用读写锁的读锁锁定了临界区,线程对临界区的访问是并行的,读锁是共享的。
  • 使用读写锁的写锁锁定了临界区,线程对临界区的访问是串行的,写锁是独占的。
  • 使用读写锁分别对两个临界区加了读锁和写锁,两个线程要同时访问者两个临界区,访问写锁临界区的线程继续运行,访问读锁临界区的线程阻塞,因为写锁比读锁的优先级高

示例:8个线程操作同一个全局变量,3个线程不定时写同一全局资源,5个线程不定时读同一全局资源。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>

// 全局变量
int number = 0;

// 定义读写锁
pthread_rwlock_t rwlock;

// 写的线程的处理函数
void* writeNum(void* arg)
{
    while(1)
    {
        pthread_rwlock_wrlock(&rwlock);
        int cur = number;
        cur++;
        number = cur;
        printf("++写操作完毕, number : %d, tid = %ld\n", number, pthread_self());
        pthread_rwlock_unlock(&rwlock);
        
        // 添加sleep目的是要看到多个线程交替工作,进行并发控制测试
        // rand() :生成一个伪随机整数 (范围通常是 0 到 RAND_MAX,如 0~32767)。
		// % 100 :对随机数取模 100,结果范围是 0~99。
        usleep(rand() % 100);
    }

    return NULL;
}

// 读线程的处理函数
// 多个线程可以如果处理动作相同, 可以使用相同的处理函数
// 每个线程中的栈资源是独享
void* readNum(void* arg)
{
    while(1)
    {
        pthread_rwlock_rdlock(&rwlock);
        printf("--全局变量number = %d, tid = %ld\n", number, pthread_self());
        pthread_rwlock_unlock(&rwlock);
        usleep(rand() % 100);
    }
    return NULL;
}

int main()
{
    // 初始化读写锁
    pthread_rwlock_init(&rwlock, NULL);

    // 3个写线程, 5个读的线程
    pthread_t wtid[3];
    pthread_t rtid[5];
    for(int i = 0; i < 3; ++i)
    {
        pthread_create(&wtid[i], NULL, writeNum, NULL);
    }

    for(int i = 0; i < 5; ++i)
    {
        pthread_create(&rtid[i], NULL, readNum, NULL);
    }

    // 释放资源
    for(int i = 0; i < 3; ++i)
    {
        pthread_join(wtid[i], NULL);
    }

    for(int i = 0; i < 5; ++i)
    {
        pthread_join(rtid[i], NULL);
    }

    // 销毁读写锁
    pthread_rwlock_destroy(&rwlock);

    return 0;
}

五、条件变量

条件变量的主要作用不是处理线程同步, 而是进行线程的阻塞。如果在多线程程序中只使用条件变量无法实现线程的同步, 必须要配合互斥锁来使用。

一般情况下条件变量用于处理生产者和消费者模型,并且和互斥锁配合使用。

初始化函数原型:

#include <pthread.h>
pthread_cond_t cond;
// 初始化
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
// 销毁释放资源        
int pthread_cond_destroy(pthread_cond_t *cond);

通过函数原型可以看出,该函数在阻塞线程的时候,需要一个互斥锁参数,这个互斥锁主要功能是进行线程同步,让线程顺序进入临界区,避免出现数共享资源的数据混乱。该函数会对这个互斥锁做以下几件事情:

  • 在阻塞线程时候,如果线程已经对互斥锁mutex上锁,那么会将这把锁打开,这样做是为了避免死锁
  • 当线程解除阻塞的时候,函数内部会帮助这个线程再次将这个mutex互斥锁锁上,继续向下访问临界区

阻塞和唤醒函数原型:

// 线程阻塞函数, 哪个线程调用这个函数, 哪个线程就会被阻塞
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
// 将线程阻塞一定的时间长度, 时间到达之后, 线程就解除阻塞了
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

// 唤醒阻塞在条件变量上的线程, 至少有一个被解除阻塞
int pthread_cond_signal(pthread_cond_t *cond);
// 唤醒阻塞在条件变量上的线程, 被阻塞的线程全部解除阻塞
int pthread_cond_broadcast(pthread_cond_t *cond);

生产者和消费者模型的组成:

  • 生产者线程 -> 若干个
    • 生产商品或者任务放入到任务队列中
    • 任务队列满了就阻塞, 不满的时候就工作
    • 通过一个生产者的条件变量控制生产者线程阻塞和非阻塞
  • 消费者线程 -> 若干个
    • 读任务队列, 将任务或者数据取出
    • 任务队列中有数据就消费,没有数据就阻塞
    • 通过一个消费者的条件变量控制消费者线程阻塞和非阻塞
  • 队列 -> 存储任务/数据,对应一块内存,为了读写访问可以通过一个数据结构维护这块内存
    • 可以是数组、链表,也可以使用stl容器:queue / stack / list / vector

示例代码:使用条件变量实现生产者和消费者模型,生产者有5个,往链表头部添加节点,消费者也有5个,删除链表头部的节点。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>

// 链表的节点
struct Node
{
    int number;
    struct Node* next;
};

// 定义条件变量, 控制消费者线程
pthread_cond_t cond;
// 互斥锁变量
pthread_mutex_t mutex;
// 指向头结点的指针
struct Node * head = NULL;

// 生产者的回调函数
void* producer(void* arg)
{
    // 一直生产
    while(1)
    {
        pthread_mutex_lock(&mutex);
        // 创建一个链表的新节点
        struct Node* pnew = (struct Node*)malloc(sizeof(struct Node));
        // 节点初始化
        pnew->number = rand() % 1000;
        // 节点的连接, 添加到链表的头部, 新节点就新的头结点
        pnew->next = head;
        // head指针前移
        head = pnew;
        printf("+++producer, number = %d, tid = %ld\n", pnew->number, pthread_self());
        pthread_mutex_unlock(&mutex);

        // 生产了任务, 通知消费者消费
        pthread_cond_broadcast(&cond);

        // 生产慢一点
        sleep(rand() % 3);
    }
    return NULL;
}

// 消费者的回调函数
void* consumer(void* arg)
{
    while(1)
    {
        pthread_mutex_lock(&mutex);
        // 一直消费, 删除链表中的一个节点
        //  if(head == NULL)  会出现段错误
        while (head == NULL)
        {
            pthread_cond_wait(&cond, &mutex);
        }
        // 取出链表的头结点, 将其删除
        struct Node* pnode = head;
        printf("--consumer: number: %d, tid = %ld\n", pnode->number, pthread_self());
        head  = pnode->next;
        free(pnode);
        pthread_mutex_unlock(&mutex);        

        sleep(rand() % 3);
    }
    return NULL;
}

int main()
{
    // 初始化条件变量
    pthread_cond_init(&cond, NULL);
    pthread_mutex_init(&mutex, NULL);

    // 创建5个生产者, 5个消费者
    pthread_t ptid[5];
    pthread_t ctid[5];
    for(int i = 0; i < 5; i++)
    {
        pthread_create(&ptid[i], NULL, producer, NULL);
    }

    for(int i = 0; i < 5; i++)
    {
        pthread_create(&ctid[i], NULL, consumer, NULL);
    }

    // 释放资源
    for(int i = 0; i < 5; i++)
    {
        // 阻塞等待子线程退出
        pthread_join(ptid[i], NULL);
    }

    for(int i = 0; i < 5; i++)
    {
        pthread_join(ctid[i], NULL);
    }

    // 销毁条件变量
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);

    return 0;
}

上面代码中不能使用 if(head == NULL),原因是:

  • if 只判断一次,条件变量会唤醒多个线程,你这个线程不一定能立马消费
  • 当你消费时,生产者线程可能已经被其他消费者线程使用了
  • 所以得反复去判断 head == NULL
  • 也有可能存在虚假唤醒,所以只能用 while,不能用 if

六、信号量

信号量是一个计数器,可以用来控制多个线程对共享资源的访问,当多个线程出现后,会遇到无序执行的问题。有时候需要对线程的执行顺序做出限定,便引入了信号量,通过PV操作来控制线程的执行顺序。
在这里插入图片描述

信号量和条件变量一样用于处理生产者和消费者模型,用于阻塞生产者线程或者消费者线程的运行。

初始化函数:

#include <semaphore.h>
sem_t sem;

// 初始化信号量/信号灯
int sem_init(sem_t *sem, int pshared, unsigned int value);
// 资源释放, 线程销毁之后调用这个函数即可
// 参数 sem 就是 sem_init() 的第一个参数            
int sem_destroy(sem_t *sem);

参数:

  • sem:信号量变量地址
  • pshared:
    • 0:线程同步
    • 非0:进程同步
  • value:初始化当前信号量拥有的资源数(>=0),如果资源数为0,线程就会被阻塞了。

获取释放资源函数:

// 参数 sem 就是 sem_init() 的第一个参数  
// 函数被调用sem中的资源就会被消耗1个, 资源数-1
int sem_wait(sem_t *sem);

// 调用该函数给sem中的资源数+1
int sem_post(sem_t *sem);

不管是消费者线程的处理函数还是生产者线程的处理函数内部有这么两行代码:

// 消费者 P 操作获取资源
sem_wait(&csem);
pthread_mutex_lock(&mutex);

// 生产者 V 操作释放资源
sem_wait(&csem);
pthread_mutex_lock(&mutex);

这两行代码的调用顺序是不能颠倒的,如果颠倒过来就有可能会造成死锁,原因是:

  • 如果顺序颠倒,消费者直接上锁,这时发现没有资源而被阻塞;
  • 由于消费者上锁后被阻塞,导致解锁也被阻塞,因此生产者也上不了锁,造成死锁

注意:

  • 如果生产者和消费者线程使用的信号量对应的总资源数为1,那么不管线程有多少个,可以工作的线程只有一个,其余线程由于拿不到资源,都被迫阻塞了。
  • 所以不会出现生产者线程和消费者线程同时访问共享资源的情况,不管生产者和消费者线程有多少个,它们都是顺序执行的
  • 如果生产者和消费者线程使用的信号量对应的总资源数为大于1,那么有可能:
    • 多个生产者线程同时生产
    • 多个消费者同时消费
    • 生产者线程和消费者线程同时生产和消费

以上不管哪一种情况都可能会出现多个线程访问共享资源的情况,如果想防止共享资源出现数据混乱,那么就需要使用互斥锁进行线程同步。

示例代码:使用条件变量实现生产者和消费者模型,生产者有5个,往链表头部添加节点,消费者也有5个,删除链表头部的节点。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <pthread.h>

// 链表的节点
struct Node
{
    int number;
    struct Node* next;
};

// 生产者线程信号量
sem_t psem;
// 消费者线程信号量
sem_t csem;

// 互斥锁变量
pthread_mutex_t mutex;
// 指向头结点的指针
struct Node * head = NULL;

// 生产者的回调函数
void* producer(void* arg)
{
    // 一直生产
    while(1)
    {
        // 生产者拿一个信号灯
        sem_wait(&psem);
        // 加锁, 这句代码放到 sem_wait()上边, 有可能会造成死锁
        pthread_mutex_lock(&mutex);
        // 创建一个链表的新节点
        struct Node* pnew = (struct Node*)malloc(sizeof(struct Node));
        // 节点初始化
        pnew->number = rand() % 1000;
        // 节点的连接, 添加到链表的头部, 新节点就新的头结点
        pnew->next = head;
        // head指针前移
        head = pnew;
        printf("+++producer, number = %d, tid = %ld\n", pnew->number, pthread_self());
        pthread_mutex_unlock(&mutex);

        // 通知消费者消费
        sem_post(&csem);
        
        // 生产慢一点
        sleep(rand() % 3);
    }
    return NULL;
}

// 消费者的回调函数
void* consumer(void* arg)
{
    while(1)
    {
        sem_wait(&csem);
        pthread_mutex_lock(&mutex);
        struct Node* pnode = head;
        printf("--consumer: number: %d, tid = %ld\n", pnode->number, pthread_self());
        head  = pnode->next;
        // 取出链表的头结点, 将其删除
        free(pnode);
        pthread_mutex_unlock(&mutex);
        // 通知生产者生成, 给生产者加信号灯
        sem_post(&psem);

        sleep(rand() % 3);
    }
    return NULL;
}

int main()
{
    // 初始化信号量
    sem_init(&psem, 0, 5);  // 生成者线程一共有5个信号灯
    sem_init(&csem, 0, 0);  // 消费者线程一共有0个信号灯
    // 初始化互斥锁
    pthread_mutex_init(&mutex, NULL);

    // 创建5个生产者, 5个消费者
    pthread_t ptid[5];
    pthread_t ctid[5];
    for(int i = 0; i < 5; ++i)
    {
        pthread_create(&ptid[i], NULL, producer, NULL);
    }

    for(int i = 0; i < 5; ++i)
    {
        pthread_create(&ctid[i], NULL, consumer, NULL);
    }

    // 释放资源
    for(int i = 0; i < 5; ++i)
    {
        pthread_join(ptid[i], NULL);
    }

    for(int i = 0; i < 5; ++i)
    {
        pthread_join(ctid[i], NULL);
    }

    sem_destroy(&psem);
    sem_destroy(&csem);
    pthread_mutex_destroy(&mutex);

    return 0;
}

七、线程池

1. 线程池原理

我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

想象你开了一家快递站,每天有成千上万的包裹要处理:

  • 没有线程池 :每次来一个包裹,就临时雇一个新员工(线程)处理,处理完就解雇。这样频繁招人、解雇,效率低、开销大。
  • 有线程池 :你提前雇好一批固定员工(核心线程),让他们随时待命。包裹多了,就让这些员工加班(复用线程);如果包裹暴增,再临时招一些兼职(非核心线程);包裹少了,就让兼职下班(回收线程)。这样既省成本,又高效。

2. 线程池组成

任务队列,存储需要处理的任务,由工作的线程来处理这些任务:

  • 通过线程池提供的API函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
  • 已处理的任务会被从任务队列中删除
  • 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程

工作的线程(任务队列任务的消费者) ,N个

  • 线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理
  • 工作的线程相当于是任务队列的消费者角色,
  • 如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞)
  • 如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作

管理者线程(不处理任务队列中的任务),1个

  • 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
    • 当任务过多的时候, 可以适当的创建一些新的工作线程
    • 当任务过少的时候, 可以适当的销毁一些工作的线程

3. 示例代码

任务队列

// 任务结构体
typedef struct Task
{
    void (*function)(void* arg);
    void* arg;
}Task;

线程池定义

// 线程池结构体
struct ThreadPool
{
    // 任务队列
    Task* taskQ;
    int queueCapacity;  // 容量
    int queueSize;      // 当前任务个数
    int queueFront;     // 队头 -> 取数据
    int queueRear;      // 队尾 -> 放数据

    pthread_t managerID;    // 管理者线程ID
    pthread_t *threadIDs;   // 工作的线程ID
    int minNum;             // 最小线程数量
    int maxNum;             // 最大线程数量
    int busyNum;            // 忙的线程的个数
    int liveNum;            // 存活的线程的个数
    int exitNum;            // 要销毁的线程个数
    pthread_mutex_t mutexPool;  // 锁整个的线程池
    pthread_mutex_t mutexBusy;  // 锁busyNum变量
    pthread_cond_t notFull;     // 任务队列是不是满了
    pthread_cond_t notEmpty;    // 任务队列是不是空了

    int shutdown;           // 是不是要销毁线程池, 销毁为1, 不销毁为0
};

完整代码:
threadpool.h

#ifndef _THREADPOOL_H
#define _THREADPOOL_H

typedef struct ThreadPool ThreadPool;
// 创建线程池并初始化
ThreadPool *threadPoolCreate(int min, int max, int queueSize);

// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);

// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);

// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);

// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);

//
// 工作的线程(消费者线程)任务函数
void* worker(void* arg);
// 管理者线程任务函数
void* manager(void* arg);
// 单个线程退出
void threadExit(ThreadPool* pool);

#endif  // _THREADPOOL_H

threadpool.c

#include "threadpool.h"

const int NUMBER = 2;

// 任务结构体
typedef struct Task
{
    void (*function)(void* arg);
    void* arg;
}Task;



// 线程池结构体
struct ThreadPool
{
    // 任务队列
    Task* taskQ;
    int queueCapacity;  // 容量
    int queueSize;      // 当前任务个数
    int queueFront;     // 队头 -> 取数据
    int queueRear;      // 队尾 -> 放数据

    pthread_t managerID;    // 管理者线程ID
    pthread_t *threadIDs;   // 工作的线程ID
    int minNum;             // 最小线程数量
    int maxNum;             // 最大线程数量
    int busyNum;            // 忙的线程的个数
    int liveNum;            // 存活的线程的个数
    int exitNum;            // 要销毁的线程个数
    pthread_mutex_t mutexPool;  // 锁整个的线程池
    pthread_mutex_t mutexBusy;  // 锁busyNum变量
    pthread_cond_t notFull;     // 任务队列是不是满了
    pthread_cond_t notEmpty;    // 任务队列是不是空了

    int shutdown;           // 是不是要销毁线程池, 销毁为1, 不销毁为0
};


ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
    ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    do 
    {
        if (pool == NULL)
        {
            printf("malloc threadpool fail...\n");
            break;
        }

        pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
        if (pool->threadIDs == NULL)
        {
            printf("malloc threadIDs fail...\n");
            break;
        }
        memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
        pool->minNum = min;
        pool->maxNum = max;
        pool->busyNum = 0;
        pool->liveNum = min;    // 和最小个数相等
        pool->exitNum = 0;

        if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
            pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
            pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
            pthread_cond_init(&pool->notFull, NULL) != 0)
        {
            printf("mutex or condition init fail...\n");
            break;
        }

        // 任务队列
        pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
        pool->queueCapacity = queueSize;
        pool->queueSize = 0;
        pool->queueFront = 0;
        pool->queueRear = 0;

        pool->shutdown = 0;

        // 创建线程
        pthread_create(&pool->managerID, NULL, manager, pool);
        for (int i = 0; i < min; ++i)
        {
            pthread_create(&pool->threadIDs[i], NULL, worker, pool);
        }
        return pool;
    } while (0);

    // 释放资源
    if (pool && pool->threadIDs) free(pool->threadIDs);
    if (pool && pool->taskQ) free(pool->taskQ);
    if (pool) free(pool);

    return NULL;
}

int threadPoolDestroy(ThreadPool* pool)
{
    if (pool == NULL)
    {
        return -1;
    }

    // 关闭线程池
    pool->shutdown = 1;
    // 阻塞回收管理者线程
    pthread_join(pool->managerID, NULL);
    // 唤醒阻塞的消费者线程
    for (int i = 0; i < pool->liveNum; ++i)
    {
        pthread_cond_signal(&pool->notEmpty);
    }
    // 释放堆内存
    if (pool->taskQ)
    {
        free(pool->taskQ);
    }
    if (pool->threadIDs)
    {
        free(pool->threadIDs);
    }

    pthread_mutex_destroy(&pool->mutexPool);
    pthread_mutex_destroy(&pool->mutexBusy);
    pthread_cond_destroy(&pool->notEmpty);
    pthread_cond_destroy(&pool->notFull);

    free(pool);
    pool = NULL;

    return 0;
}


void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
    pthread_mutex_lock(&pool->mutexPool);
    while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
    {
        // 阻塞生产者线程
        pthread_cond_wait(&pool->notFull, &pool->mutexPool);
    }
    if (pool->shutdown)
    {
        pthread_mutex_unlock(&pool->mutexPool);
        return;
    }
    // 添加任务
    pool->taskQ[pool->queueRear].function = func;
    pool->taskQ[pool->queueRear].arg = arg;
    pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    pool->queueSize++;

    pthread_cond_signal(&pool->notEmpty);
    pthread_mutex_unlock(&pool->mutexPool);
}

int threadPoolBusyNum(ThreadPool* pool)
{
    pthread_mutex_lock(&pool->mutexBusy);
    int busyNum = pool->busyNum;
    pthread_mutex_unlock(&pool->mutexBusy);
    return busyNum;
}

int threadPoolAliveNum(ThreadPool* pool)
{
    pthread_mutex_lock(&pool->mutexPool);
    int aliveNum = pool->liveNum;
    pthread_mutex_unlock(&pool->mutexPool);
    return aliveNum;
}

void* worker(void* arg)
{
    ThreadPool* pool = (ThreadPool*)arg;

    while (1)
    {
        pthread_mutex_lock(&pool->mutexPool);
        // 当前任务队列是否为空
        while (pool->queueSize == 0 && !pool->shutdown)
        {
            // 阻塞工作线程
            pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

            // 判断是不是要销毁线程
            if (pool->exitNum > 0)
            {
                pool->exitNum--;
                if (pool->liveNum > pool->minNum)
                {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexPool);
                    threadExit(pool);
                }
            }
        }

        // 判断线程池是否被关闭了
        if (pool->shutdown)
        {
            pthread_mutex_unlock(&pool->mutexPool);
            threadExit(pool);
        }

        // 从任务队列中取出一个任务
        Task task;
        task.function = pool->taskQ[pool->queueFront].function;
        task.arg = pool->taskQ[pool->queueFront].arg;
        // 移动头结点
        pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
        pool->queueSize--;
        // 解锁
        pthread_cond_signal(&pool->notFull);
        pthread_mutex_unlock(&pool->mutexPool);

        printf("thread %ld start working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum++;
        pthread_mutex_unlock(&pool->mutexBusy);
        task.function(task.arg);
        free(task.arg);
        task.arg = NULL;

        printf("thread %ld end working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexBusy);
    }
    return NULL;
}

void* manager(void* arg)
{
    ThreadPool* pool = (ThreadPool*)arg;
    while (!pool->shutdown)
    {
        // 每隔3s检测一次
        sleep(3);

        // 取出线程池中任务的数量和当前线程的数量
        pthread_mutex_lock(&pool->mutexPool);
        int queueSize = pool->queueSize;
        int liveNum = pool->liveNum;
        pthread_mutex_unlock(&pool->mutexPool);

        // 取出忙的线程的数量
        pthread_mutex_lock(&pool->mutexBusy);
        int busyNum = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexBusy);

        // 添加线程
        // 任务的个数>存活的线程个数 && 存活的线程数<最大线程数
        if (queueSize > liveNum && liveNum < pool->maxNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            int counter = 0;
            for (int i = 0; i < pool->maxNum && counter < NUMBER
                && pool->liveNum < pool->maxNum; ++i)
            {
                if (pool->threadIDs[i] == 0)
                {
                    pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                    counter++;
                    pool->liveNum++;
                }
            }
            pthread_mutex_unlock(&pool->mutexPool);
        }
        // 销毁线程
        // 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
        if (busyNum * 2 < liveNum && liveNum > pool->minNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexPool);
            // 让工作的线程自杀
            for (int i = 0; i < NUMBER; ++i)
            {
                pthread_cond_signal(&pool->notEmpty);
            }
        }
    }
    return NULL;
}

void threadExit(ThreadPool* pool)
{
    pthread_t tid = pthread_self();
    for (int i = 0; i < pool->maxNum; ++i)
    {
        if (pool->threadIDs[i] == tid)
        {
            pool->threadIDs[i] = 0;
            printf("threadExit() called, %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

test.c

#include "threadpool.h"

void taskFunc(void* arg)
{
    int num = *(int*)arg;
    printf("thread %ld is working, number = %d\n", pthread_self(), num);
    sleep(1);
}

int main()
{
    // 创建线程池
    ThreadPool* pool = threadPoolCreate(3, 10, 100);
    for (int i = 0; i < 100; i++)
    {
        int* num = (int*)malloc(sizeof(int));
        *num = i + 100;
        threadPoolAdd(pool, taskFunc, num);
    }

    sleep(30);

    threadPoolDestroy(pool);
    return 0;
}

编译运行结果:
在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到