【Linux】线程池

发布于:2024-06-27 ⋅ 阅读:(184) ⋅ 点赞:(0)

一、线程池

1.1 线程池的介绍

       线程池是一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

1.2 线程池的应用场景

  1. 需要大量的线程来完成任务,且完成任务的时间比较短,Web服务器完成网页请求这样的任务,使用线程池技术是非常合适的,因为单个任务小,而任务数量巨大,你可以想想一个热门网站的点击次数。但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间长很多。
  2. 对性能要求苛刻的应用,比如要求服务器迅速相应客户请求
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池的情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

1.3 线程池的示例

  1. 创建固定数量线程池,循环从任务队列中获取任务对象
  2. 获取到任务对象后,执行任务对象中的任务接口

1.4 线程池的代码编写

1.4.1 线程池类的属性

       首先,一个线程池要有对线程数量的计数,对线程还需要一个数据用于对线程进行管理,还需要一个存储任务的队列,线程可以从队列中拿任务进行处理,因为线程中会有同步和互斥,为了保护临界资源,我们需要锁和条件变量,之后线程肯定会有等待,因此用一个数据来记录线程等待的数量。

int _threadnum; // 记录线程的数量
std::vector<Thread> _threads; // 进行管理线程的数组
std::queue<T> _task_queue; // 进行管理任务的队列
pthread_mutex_t _mutex; // 锁,保护临界资源
pthread_cond_t _cond; // 条件变量,保护临界资源

int _waitnum; // 记录线程等待的数量

1.4.2 线程池类的构造函数和析构函数

       线程池的构造函数:我们需要对锁、条件变量和一些变量进行赋初值;线程池的析构函数:将锁和条件变量进行销毁。

// 线程池的构造函数
ThreadPool(int threadnum)
    : _threadnum(threadnum) // 要创建的线程的个数
    , _waitnum(0) 
{
    pthread_mutex_init(&_mutex, nullptr);
    pthread_cond_init(&_cond, nullptr);
}

// 线程池的析构函数
~ThreadPool()
{
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_cond);
}

1.4.3 线程池中初始化线程的函数以及启动线程的函数

       因为在基于循环队列的生产者消费者模型中,我们需要会发现将线程初始化和线程的启动写在一起会出现一些问题(重入)。因此,我们需要将线程的初始化和线程的启动分开写,先全部进行初始化,然后再全部进行启动。

1.4.3.1 先来巩固一下上述BUG

       这个问题可能会导致我们线程刚创建好就被释放,所以我们需要先将所需要的线程全部创建出来,然后统一进行启动。

1.4.3.2 线程的创建函数
void InitThreadPool()
{
    for (int i = 0; i < _threadnum; i++)
    {
        std::string name = std::string("thread-") + std::to_string(i + 1);
        _threads.emplace_back(std::bind(&ThreadPool::HanderTask, this, std::placeholders::_1), name); 
    }
}
1.4.3.2.1 传递给线程的函数是普通函数 

       在这个函数中,我们需要将线程所要执行的函数也一起传递过去,如果我们传递一个普通函数过去是没有什么问题的,因为在thread类中定义的函数模板如下:

using func_t = std::function<void(std::string)>;

我们只需根据模板来创建出对应的函数,我们就可以传递给线程,让线程进行执行。

1.4.3.2.2 传递给线程的函数是类中的函数 

       但是如果说,我们想要传递的是类中的函数,我们需要格外地小心。因为会出现下面的报错:

为什么呢?在类中的函数定义的也是对函数模板相对应的参数类型,为什么函数不匹配??

       因为在这个类中的函数会有一个隐藏的参数就是this指针,我们需要将this指针也考虑进去,但是我们肯定是不能将thread类中的函数模板进行更改,为了使函数模板具有兼容性,我们需要对在传递函数的参数中进行修改:我们利用bind函数将this参数单独提出。

