Linux操作系统之线程:线程池

发布于:2025-07-29 ⋅ 阅读:(17) ⋅ 点赞:(0)

目录

前言:

一、什么是线程池

二、线程池的实现

2.1、线程池的启动与等待停止

2.2、线程池的新增任务

2.3、线程池的回调函数实现

2.4、优化

三、线程安全的单例模式

3.1、什么是单例模式

3.2、懒汉实现方式与饿汉实现方式

3.3、单例化线程池

总结:


前言:

昨天我们完成了对日志功能的实现,但实际上昨天的代码只是为了给今天我们实现线程池提供助力。

话不多说,让我们开始在系统部分的最后一舞:线程池的实现吧!!

我们要先来了解一下什么是线程池。

一、什么是线程池

线程池是一种多线程处理形式,它预先创建一组线程并管理它们的生命周期,用于执行大量短期异步任务。

他与线程池的设计模式是一样的,二者都是遵循的消费者生产者模型。

大家可以把我们的进程池代码拿过来看,二者其实还是挺相似的。

那么,接下来我们就来实现一下这个线程池,以便帮助大家更好的理解。


二、线程池的实现

我们接下来就来实现一下我们的线程池代码。

首先还是老规矩,我们把我们的命名空间与需要用到的头文件之类的全部加上去。

随后我们先创建一个ThreadPool类

我们的线程池里面首先先把最基础的框架写出来,最后一步一步的来完善具体的接口。

线程池,线程池,那里面一定会有很多线程,所以我们可以先定义一个变量num表示这个线程池里的线程数目,如果可以,我们可以定义一个缺省的值,在线程池进行构造的时候就创建出我们的相应数量的线程。

随后将我们的一系列的接口,比如增加任务,启动线程池,停止线程池的接口名写出来。

这么多线程如何管理呢?

如同我们实现队列一样,我们可以通过容器来进行管理,我们这里就通过vector数组来进行存储,并且,我们创建一个智能指针来对这个线程进行管理(我们这里所使用的线程,条件变量等,都是我们自己封装的),所以我们的代码可以先暂时写成这样:

#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_
#pragma once

#include "log.hpp"
#include "sem.hpp"
#include "mutex.hpp"
#include "Cond.hpp"
#include "Mythread.hpp"
#include <vector>

namespace ThreadPoolModule
{
    using thread_t = std::shared_ptr<ThreadModule::Mythread>;
    using namespace LogModule;
    using namespace SemMudule;
    using namespace MutexModule;
    using namespace CondModule;
    using namespace ThreadModule;
    static int default_num = 5;
    class ThreadPool
    {
    public:
        ThreadPool(int num = default_num)
            : _num(num)
        {
            for (int i = 0; i < _num; ++i)
            {
                _threadpool.push_back(std::make_shared<Mythread>());
            }
        }

        void Start()//线程池的启动
        {
        }

        void Equeue()//新增任务
        {
        }
        void Wait()//线程池的停止
        {
        }

    private:
        std::vector<thread_t> _threadpool;
        int _num;
    };
}

#endif

2.1、线程池的启动与等待停止

我们创建线程池对象的时候会自动调用构造函数,并给我们分配五个线程,那我们怎么样启动我们的线程池开始工作呢?

这里我们就要完善我们的Start接口了。

如何启动?思路是什么?很简单,我们调用的是我们封装的线程类,所以我们可以直接for循环启动啊!!

        void Start()
        {
            for(auto &it:_threadpool)
            {
                it->start();
            }
        }

为了测试我们的启动是否成功,我们可以在创建线程的智能指针时传进去一个func_t = std::function<void()>;该类型的参数(这个代码我们是在线程的封装时写进去的),为了简便,我们可以直接写一个lambda表达式:

        ThreadPool(int num = default_num)
            : _num(num)
        {
            for (int i = 0; i < _num; ++i)
            {
                _threadpool.push_back(std::make_shared<Mythread>([]()
            {
                while(true)
                {
                     ENABLE_CONSOLE_LOG();
                    LOG(LogLevel::INFO)<<"线程池线程开始运行";
                    sleep(1);
                }
            }));
            }
        }

        void Start()
        {
            for(auto &it:_threadpool)
            {
                it->start();
            }
        }


test.cc:

#include "ThreadPool.hpp"
using namespace ThreadPoolModule;
int main()
{

    ThreadPool tp(3);
    tp.Start();
    sleep(10);

    return 0;
}

运行结果为:

这意味着我们的线程其实就跑起来了.

我们还可以继续往我们的构造函数中写日志来打印相关信息。


