Linux线程池实现

发布于:2025-03-31 ⋅ 阅读:(17) ⋅ 点赞:(0)

1.线程池实现

全部代码:whb-helloworld/113

1.唤醒线程

一个是唤醒全部线程,一个是唤醒一个线程。

void WakeUpAllThread()
        {
            LockGuard lockguard(_mutex);
            if (_sleepernum)
                _cond.Broadcast();
            LOG(LogLevel::INFO) << "唤醒所有的休眠线程";
        }

        void WakeUpOne()
        {
            _cond.Signal();
            LOG(LogLevel::INFO) << "唤醒一个休眠线程";
        }

2.创建线程

全局变量gnum的值就是要创建的线程的个数,然后把lambda表达式插入进去,Thread里面有function可以接收这个表达式,然后执行routine函数时就会执行这个表达式。emplace_back一个表达式,本质就是把这个表达式复制给这个vector<Thread>的Thread类的元素,就是传参过去。

 ThreadPool(int num = gnum) : _num(num), _isrunning(false), _sleepernum(0)
        {
            for (int i = 0; i < num; i++)
            {
                _threads.emplace_back(
                    [this]()
                    {
                        HandlerTask();
                    });
            }
        }
 using func_t = std::function<void()>;
 static void *Routine(void *args) // 属于类内的成员函数,默认包含this指针!
        {
            Thread *self = static_cast<Thread *>(args);
            self->EnableRunning();
            if (self->_isdetach)
                self->Detach();
            pthread_setname_np(self->_tid, self->_name.c_str());
            self->_func(); // 回调处理

            return nullptr;
        }

 

在C++中,捕获 this 是一种常见的做法,尤其是在使用 Lambda 表达式时。捕获 this 的主要目的是为了让 Lambda 表达式能够访问类的成员变量和成员函数。

3.开始函数

判断是否运行,没有就把状态变为true,然后范围for把线程都启动,并打印日志信息

 void Start()
        {
            if (_isrunning)
                return;
            _isrunning = true;
            for (auto &thread : _threads)
            {
                thread.Start();
                LOG(LogLevel::INFO) << "start new thread success: " << thread.Name();
            }
        }

 4.关掉拷贝和赋值

ThreadPool(const ThreadPool<T> &) = delete;
        ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;

5.单例模式

 先判断是否创建了实例,没有就加锁,第二个判断也是判断是否有实例,没有就创建并执行开始函数,最后返回inc这个指向实例的指针。第一个判断是为了下次不用一直加锁,只要保证第一次实例加锁就行,后面就可以不用加锁了,提高效率。

static ThreadPool<T> *GetInstance()
        {
            if (inc == nullptr)
            {
                LockGuard lockguard(_lock);

                LOG(LogLevel::DEBUG) << "获取单例....";
                if (inc == nullptr)
                {
                    LOG(LogLevel::DEBUG) << "首次使用单例, 创建之....";
                    inc = new ThreadPool<T>();
                    inc->Start();
                }
            }

            return inc;
        }

 6.停止函数和回收函数

根据_isrunning判断状态,如果是运行状态,就变为false,然后唤醒所有线程开始执行任务,

join就回收这些线程的信息。

 void Stop()
        {
            if (!_isrunning)
                return;
            _isrunning = false;

            // 唤醒所有的线层
            WakeUpAllThread();
        }
        void Join()
        {
            for (auto &thread : _threads)
            {
                thread.Join();
            }
        }

 

7.处理任务函数

开始是把线程名字放到name里面,进到循环里面,要加锁保证sleepernum值不会因为并发而改变,循环里面第一个循环只有任务队列为空且线程是运行才会进到等待队列进行休眠,sleep值是计算休眠的个数,判断是线程不运行且任务队列为空就退出了,没退出就说明有任务,就取出队列的任务给t,最后就可以t()执行任务。