_threads.emplace_back(std::bind(&ThreadPool::HanderTask, 
                            this, std::placeholders::_1), name;

std::bind函数

       bind函数是C++11的一个函数模板,用于将函数及其参数绑定到一个可调用对象上,它可以用于创建一个函数对象,这个函数对象可以调用原来的函数并传递预先绑定的参数。而std::placeholders::_1则表示一个占位符,用于在调用时传递第二个参数。

1.4.3.3 线程的启动函数

利用thread类中的start函数将线程进行启动:

void Start()
{
    for (auto &k : _threads)
    {
        k.Start();
    }
}

1.4.4 分配线程任务的函数

       在给线程初始化时,需要给线程传递分配任务的函数,该函数的作用是将任务分配给空闲的线程,让其完成任务。由于该函数是一个临界资源,所以我们需要对其加锁,并在执行完后进行解锁操作,以防形成死锁的操作。由于光有加锁操作和解锁操作是不行的,因为可能会导致一系列问题:一个线程反复横跳。此时,我们需要调用一个条件变量,我们需要利用条件变量将线程进行操作,以达到线程同步的现象。

       然后在对函数主体进行分析,因为有一个条件变量,所以我们需要将分条件进行操作。当存放任务的队列为空时,我们需要将线程进行休i眠操作。当存放任务的队列不为空时,我们需要将任务分配给其他不休眠的线程去执行。

void HanderTask(std::string name)
{
    while (true)
    {
        Lock(); // 保护措施,进行加锁
      
        if(_task_queue.empty())  // 如果队列中的任务为空时
        {
            _waitnum++;
            pthread_cond_wait(&_cond);
            _waitnum--;
        }

        T t = _task_queue.front();
        _task_queue.pop();
        Unlock(); // 保护措施,进行解锁

        // 进行执行任务t
    }
}

1.4.5 在外部将任务分配给线程的函数

       我们需要给线程池分配任务利用该函数,该函数进行将分配给的任务存储在任务队列中,等待给线程分配任务。在分配任务的同时,我们需要检查该线程池中是否有等待的线程,我们需要将等待的线程进行唤醒,但这次只是唤醒一个。由于该操作是临界资源,所以我们需要进行加锁和解锁操作。

bool Enqueue(T &t)
{
    bool ret = false;
    Lock(); // 保护措施,进行加锁
    _task_queue.push(t);
    if(_waitnum > 0)
    {
        pthread_cond_signal(&_cond);
    }
    ret = true;
    Unlock(); // 保护措施,进行解锁

    return ret;
}

1.4.6 添加一个变量:判断线程是否在启动状态

       首先,现在属性中添加这个变量:bool  isrunning。然后我们需要将该线程的一些函数进行修改,再来重新整理一下思路:

       先对线程进行初始化,在初始化时,我们需要将isrunning赋值为true,然后启动线程,对代码进行修改:

void InitThreadPool()  // 进行初始化线程函数
{
    for(int i = 0; i < threadnum; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1);
        _threads.emplace_back(std::bind(&ThreadPool::HanderTask, this, std::placeholders::_1), name);
    }
    _isrunning = true;  // 不在start中写的原因是防止有的线程创建的过快,导致有的线程中的isrunning = false
}

       此时,我们可以分配任务给线程,将任务压入队列中,只有在线程池正在运行的时候,我们才能将任务压入队列中:

bool Enqueue(T &t)
{
    bool ret = false;
    Lock(); // 加锁
    if(_isrunning)  // 只有在线程运行时,我们才能压入数据
    {    
        _task_queue.push(t);
        if(_waitnum > 0)
        {
            pthread_cond_signal(&_cond);
        }
        ret = true;
    }
    Unlock(); // 解锁

    return ret;
}

       之后会对分配给线程的任务进行处理,会根据线程池的isrunning属性和任务队列是否为空进行判断,分为四个判断条件:

  1. 任务队列为空且线程池正在运行
  2. 任务队列为空且线程池不运行
  3. 任务队列不为空且线程池正在运行
  4. 任务队列不为空且线程池不运行