与线程池的启动类似,我们同样可以通过一个for循环,来调用封装的线程内部的Join函数接口,实现等待线程。


        void Wait()
        {
            for(auto &it:_threadpool)
            {
                it->join();
                LOG(LogLevel::INFO)<<"等待线程"<<it->Name()<<"...成功";
            }
        }

2.2、线程池的新增任务

作为一个线程池,你必须实现从外界获取任务并让内部线程调用的功能。

为了管理我们这新增的任务,我们可以使用队列来收集,所以新增任务就可以变成队列的插入。

但是,为了让我们的线程去自动的获取任务,执行任务,我们需要实现一个回调函数,我们会把这个回调函数在线程创建时传给线程,这样每个线程会去运行回调函数,并在回调函数中通过一系列我们之前所学的知识调用任务,运行任务。

我们这里统一的把任务类型规定为

using task_t=std::function<void()>;

并手动实现一个模范任务为:

namespace TaskModule
{
    using namespace LogModule;
    using task_t=std::function<void()>;

    void task()
    {
        LOG(LogLevel::INFO)<<"任务开始执行...";
    }
}

随后新增我们的任务队列变量:
 

 std::queue<T> _taskqueue;

这里我们选择使用模板,也就是说把线程池变成一个模板,方便我们后面执行不同返回值的任务,而不是一味的只能执行task_t

这里的模板划分是根据我们的任务的类型来划分的。

所以涉及到新增任务的Equeue接口自然要使用T来接收可能的任务类型:

        void Equeue(T && task)//我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
        //所以我们这里要使用模板参数T
        //这里的T && task是一个语法:引用折叠
        //如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
        //如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
        //如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
        {
            LockGuard lock(_mutex);
            _taskqueue.push(std::forward<T>(task));//我们这里使用std::forward来进行完美转发
        }

这里需要上锁,是因为可能会有多个线程并发访问我们的共享区资源:_taskqueue(比如取任务的线程与新增任务撞车了)。

另外,我们可以先把回调函数声明一下,等会进行实现,并且在线程创建时把我们之前暂时传递的lambda表达式改成我们的回调函数


这是我们现在已经写出的代码,供大家查漏补缺:

#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_
#pragma once

#include "log.hpp"
#include "sem.hpp"
#include "mutex.hpp"
#include "Cond.hpp"
#include <unistd.h>
#include "Mythread.hpp"
#include <queue>

#include <vector>
#include <functional>

namespace TaskModule
{
    using namespace LogModule;
    using task_t = std::function<void()>;

    void task()
    {
        LOG(LogLevel::INFO) << "任务开始执行...";
    }
}

namespace ThreadPoolModule
{
    using thread_t = std::shared_ptr<ThreadModule::Mythread>;
    using namespace LogModule;
    using namespace TaskModule;
    using namespace MutexModule;
    using namespace CondModule;
    using namespace ThreadModule;
    static int default_num = 5;
    template <typename T>
    class ThreadPool
    {
    private:


        void HandlerTask() // 回调函数,负责在线程初始化时调用
        {
            while (true)
            {
            }
        }

    public:
        ThreadPool(int num = default_num)
            : _num(num)
        {
            for (int i = 0; i < _num; ++i)
            {
                _threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this)));
                //我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
                //我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
                //我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
            }
        }

        void Start()
        {
            for (auto &it : _threadpool)
            {
                LOG(LogLevel::INFO) << "启动线程" << it->Name() << "...成功";
                it->start();
            }
        }

        void Equeue(T &&task) // 我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
        // 所以我们这里要使用模板参数T
        // 这里的T && task是一个语法:引用折叠
        // 如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
        // 如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
        // 如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
        {
            LockGuard lock(_mutex);
            _taskqueue.push(std::forward<T>(task)); // 我们这里使用std::forward来进行完美转发
            LOG(LogLevel::INFO) << "任务入队列成功";
        }
        void Wait()
        {
            for (auto &it : _threadpool)
            {
                it->join();
                LOG(LogLevel::INFO) << "回收线程" << it->Name() << "...成功";
            }
        }

    private:
        std::vector<thread_t> _threadpool; // 线程管理数组
        std::queue<T> _taskqueue;          // 任务队列
        int _num;                          // 线程数量
        Mutex _mutex;                      // 锁
    };
}

#endif

目前为止需要注意的点是Equeue时新增的任务需要使用&&,包含了一个小语法引用折叠 。

以及我们想要让线程调用类成员方法,需要进行绑定,提供this指针


2.3、线程池的回调函数实现

我们接下来实现一下线程池的回调函数接口,这个函数就是我们之前写的逻辑,在while循环中不断判断是否有任务,有的话就执行,没的话就进入条件变量中等待。