void HandlerTask()
        {
            char name[128];
            pthread_getname_np(pthread_self(), name, sizeof(name));
            while (true)
            {
                T t;
                {
                    LockGuard lockguard(_mutex);
                    // 1. a.队列为空 b. 线程池没有退出
                    while (_taskq.empty() && _isrunning)
                    {
                        _sleepernum++;
                        _cond.Wait(_mutex);
                        _sleepernum--;
                    }
                    // 2. 内部的线程被唤醒
                    if (!_isrunning && _taskq.empty())
                    {
                        LOG(LogLevel::INFO) << name << " 退出了, 线程池退出&&任务队列为空";
                        break;
                    }

                    // 一定有任务
                    t = _taskq.front(); // 从q中获取任务,任务已经是线程私有的了!!!
                    _taskq.pop();
                }
                t(); // 处理任务,需/要在临界区内部处理吗?1 0
            }
        }
  1. pthread_getname_np(pthread_self(), name, sizeof(name));

    • pthread_getname_np 是一个 POSIX 线程库(pthread)提供的函数,用于获取线程的名称。

    • pthread_self() 是一个函数,返回当前线程的线程ID(pthread_t 类型)。

    • name 是目标数组,用于存储线程名称。

    • sizeof(name) 是目标数组的大小,确保不会超出数组的存储范围。

8.入任务函数

判断状态,运行就进去加锁,插入数据加锁避免并发插入数据,在判断线程大小和休眠个数是否相等,一样就说明都休眠了,要唤醒线程进行处理任务。

bool Enqueue(const T &in)
        {
            if (_isrunning)
            {
                LockGuard lockguard(_mutex);
                _taskq.push(in);
                if (_threads.size() == _sleepernum)
                    WakeUpOne();
                return true;
            }
            return false;
        }

9.完整代码

#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Log.hpp"
#include "Thread.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"

// .hpp header only

namespace ThreadPoolModule
{
    using namespace ThreadModlue;
    using namespace LogModule;
    using namespace CondModule;
    using namespace MutexModule;

    static const int gnum = 5;
    template <typename T>
    class ThreadPool
    {
    private:
        void WakeUpAllThread()
        {
            LockGuard lockguard(_mutex);
            if (_sleepernum)
                _cond.Broadcast();
            LOG(LogLevel::INFO) << "唤醒所有的休眠线程";
        }

        void WakeUpOne()
        {
            _cond.Signal();
            LOG(LogLevel::INFO) << "唤醒一个休眠线程";
        }

        ThreadPool(int num = gnum) : _num(num), _isrunning(false), _sleepernum(0)
        {
            for (int i = 0; i < num; i++)
            {
                _threads.emplace_back(
                    [this]()
                    {
                        HandlerTask();
                    });
            }
        }
        void Start()
        {
            if (_isrunning)
                return;
            _isrunning = true;
            for (auto &thread : _threads)
            {
                thread.Start();
                LOG(LogLevel::INFO) << "start new thread success: " << thread.Name();
            }
        }

        ThreadPool(const ThreadPool<T> &) = delete;
        ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;

    public:
        static ThreadPool<T> *GetInstance()
        {
            if (inc == nullptr)
            {
                LockGuard lockguard(_lock);

                LOG(LogLevel::DEBUG) << "获取单例....";
                if (inc == nullptr)
                {
                    LOG(LogLevel::DEBUG) << "首次使用单例, 创建之....";
                    inc = new ThreadPool<T>();
                    inc->Start();
                }
            }

            return inc;
        }
        void Stop()
        {
            if (!_isrunning)
                return;
            _isrunning = false;

            // 唤醒所有的线层
            WakeUpAllThread();
        }
        void Join()
        {
            for (auto &thread : _threads)
            {
                thread.Join();
            }
        }
        void HandlerTask()
        {
            char name[128];
            pthread_getname_np(pthread_self(), name, sizeof(name));
            while (true)
            {
                T t;
                {
                    LockGuard lockguard(_mutex);
                    // 1. a.队列为空 b. 线程池没有退出
                    while (_taskq.empty() && _isrunning)
                    {
                        _sleepernum++;
                        _cond.Wait(_mutex);
                        _sleepernum--;
                    }
                    // 2. 内部的线程被唤醒
                    if (!_isrunning && _taskq.empty())
                    {
                        LOG(LogLevel::INFO) << name << " 退出了, 线程池退出&&任务队列为空";
                        break;
                    }

                    // 一定有任务
                    t = _taskq.front(); // 从q中获取任务,任务已经是线程私有的了!!!
                    _taskq.pop();
                }
                t(); // 处理任务,需/要在临界区内部处理吗?1 0
            }
        }
        bool Enqueue(const T &in)
        {
            if (_isrunning)
            {
                LockGuard lockguard(_mutex);
                _taskq.push(in);
                if (_threads.size() == _sleepernum)
                    WakeUpOne();
                return true;
            }
            return false;
        }
        ~ThreadPool()
        {
        }