下面分别给这四个条件分配任务:

       如果任务队列为空且线程池正在运行,我们需要将该线程进行休眠;如果任务队列为空且线程池不运行,我们需要将该线程退出;其余两种情况,让线程继续运行任务。

void HanderTask(std::string name)
{
    while (true)
    {
        // 进行加锁
        Lock();
        if(_task_queue.empty() && isruning)
        {
            // 说明线程可以进行休眠
            _waitnum++;
            pthread_cond_wait(&_cond); // 将线程进行休眠
            _waitnum--;
        }
        if(_task_queue.empty() && !sirunning)
        {
            Unlock(); // 直接进行解锁
            break;
        }
        
        // 选代码阶段
        T t = _task_queue.front();
        _task_queue.pop();
        
        // 进行解锁
        Unlock();
        
        // 进行执行任务
        t();
    }
}

       之后,我们可以使用isrunning对线程池进行停止操作,在停止操作中,我们需要将isrunning置为false,然后将所有线程进行唤醒,然后将让线程执行完手头上的任务就可以进行退出了。

void Stop()
{
    Lock(); // 加锁
    _isrunnig = false;
    pthread_cond_broadcast(&_cond);
    Unlock(); // 解锁
} 

       最后将所有线程进行关闭,利用线程中的Join函数就可以将线程进行回收。

void Wait()
{
    for(auto &k : _threads)
    {
        k.Join();
    }
}

1.5 线程的完整代码:

#include <iostream>
#include <string>
#include <vector>
#include "Thread.hpp"
#include <pthread.h>
#include <queue>

using namespace ThreadModule;

const int morenthreadnum = 3;

template <class T>
class ThreadPool
{

private:
    void Lock()
    {
        pthread_mutex_lock(&_mutex);
    }

    void Unlock()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void Threadsleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

    void Threadwakeup()
    {
        pthread_cond_signal(&_cond);
    }

    void ThreadwakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }

public:
    ThreadPool(int threadnum = morenthreadnum)
        : _threadnum(threadnum), _waitnum(0)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    void HanderTask(std::string name) // 类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用
    {

        while (true)
        {
            // 1. 保护措施
            Lock();
            // 2. 线程池为空时
            while (_task_queue.empty() && _isrunning)
            {
                _waitnum++;
                Threadsleep();
                _waitnum--;
            }

            // //
            if (_threads.empty() && !_isrunning)
            {
                Unlock();
                break;
            }

            T t = _task_queue.front();
            _task_queue.pop();

            Unlock();

            // t();
        }
    }

    void InitThreadPool()
    {
        for (int i = 0; i < _threadnum; i++)
        {
            std::string name = std::string("thread-") + std::to_string(i + 1);
            //_threads.emplace_back(&ThreadPool::HanderTask, name);
            _threads.emplace_back(std::bind(&ThreadPool::HanderTask, this, std::placeholders::_1), name); // ???
        }
        _isrunning = true;
    }

    void Start()
    {
        for (auto &k : _threads)
        {
            k.Start();
        }
    }

    void stop()
    {
        Lock();
        _isrunning = false;
        ThreadwakeupAll();
        Unlock();
    }

    void Wait()
    {
        for (auto &k : _threads)
        {
            k.Join();
        }
    }

    bool Enqueue(T &t)
    {

        bool ret = false;

        Lock();
        if (_isrunning)
        {
            _task_queue.push(t);

            if (_waitnum > 0)
            {
                Threadwakeup();
            }
            ret = true;
        }

        Unlock();

        return ret;
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    int _threadnum;               // 记录创建线程的总数
    std::vector<Thread> _threads; // 管理线程
    std::queue<T> _task_queue;
    pthread_mutex_t _mutex; // 保护机制
    pthread_cond_t _cond;   // 条件变量

    int _waitnum;
    bool _isrunning; // 判断线程池是否正在运行
};

二、日志的编写