所以在这里我们需要用到条件变量cond,锁,进行等待的线程数量,以及一个判断任务队列是否为空的小接口:

Cond _cond;
int wait_num;                      // 进行等待的线程数量
         bool IsEmpty()//判断任务队列是否为空
        {
            return _taskqueue.empty();
        }
         void HandlerTask() // 回调函数,负责在线程初始化时调用
        {
            while (true)
            {
                T t;
                {
                    LockGuard lock(_mutex);
                    while (IsEmpty())
                    {
                        wait_num++;
                        _cond.wait(_mutex);
                        wait_num--;
                    }

                    t=_taskqueue.front();//拿出任务
                    _taskqueue.pop();//删除任务
                }

                //执行任务不用上锁
                t();//执行任务,我们这里规定传进来的任务必须重载()运算符
            }
        }

值得注意是是我们加入了一个新参数wait_num来记录此时等待的线程数量。

根据我们所学的消费者生产者模型,可以知道,这些等待的线程如何被唤醒?
当然是生产者生产之后,此时一定有数据,所以我们需要对Equeue进行补充,在新增任务后,判断waitnum的数量,进行唤醒工作。

        void Equeue(T &&task) // 我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
        // 所以我们这里要使用模板参数T
        // 这里的T && task是一个语法:引用折叠
        // 如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
        // 如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
        // 如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
        {
            LockGuard lock(_mutex);
            _taskqueue.push(std::forward<T>(task)); // 我们这里使用std::forward来进行完美转发
            LOG(LogLevel::INFO) << "任务入队列成功";

            if(wait_num>0)
            {
                _cond.SignalAll();//唤醒所有线程
            }
        }

此时我们写一个主函数代码来测试:
 

#include "ThreadPool.hpp"
using namespace ThreadPoolModule;
using namespace TaskModule;
int main()
{

    ENABLE_CONSOLE_LOG();
    // ENABLE_FILE_LOG();

    std::unique_ptr<ThreadPool<task_t>> tp = std::make_unique<ThreadPool<task_t>>();
    tp->Start();

    int cnt = 10;
    char c;
    while (true)
    {
        tp->Equeue(TaskModule::task);
        cnt--;
        sleep(1);
    }

    return 0;
}

运行结果为:

2.4、线程池的停止运行

任务新增完毕了,或者我们中途想体质线程池的运行,此时我们应该怎么做呢?

我们要完成我们线程池的Stop()接口,通过调用它来实现线程池的退出程序,最后通过Wait来回收线程。

如何做到?我们应该如何思考。

我们可以通过一个标记变量,来进行判断此时的线程池的运行状态。

        bool is_running;                   // 线程池是否在运行

在初始化时对这个标记变量初始化为false表示运行状态,此时还未运行,真正的运行是在start时,如果我们调用stop,可以先判断是否是出于运行状态,涉及到的函数改变:

       ThreadPool(int num = default_num)
            : _num(num), is_running(false),wait_num(0)
        {
            for (int i = 0; i < _num; ++i)
            {
                _threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this,std::placeholders::_1)));
                // 我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
                // 我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
                // 我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
            }
        }
        void Start()
        {
            is_running=true;
            for (auto &it : _threadpool)
            {
                LOG(LogLevel::INFO) << "启动线程" << it->Name() << "...成功";
                it->start();
            }
        }

当我们第一次进入stop,进行判断,结果为true表示还在运行,我们就执行停止工作,将状态置为false,此时我们就不能再新入任务了,所以在Equeue中应该在最前面先判断状态,此外,在stop中,由于我们执行stop时还有历史任务未完成,所以此时我们需要先唤醒所有的线程(如果有),让它们快速把任务执行完,于是全部陷入阻塞状态。

之后我们就可以调用wait进行线程回收了。

涉及到的代码改变为:
 

         void Equeue(T &&task) // 我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
        // 所以我们这里要使用模板参数T
        // 这里的T && task是一个语法:引用折叠
        // 如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
        // 如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
        // 如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
        {
            LockGuard lock(_mutex);
            if(!is_running)
            {
                return;//如果不为运行状态,就不能新入任务
            }
            _taskqueue.push(std::forward<T>(task)); // 我们这里使用std::forward来进行完美转发
            LOG(LogLevel::INFO) << "任务入队列成功";

            if(wait_num>0)
            {
                _cond.SignalAll();//唤醒所有线程
            }
        }
         void Stop()
        {
            if(is_running)
            {
                is_running = false;
                //此时不能再入任务队列了
                if(wait_num>0)
                {
                    _cond.SignalAll();//唤醒所有线程
                }
            }
        }

值得注意的是,我们的回调函数也会受到运行状态的影响。