    private:
        std::vector<Thread> _threads;
        int _num; // 线程池中,线程的个数
        std::queue<T> _taskq;
        Cond _cond;
        Mutex _mutex;

        bool _isrunning;
        int _sleepernum;

        // bug??
        static ThreadPool<T> *inc; // 单例指针
        static Mutex _lock;
    };

    template <typename T>
    ThreadPool<T> *ThreadPool<T>::inc = nullptr;

    template <typename T>
    Mutex ThreadPool<T>::_lock;

}

2.单例模式与设计模式

类里面static修饰变量,则变量是属于类的,而不是属于某个特定对象,无论创建多少个对象,static变量都只有一份,所有对象共享一份static变量。

static修饰方法,则方法也是属于类的,不能访问非静态成员(变量和方法),因为非静态成员需要对象才能访问,所有对象共享一个static方法

而锁加static,是为了保证唯一锁,不然创建多个锁没有static,则每个锁都是不一样的,就等于一个门上有很多钥匙孔,static就可以保证只有一个钥匙孔和一把钥匙。

单例模式

单例模式是属于一种创建型设计模式,确保一个类在整个应用程序中只有一个实例,提供一个全局访问点来访问这个实例这个实例。

饿汉模式

饿汉模式在类被加载时就立即创建实例,而不是在第一次需要实例时才创建。这种方式特定是实例创建早,通长用于资源不多且创建实例开销较小的情况,因为每次执行都要创建一个实例,如果实例体积大就会占用很多内存,所以适用于资源不多的。

饿汉模式特点

立即实例化:类加载时就创建了单例实例,不管是否有其它地方需要这个实例

线程安全:由于实例在加载时就创建好了,线程安全问题得到天然的解决

浪费资源:单例对象的创建开销大,且开辟的空间不一定用的上,会有低效率

当两个线程同时访问一个类,并且这个类采用立即实例化的方式创建单例时,由于实例化是在类加载时完成的,所以实际上不存在两个线程同时创建实例的情况。下面是具体的原因和过程:

1. **类加载机制**:在Java中,类加载是由类加载器完成的。类加载器会保证一个类只被加载一次。当第一个线程触发类的加载时,类加载器会同步这个加载过程,确保其他线程在当前线程完成加载之前不会进入类的加载过程。

2. **静态初始化**:在类加载的过程中,如果类中包含静态初始化块或者静态变量初始化,这些操作会在类加载时执行。如果单例实例是在静态初始化块中创建的,那么这个实例的创建过程是同步的,即在任何线程看到类的加载完成之前,实例已经被创建好了。

3. **线程安全保证**:由于类加载和静态初始化是同步的,这意味着当第一个线程触发类的加载并创建实例时,其他线程会被阻塞,直到实例创建完成。因此,不会有多个线程同时创建实例的情况发生。

4. **可见性**:在Java中,静态变量的初始化具有可见性保证。一旦静态变量被初始化,它对所有线程都是可见的。这意味着一旦单例实例被创建,所有线程都可以看到它。

总结来说,即使在多线程环境中,由于类加载和静态初始化的同步机制,以及静态变量的可见性保证,立即实例化的单例模式可以确保在任何时候都只有一个实例被创建,从而保证了线程安全。这就是为什么即使有两个线程同时访问一个采用立即实例化的类,也不会导致线程安全问题的原因。

饿汉模式单例实现

template<typename T>
class Singleton
{
	static T data;
public:
	static T* GetInstance()
	{
		return &data;
	}
};

懒汉模式

是一种单例模式的实现方式,就需要在第一次实例时创建对象。特点是推迟实例化,从而避免对不必要的资源消耗。

懒汉单例模式特点

延迟实例化:对象仅在第一次被请求时才进行创建,从而提高程序的效率

资源节省:在不需要实例的情况下,避免了资源的浪费,优化了资源的使用