2.1 日志的概念及其作用

       日志是记录系统运行过程中各种重要信息的文件,在系统运行过程中由各进程创建并记录。

       日志的作用是记录系统的运行过程及异常信息,为快速定位系统运行中出现的问题及开发过程中的程序调试问题提供详细信息。        

2.2 编写一个日志

2.2.1 日志是有等级的以及运用函数将其打印出来

       日志中的语句是具有等级的,比如:进行排除BUG,一些信息,警告信息,错误信息,是程序崩溃的信息。因此,我们可以使用enum来将其存储起来,按照等级的高低,对其进行整数的赋值。之后,我们使用一些算法将整数转换为字符串。

enum level{
    DEBUG = 0,
    INFO,
    WARNING,
    ERROR,
    FATAL
};

std::string Leveltostring(int level)
{
    switch(level)
    {
    case DEBUG: return "Debug";
    case INFO: return "Info";
    case WARNING: return "Warning";
    case ERROR: return "Error";
    case FATAL: return "Fatal";
    dafault: return "None";
}

2.2.2 日志是由时间的,运用函数将其打印出来

       日志是有时间的,我们需要借助一些函数来获取当前时间,下面我们来具体介绍一下函数的原型及功能:

2.2.2.1 time函数(毫秒级)

函数的原型:

函数的功能:

       获取当前时间到1970年1月1日0时0分0秒的秒数,单位是毫秒级。

函数的参数:

  • 一般使用nullptr

函数的返回值:

  • 返回从1970年1月1日0时0分0秒到现在的秒数
2.2.2.2 gettimeofday函数(微妙级)

函数的原型:

函数的功能:

       查看当前时间,这个函数会计算从1970年1月1号00:00(UTC)到当前的时间跨度,单位是微妙级。

函数的参数:

  • tv:输出型参数,我们需要一个特定的结构体传进去,然后利用结构体将所获取的时间打印出来
  • struct timeval {
        time_t      tv_sec;     /* seconds */
        suseconds_t tv_usec;    /* microseconds */
    };
  • tz:传NULL(因为linux内核不会使用这个参数)。

函数的返回值:

  • 调用成功返回0,失败返回-1
2.2.2.3 localtime函数

函数的原型:

函数的功能:

       获取当前时间和日期并转换为本地时间

函数的参数:

  • time:将获取到的时间传进去

函数的返回值:

        成功: struct tm *结构体, 原型如下:
            struct tm {
                   int tm_sec;       /* 秒 – 取值区间为[0,59] */ 
                   int tm_min;       /* 分 - 取值区间为[0,59] */ 
                   int tm_hour;      /* 时 - 取值区间为[0,23] */ 
                   int tm_mday;     /* 一个月中的日期 - 取值区间为[1,31] */ 
                   int tm_mon;     /* 月份(从一月开始,0代表一月) - 取值区间为[0,11] */ 
                   int tm_year;     /* 年份,其值等于实际年份减去1900 */ 
                   int tm_wday;    /* 星期 – 取值区间为[0,6],其中0代表星期天,1代表星期一 */ 
                   int tm_yday;    /* 从每年1月1日开始的天数– 取值区间[0,365],其中0代表1月1日 */ 
                   int tm_isdst;    /* 夏令时标识符,夏令时tm_isdst为正;不实行夏令时tm_isdst为0 */    
            };
                此结构体空间由内核自动分配, 而且不要去释放它.
        失败: NULL

       介绍完函数后,我们来编写返回时间字符串的函数:

std::string timedtring()
{
    // 先获取当前时间
    time_t src = time(nullptr);
    // 将当前时间传入localtime函数中
    struct tm* format_time = localtime(&src);
    // 分情况进行讨论
    if(format_time == nullptr)
    {
        return "None";
    }
    char buffer[1024];
    snprintf(buffer, sizeof buffer, "%d-%d-%d %d:%d:%d",
             format_time->tm_year + 1900,
             format_time->tm_mon + 1,
             format_time->tm_mday,
             format_time->tm_hour,
             format_time->tm_min,
             format_time->tm_sec);
    return buffer;
}

2.2.3 代码所在的文件和行数

       我们没有方法区获取当前代码所在的文件和行号,但是在之前学习C语言的时候,我们学习过两个宏:__FILE__, __LINE__。这两个宏可以将调用宏的代码所在的文件名和行数显示出来,然后将其打印出来。

  • __LINE__  :当前程序的行号,表示为十进制整型常量
  • __FILE__  :当前源文件名,表示字符串型常量

2.2.4 如何处理可变参数部分

2.2.4.1 利用一个例子来认识一下可变参数的原理

我们先来对这些函数进行了解:

       在函数体中声明一个va_list,然后用va_start函数来获取参数列表中的参数,使用完毕后调用va_end()结束。va_list是void* 指针,指向我们想要传递的参数的首地址,va_start给其进行赋值,将参数中最靠近可变参数的参数的位置赋值给va_list,然后根据循环将可变参数中的参数后va_list对象依次指出,最后使用va_end将指针置为空。

       之后,我们来了解可变参数是如何进行传入参数的,在函数进行入参的时候,我们需要将参数从右到左依次压入函数栈帧,以test函数为例,我们来模拟一下函数参数是如何进行传参的?

void test(int num, ...)
{
    va_list arg;
    va_start(arg, num);

    while (num)
    {
        // 进行强转
        int date = va_arg(arg, int);
        std::cout << "date:" << date << std::endl;
        num--;
    }

    va_end(arg); // arg == nullptr
}
2.2.4.2 vsnprintf函数

函数的原型:

函数的功能:

       使用vsnprintf()用于向一个字符串缓冲区打印格式化字符串,且可以限定打印的格式化字符串的最大长度。
函数的参数:

  • str:用于缓存格式化字符串结果的字符数组
  • size:限定最多打印到缓冲区sbuf的字符的个数为n-1个,因为vsnprintf还要在结果的末尾追加\0。如果格式化字符串长度大于n-1,则多出的部分被丢弃。如果格式化字符串长度小于等于n-1,则可以格式化的字符串完整打印到缓冲区sbuf。一般这里传递的值就是sbuf缓冲区的长度。
  • format:格式化限定字符串
  • ap:可变长度参数列表

函数的返回值:

  • 成功打印到sbuf中的字符的个数,不包括末尾追加的\0
  • 如果格式化解析失败,则返回负数
2.2.4.3 可变参数进行宏替换应该怎么处理?

       使用__VA_ARGS__来进行处理,我们使用define定义一个宏来调用这个函数,这样每次调用就不用写那么多字母。

#define LOG(level, format, ...)
                 LogMessage(__FILE__, __LINE__, level, format, ##__VA_ARGS__)

2.2.5 日志即可以打印在屏幕上,也可以打印在文件中

       我们使用一个参数来确保日志打印在哪里,如果gIsSave为真,说明我们要打印到文件中;如果gIsSave不为真,说明我们要打印到屏幕中。

我们可以通过定义两个宏来改变这个属性变量:

// 定义一个宏,将这个宏决定是否打印到文件中
#define Enfile()        \
    do                  \
    {                   \
        gIsSave = true; \
    } while (0)

#define Unfile()         \
    do                   \
    {                    \
        gIsSave = false; \
    } while (0)

2.2.6 将日志和任务全部加入线程池中

2.2.6.2 将任务分配给线程池中

       由于我们可以给线程分配一些任务,这些任务既可以是函数,也可以是类。任务在执行的时候,如果是类的话,我们需要进行调用类中的方法;但是如果是函数的话,我们只需将函数进行()调用即可,因此为了兼容性和一致性,我们可以在类中重载一下(),将类中的方法利用重载符号进行调用。

void operator()()
{
    Excute();
}

网站公告

今日签到

点亮在社区的每一天
去签到