我们进行等待判断的逻辑就应该变为:如果任务队列为空,并且我们是在运行状态,我们才能进行等待,如果不是运行状态,但此时没有任务了,我们也要继续执行,因为我们需要在下面新增一个判断。

在我们从任务队列拿任务之前,我们需要判断:此时任务队列是否为空&&是否不再运行。

如果此时该条件中的两个都满足,表示任务执行完毕,应该结束回调函数,所以我们进行break。

此时在本条判断中,不可能出现任务为空,还在运行的情况。因为这样就在上条while循环中出不来了。

所以只会另外出现二种情况:任务不为空,且此时还在运行;任务不为空,且不在运行。

这两种情况我们都需要把任务做完,并且后续的拿任务不会出错。

涉及到的代码:
 

        void HandlerTask() // 回调函数,负责在线程初始化时调用
        {
            LOG(LogLevel::INFO) << "线程开始执行回调函数逻辑";
            while (true)
            {
                T t;
                {
                    LockGuard lock(_mutex);
                    while (IsEmpty() && is_running)
                    {
                        wait_num++;
                        _cond.Wait(_mutex);
                        wait_num--;
                    }

                    if (IsEmpty() && !is_running)
                    {
                        break; // 退出回调函数
                    }

                    // 此时只会出现有任务的情况,无任务且is_running的情况已经在while循环中处理了,无任务且!is_running的情况再上面if中处理了
                    t = _taskqueue.front(); // 拿出任务
                    _taskqueue.pop();       // 删除任务
                }

                // 执行任务不用上锁
                t(); // 执行任务,我们这里规定传进来的任务必须重载()运算符
            }

            LOG(LogLevel::INFO) << "线程执行回调函数逻辑结束";
        }

至此,我们的线程池就已经写的差不多了,

完整代码如下:
 

#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_
#pragma once

#include "log.hpp"
#include "sem.hpp"
#include "mutex.hpp"
#include "Cond.hpp"
#include <unistd.h>
#include "Mythread.hpp"
#include <queue>

#include <vector>
#include <functional>

namespace TaskModule
{
    using namespace LogModule;
    using task_t = std::function<void()>;

    void task()
    {
        LOG(LogLevel::INFO) << "任务开始执行...";
    }
}

namespace ThreadPoolModule
{
    using thread_t = std::shared_ptr<ThreadModule::Mythread>;
    using namespace LogModule;
    using namespace TaskModule;
    using namespace MutexModule;
    using namespace CondModule;
    using namespace ThreadModule;
    static int default_num = 5;
    template <typename T>
    class ThreadPool
    {
    private:
        bool IsEmpty() // 判断任务队列是否为空
        {
            return _taskqueue.empty();
        }

        void HandlerTask() // 回调函数,负责在线程初始化时调用
        {
            LOG(LogLevel::INFO) << "线程开始执行回调函数逻辑";
            while (true)
            {
                T t;
                {
                    LockGuard lock(_mutex);
                    while (IsEmpty() && is_running)
                    {
                        wait_num++;
                        _cond.Wait(_mutex);
                        wait_num--;
                    }

                    if (IsEmpty() && !is_running)
                    {
                        break; // 退出回调函数
                    }

                    // 此时只会出现有任务的情况,无任务且is_running的情况已经在while循环中处理了,无任务且!is_running的情况再上面if中处理了
                    t = _taskqueue.front(); // 拿出任务
                    _taskqueue.pop();       // 删除任务
                }

                // 执行任务不用上锁
                t(); // 执行任务,我们这里规定传进来的任务必须重载()运算符
            }

            LOG(LogLevel::INFO) << "线程执行回调函数逻辑结束";
        }

    public:
         ThreadPool(int num = default_num)
            : _num(num), is_running(false),wait_num(0)
        {
            for (int i = 0; i < _num; ++i)
            {
                _threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this,std::placeholders::_1)));
                // 我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
                // 我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
                // 我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
            }
        }

        void Stop()
        {
            if (is_running)
            {
                is_running = false;
                // 此时不能再入任务队列了
                if (wait_num > 0)
                {
                    _cond.SignalAll(); // 唤醒所有线程
                }
            }
        }

        void Start()
        {
            is_running = true;
            for (auto &it : _threadpool)
            {
                LOG(LogLevel::INFO) << "启动线程" << it->Name() << "...成功";
                it->start();
            }
        }

        void Equeue(T &&task) // 我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
        // 所以我们这里要使用模板参数T
        // 这里的T && task是一个语法:引用折叠
        // 如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
        // 如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
        // 如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
        {
            LockGuard lock(_mutex);
            if (!is_running)
            {
                return; // 如果不为运行状态,就不能新入任务
            }
            _taskqueue.push(std::forward<T>(task)); // 我们这里使用std::forward来进行完美转发
            LOG(LogLevel::INFO) << "任务入队列成功";

            if (wait_num > 0)
            {
                _cond.SignalAll(); // 唤醒所有线程
            }
        }
        void Wait()
        {
            for (auto &it : _threadpool)
            {
                it->join();
                LOG(LogLevel::INFO) << "回收线程" << it->Name() << "...成功";
            }
        }

    private:
        std::vector<thread_t> _threadpool; // 线程管理数组
        std::queue<T> _taskqueue;          // 任务队列
        int _num;                          // 线程数量
        Mutex _mutex;                      // 锁
        Cond _cond;                        // 条件变量
        int wait_num;                      // 进行等待的线程数量
        bool is_running;                   // 线程池是否在运行
    };
}