线程安全问题:在多线程环境下,可能需要额外的同步机制来确保线程安全问题,确保实例的唯一性。

 懒汉单例模式的实现

template<typename T>
class Singleton
{
	static T* inst;
public:
	static T* GetInstance()
	{
		if (inst == nullptr)
		{
			inst = new T();
		}
	return inst;
	}
};

上面线程池以懒汉模式实现

3.线程安全和重入问题

概念
线程安全:就是多个线程在访问共享资源时,能够正确地执⾏,不会相互⼲扰或破坏彼此的执⾏结
果。⼀般⽽⾔,多个线程并发同⼀段只有局部变量的代码时,不会出现不同的结果。但是对全局变量 或者静态变量进⾏操作,并且没有锁保护的情况下,容易出现该问题。
重⼊:同⼀个函数被不同的执⾏流调⽤,当前⼀个流程还没有执⾏完,就有其他的执⾏流再次进⼊, 我们称之为重⼊。⼀个函数在重⼊的情况下,运⾏结果不会出现任何不同或者任何问题,则该函数被 称为可重⼊函数,否则,是不可重⼊函数。
常⻅的线程不安全的情况
不保护共享变量的函数
函数状态随着被调⽤,状态发⽣变化的函数
返回指向静态变量指针的函数
调⽤线程不安全函数的函数
常⻅不可重⼊的情况
调⽤了malloc/free函数,因为malloc函数
是⽤全局链表来管理堆的
调⽤了标准I/O库函数,标准I/O库的很多实
现都以不可重⼊的⽅式使⽤全局数据结构
可重⼊函数体内使⽤了静态的数据结构
常⻅的线程安全的情况
每个线程对全局变量或者静态变量只有读取
的权限,⽽没有写⼊的权限,⼀般来说这些
线程是安全的
类或者接⼝对于线程来说都是原⼦操作
多个线程之间的切换不会导致该接⼝的执⾏
结果存在⼆义性
常⻅可重⼊的情况
不使⽤全局变量或静态变量
不使⽤ malloc或者new开辟出的空间
不调⽤不可重⼊函数
不返回静态或全局数据,所有数据都有函数
的调⽤者提供
使⽤本地数据,或者通过制作全局数据的本
地拷⻉来保护全局数据
可重⼊与线程安全联系
函数是可重⼊的,那就是线程安全的(其实知道这⼀句话就够了)
函数是不可重⼊的,那就不能由多个线程使⽤,有可能引发线程安全问题
如果⼀个函数中有全局变量,那么这个函数既不是线程安全也不是可重⼊的。
可重⼊与线程安全区别
可重⼊函数是线程安全函数的⼀种
线程安全不⼀定是可重⼊的,⽽可重⼊函数则⼀定是线程安全的。
如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重⼊函数若锁还
未释放则会产⽣死锁,因此是不可重⼊的。
注意:
如果不考虑信号导致一个执行流重复进入函数这种情况,线程安全和重入在安全角度不做区分,但是线程安全侧重说明线程访问公共资源的安全问题情况,表项的是并发线程的特点,可重入描述的是一个函数是否能被重复进入,表示的是函数的特点。

4.常见锁的概念

死锁是指在⼀组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站⽤不会
释放的资源⽽处于的⼀种永久等待状态。
为了⽅便表述,假设现在线程A,线程B必须同时持有锁1和锁2,才能进⾏后续资源的访问

 

 死锁的四个必要条件

互斥条件:一个资源每次只能被一个执行流使用

请求与保持条件:一个执行流因请求资源而阻塞,对已获得的资源保持不放

 

5.STL,智能指针和线程安全

 STL的容器不是线程安全的,STL的设计初衷是将性能挖掘到极致,而一旦设计到加锁保证线程安全,会对性能造成巨大影响,而且对于不同的容器,加锁的方式不同。这样STL默认是线程不安全的,需要自己保证线程安全。

智能指针unique_ptr,由于只是在当前代码块内生效,不涉及线程安全问题。

对于shared_ptr,多个对象需要共用一个引用计数变量,就会存在线程安全问题。标准库解决了这个问题,基于原子操作的(CAS)的方式保证了shared_ptr能够高效,原子的操作引用计数。


网站公告

今日签到

点亮在社区的每一天
去签到