#endif

我们可以进行测试:

#include "ThreadPool.hpp"
using namespace ThreadPoolModule;
using namespace TaskModule;
int main()
{

    ENABLE_CONSOLE_LOG();
    // ENABLE_FILE_LOG();

    std::unique_ptr<ThreadPool<task_t>> tp = std::make_unique<ThreadPool<task_t>>();
    tp->Start();

    int cnt = 10;
    char c;
    while (cnt)
    {
        tp->Equeue(TaskModule::task);
        cnt--;
        sleep(1);
    }
     tp->Stop();

    sleep(3);

    tp->Wait();

    return 0;
}

在这代码中我们默认给线程池创建了五个线程,并每隔一秒分配一个任务。

之后进行停止,三秒后回收,我们看一下日志打印信息:

2.4、优化

我们可以对打印进行优化一下,分辨一下具体是哪个线程执行的任务,所以我们需要改变一下回调函数的参数,传递线程的名字进去。

        void HandlerTask(std::string name) // 回调函数,负责在线程初始化时调用
        {
            LOG(LogLevel::INFO) << "线程开始执行回调函数逻辑";
            while (true)
            {
                T t;
                {
                    LockGuard lock(_mutex);
                    while (IsEmpty() && is_running)
                    {
                        wait_num++;
                        _cond.Wait(_mutex);
                        wait_num--;
                    }

                    if (IsEmpty() && !is_running)
                    {
                        break; // 退出回调函数
                    }

                    // 此时只会出现有任务的情况,无任务且is_running的情况已经在while循环中处理了,无任务且!is_running的情况再上面if中处理了
                    t = _taskqueue.front(); // 拿出任务
                    _taskqueue.pop();       // 删除任务
                }

                // 执行任务不用上锁
                t(name); // 执行任务,我们这里规定传进来的任务必须重载()运算符
            }

            LOG(LogLevel::INFO) << "线程执行回调函数逻辑结束";
        }

    public:
        ThreadPool(int num = default_num)
            : _num(num), is_running(false),wait_num(0)
        {
            for (int i = 0; i < _num; ++i)
            {
                _threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this,std::placeholders::_1)));
                // 我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
                // 我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
                // 我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
            }
        }

所以此时我们传递的任务类型也需要增加一个对应的参数,否则会报错,

namespace TaskModule
{
    using namespace LogModule;
    using task_t = std::function<void(std::string)>;

    void task(std::string name)
    {
        LOG(LogLevel::INFO) << "任务开始执行...";
    }
}

不止如此,我们之前写的线程封装代码是这样的:
 

     class Mythread
    {
        private:
         static void *Routine(void*args)
          {
            Mythread *t = static_cast<Mythread *>(args);
            t->_status = TSTATUS::RUNNING;
            t->_func();
            return nullptr;
        }
    public:
        Mythread(func_t func):_func(func), _status(TSTATUS::NEW), _joinable(true)
        {
            _name = "Thread-" + std::to_string(number++);
            _pid = getpid();
        }
        bool start()//负责线程的创建
        {
             if (_status != TSTATUS::RUNNING)
            {
                int n = ::pthread_create(&_tid, nullptr, Routine, this); // TODO
                if (n != 0)
                    return false;
                return true;
            }
            return false;
        }

注意看这一段代码,我们在线程池中的start中轮番调用了我们线程的start,此时会创建线程并让其执行Routine回调函数,这里面的_func就是我们实际上执行的函数,但我们已经把任务函数的参数变成了void (std::string)了,而不再是void (),所以我们此时应该把名字传递进去。

这里的func_t我们当时命名的是

这里其实是不符合现在类型function<void(std::string)>的,所以我们可以把这里用到的func_t换成task_t。

为了两个头文件不重复包含,我们这里新开一个任务头文件Task.hpp:
 

#include"log.hpp"
#include<string>
#include<functional>

namespace TaskModule
{
    using namespace LogModule;
    using task_t = std::function<void(std::string)>;

    void task(std::string name)
    {
        LOG(LogLevel::INFO) << "任务开始执行...";
    }
}

随后让我们的线程头文件使用task_t而不是func_t.

改动的地方主要有两处:

运行代码:
 


整体的threadpool代码如下: 

#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_
#pragma once

#include "log.hpp"
#include "sem.hpp"
#include "mutex.hpp"
#include "Cond.hpp"
#include <unistd.h>
#include "Mythread.hpp"
#include <queue>
#include "Task.hpp"
#include <vector>
#include <functional>


namespace ThreadPoolModule
{
    using thread_t = std::shared_ptr<ThreadModule::Mythread>;
    using namespace LogModule;
    using namespace TaskModule;
    using namespace MutexModule;
    using namespace CondModule;
    using namespace ThreadModule;
    static int default_num = 5;
    template <typename T>
    class ThreadPool
    {
    private:
        bool IsEmpty() // 判断任务队列是否为空
        {
            return _taskqueue.empty();
        }

        void HandlerTask(std::string name) // 回调函数,负责在线程初始化时调用
        {
            LOG(LogLevel::INFO) << "线程"<<name<<"开始执行回调函数逻辑";
            while (true)
            {
                T t;
                {
                    LockGuard lock(_mutex);
                    while (IsEmpty() && is_running)
                    {
                        wait_num++;
                        _cond.Wait(_mutex);
                        wait_num--;
                    }

                    if (IsEmpty() && !is_running)
                    {
                        break; // 退出回调函数
                    }

                    // 此时只会出现有任务的情况,无任务且is_running的情况已经在while循环中处理了,无任务且!is_running的情况再上面if中处理了
                    t = _taskqueue.front(); // 拿出任务
                    _taskqueue.pop();       // 删除任务
                }

                // 执行任务不用上锁
                t(name); // 执行任务,我们这里规定传进来的任务必须重载()运算符
            }

            LOG(LogLevel::INFO) << "线程"<<name<<"执行回调函数逻辑结束";
        }

    public:
        ThreadPool(int num = default_num)
            : _num(num), is_running(false),wait_num(0)
        {
            for (int i = 0; i < _num; ++i)
            {
                _threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this,std::placeholders::_1)));
                // 我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
                // 我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
                // 我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
            }
        }

        void Stop()
        {
            if (is_running)
            {
                is_running = false;
                // 此时不能再入任务队列了
                if (wait_num > 0)
                {
                    _cond.SignalAll(); // 唤醒所有线程
                }
            }
        }

        void Start()
        {
            is_running = true;
            for (auto &it : _threadpool)
            {
                LOG(LogLevel::INFO) << "启动线程" << it->Name() << "...成功";
                it->start();
            }
        }

        void Equeue(T &&task) // 我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
        // 所以我们这里要使用模板参数T
        // 这里的T && task是一个语法:引用折叠
        // 如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
        // 如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
        // 如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
        {
            LockGuard lock(_mutex);
            if (!is_running)
            {
                return; // 如果不为运行状态,就不能新入任务
            }
            _taskqueue.push(std::forward<T>(task)); // 我们这里使用std::forward来进行完美转发
            LOG(LogLevel::INFO) << "任务入队列成功";

            if (wait_num > 0)
            {
                _cond.SignalAll(); // 唤醒所有线程
            }
        }
        void Wait()
        {
            for (auto &it : _threadpool)
            {
                it->join();
                LOG(LogLevel::INFO) << "回收线程" << it->Name() << "...成功";
            }
        }

    private:
        std::vector<thread_t> _threadpool; // 线程管理数组
        std::queue<T> _taskqueue;          // 任务队列
        int _num;                          // 线程数量
        Mutex _mutex;                      // 锁
        Cond _cond;                        // 条件变量
        int wait_num;                      // 进行等待的线程数量
        bool is_running;                   // 线程池是否在运行
    };
}

#endif

task.hpp不再赘述。


三、线程安全的单例模式

3.1、什么是单例模式

某些类, 只应该具有⼀个对象(实例), 就称之为单例。

例如⼀个男⼈只能有⼀个媳妇。

在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据。
单例模式是一种创建型设计模式,它确保一个类只有一个实例,并提供一个全局访问点来访问该实例。
它常用于管理全局资源,如日志系统、数据库连接、线程池等。

3.2、懒汉实现方式与饿汉实现方式

类比于吃饭。
吃完饭, ⽴刻洗碗 , 这种就是饿汉方式 . 因为下⼀顿吃的时候可以⽴刻拿着碗就能吃饭 .
class Singleton 
{
public:
    static Singleton* getInstance() 
    {
        return instance;  // 直接返回预先创建好的实例
    }

private:
    Singleton() {}  // 私有构造
    static Singleton* instance;  // 静态实例
};

// 程序一启动就初始化(类似“吃完饭立刻洗碗”)
Singleton* Singleton::instance = new Singleton();
吃完饭 , 先把碗放下 , 然后下⼀顿饭用到这个碗了再洗碗 , 就是懒汉方式 .
template <typename T>
class Singleton 
{
    static T* inst;
public:
    static T* GetInstance() 
    {
        if (inst == NULL)
        {
             inst = new T();
        }
        return inst;
    }
 };
懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度.(类比于写实拷贝)。

 但这个代码存在一个严重问题,在多线程下会不安全。

第一次调用GetInstance时,如果多个线程同时调用,可能会创建出多份T对象事例。

但是后续调用就没问题了

所以我们可以改一下:
// 懒汉模式, 线程安全
template <typename T>
class Singleton
{
    volatile static T *inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
    static std::mutex lock;

public:
    static T *GetInstance()
    {
        if (inst == NULL)
        {                // 双重判定空指针, 降低锁冲突的概率, 提⾼性能.
            lock.lock(); // 使⽤互斥锁, 保证多线程情况下也只调⽤⼀次 new.
            if (inst == NULL)
            {
                inst = new T();
            }
            lock.unlock();
        }
        return inst;
    }
};

3.3、单例化线程池

如果我们想在原来代码基础上实现一个单例化的线程池,该如何实现?

首先由于是只有一个对象,为了安全我们必须禁掉他的复制拷贝与拷贝构造函数。

并且,我们还需要把构造函数放在private中去,防止外部直接new创建对象。这样一来我们只能通过内部的函数调用来创建。

所以我们需要在public中写一个接口实现创建且只创建一个线程池对象,并且新增一把专门保护视获取实例的锁和一个实例指针,指向该实例,存储并维护这个唯一实例的地址。

并且这个接口必须被static修饰,否则我们就必须要先有一个对象才能调用该成员函数。

另外,我们保护线程安全用到的锁,以及将会被我们初始化的唯一一个实例,也需要被static修饰,这是为了让所有的 ThreadPool<T> 类共享同一个 instance 指针。确保无论调用多少次 getInstance(),返回的都是同一个实例。

这里需要注意上面的线程安全问题,所以代码如下:

  class ThreadPool
    {
    private:
       ......

        ThreadPool(const ThreadPool &) = delete;               // 禁止拷贝函数
        ThreadPool<T> &operator=(const ThreadPool &) = delete; // 禁止赋值函数
        ThreadPool(int num = default_num)                      // 将构造函数放在私人权限中防止外部随意创建对象
            : _num(num), is_running(false), wait_num(0)
        {
            for (int i = 0; i < _num; ++i)
            {
                _threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
                // 我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
                // 我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
                // 我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
            }
        }

    public:
        static ThreadPool<T> *getInstance()
        {
            if (instance == NULL)
            {
                LockGuard lockguard(mutex);
                if (instance == NULL)
                {
                    LOG(LogLevel::INFO) << "单例首次被执行,需要加载对象...";
                    instance = new ThreadPool<T>();
                }
            }

            return instance;
        }
       
        .....

    private:
        .....

        static ThreadPool<T> *instance; //实例指针
        static Mutex mutex; // 只用来保护单例
};


注意,static修饰的成员变量的声明在类内,定义在类外:

    template <typename T> // static的类成员变量的初始化需要放在类外
    ThreadPool<T> *ThreadPool<T>::instance = NULL;
    template <typename T>
    Mutex ThreadPool<T>::mutex; // 只用来保护单例

测试一下:
 


#include "ThreadPool.hpp"
using namespace ThreadPoolModule;
using namespace TaskModule;
int main()
{
    ENABLE_CONSOLE_LOG();
    ThreadPool<task_t>::getInstance()->Start();
    char c;
    int cnt = 5;
    while (cnt)
    {
        // std::cin >> c;
        ThreadPool<task_t>::getInstance()->Equeue(TaskModule::task);
        cnt--;
        sleep(1);
    }

    ThreadPool<task_t>::getInstance()->Stop();
    ThreadPool<task_t>::getInstance()->Wait();
    return 0;
}

单例化线程池的代码如下:
 

#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_
#pragma once

#include "log.hpp"
#include "mutex.hpp"
#include "Cond.hpp"
#include <unistd.h>
#include "Mythread.hpp"
#include <queue>
#include "Task.hpp"
#include <vector>
#include <functional>

namespace ThreadPoolModule
{
    using thread_t = std::shared_ptr<ThreadModule::Mythread>;
    using namespace LogModule;
    using namespace TaskModule;
    using namespace MutexModule;
    using namespace CondModule;
    using namespace ThreadModule;
    static int default_num = 5;
    template <typename T>
    class ThreadPool
    {
    private:
        bool IsEmpty() // 判断任务队列是否为空
        {
            return _taskqueue.empty();
        }

        void HandlerTask(std::string name) // 回调函数,负责在线程初始化时调用
        {
            LOG(LogLevel::INFO) << "线程" << name << "开始执行回调函数逻辑";
            while (true)
            {
                T t;
                {
                    LockGuard lock(_mutex);
                    while (IsEmpty() && is_running)
                    {
                        wait_num++;
                        _cond.Wait(_mutex);
                        wait_num--;
                    }

                    if (IsEmpty() && !is_running)
                    {
                        break; // 退出回调函数
                    }

                    // 此时只会出现有任务的情况,无任务且is_running的情况已经在while循环中处理了,无任务且!is_running的情况再上面if中处理了
                    t = _taskqueue.front(); // 拿出任务
                    _taskqueue.pop();       // 删除任务
                }

                // 执行任务不用上锁
                t(name); // 执行任务,我们这里规定传进来的任务必须重载()运算符
            }

            LOG(LogLevel::INFO) << "线程" << name << "执行回调函数逻辑结束";
        }

        ThreadPool(const ThreadPool &) = delete;               // 禁止拷贝函数
        ThreadPool<T> &operator=(const ThreadPool &) = delete; // 禁止赋值函数
        ThreadPool(int num = default_num)                      // 将构造函数放在私人权限中防止外部随意创建对象
            : _num(num), is_running(false), wait_num(0)
        {
            for (int i = 0; i < _num; ++i)
            {
                _threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
                // 我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
                // 我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
                // 我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
            }
        }

    public:
        static ThreadPool<T> *getInstance()
        {
            if (instance == NULL)
            {
                LockGuard lockguard(mutex);
                if (instance == NULL)
                {
                    LOG(LogLevel::INFO) << "单例首次被执行,需要加载对象...";
                    instance = new ThreadPool<T>();
                }
            }

            return instance;
        }
        void Stop()
        {
            if (is_running)
            {
                is_running = false;
                // 此时不能再入任务队列了
                if (wait_num > 0)
                {
                    _cond.SignalAll(); // 唤醒所有线程
                }
            }
        }

        void Start()
        {
            is_running = true;
            for (auto &it : _threadpool)
            {
                LOG(LogLevel::INFO) << "启动线程" << it->Name() << "...成功";
                it->start();
            }
        }

        void Equeue(T &&task) // 我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
        // 所以我们这里要使用模板参数T
        // 这里的T && task是一个语法:引用折叠
        // 如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
        // 如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
        // 如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
        {
            LockGuard lock(_mutex);
            if (!is_running)
            {
                return; // 如果不为运行状态,就不能新入任务
            }
            _taskqueue.push(std::forward<T>(task)); // 我们这里使用std::forward来进行完美转发
            LOG(LogLevel::INFO) << "任务入队列成功";

            if (wait_num > 0)
            {
                _cond.SignalAll(); // 唤醒所有线程
            }
        }
        void Wait()
        {
            for (auto &it : _threadpool)
            {
                it->join();
                LOG(LogLevel::INFO) << "回收线程" << it->Name() << "...成功";
            }
        }

    private:
        std::vector<thread_t> _threadpool; // 线程管理数组
        std::queue<T> _taskqueue;          // 任务队列
        int _num;                          // 线程数量
        Mutex _mutex;                      // 锁
        Cond _cond;                        // 条件变量
        int wait_num;                      // 进行等待的线程数量
        bool is_running;                   // 线程池是否在运行

        static ThreadPool<T> *instance;
        static Mutex mutex; // 只用来保护单例
    };
    template <typename T> // static的类成员变量的初始化需要放在类外
    ThreadPool<T> *ThreadPool<T>::instance = NULL;
    template <typename T>
    Mutex ThreadPool<T>::mutex; // 只用来保护单例
}

#endif

总结:

很高兴告诉大家,我们目前为止,关于linux系统部分的内容,就已经学完了。

剩下一些知识点相比之下并不是很重要,我们也许会放到加餐课中进行学习研究。

从明天开始,我们将会进入Linux的另外一种大山:网络编程。

今天的内容希望对大家有所帮助!!


网站公告

今日签到

点亮在社区的每一天
去签到