11.多线程-信号量-线程池

发布于:2025-04-04 ⋅ 阅读:(16) ⋅ 点赞:(0)

信号量

image-20230806153117117

  1. 信号量的基本使用接口

sem_init()

sem_init - initialize an unnamed semaphore

// 头文件
#include <semaphore.h>

// 函数    
int sem_init(sem_t *sem, int pshared, unsigned int value);

// 参数
pshared : 0表示线程间共享,非0表示进程间共享
value :信号量初始值

sem_destroy()

sem_destroy - destroy an unnamed semaphore
#include <semaphore.h>
int sem_destroy(sem_t *sem);

sem_wait()

// p(),p操作
sem_wait, sem_timedwait, sem_trywait - lock a semaphore
#include <semaphore.h>
int sem_wait(sem_t *sem);

sem_post()

// V(),v操作
sem_post - unlock a semaphore
#include <semaphore.h>
int sem_post(sem_t *sem);
  1. 引入环形队列

image-20230807135518874

环形队列代码演示_1(生产的数据为一个整型int)

makefile

ringqueue:main.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f ringqueue

RingQueue.hpp

#pragma once

#include <iostream>
#include <vector>
#include <cassert>
#include <semaphore.h>
#include <pthread.h>

// 给定的环形队列的空间大小,作为_cap的缺省值
static const int gcap = 5;

template<class T>
class RingQueue
{
private:
    // 针对信号量的PV操作
    // P操作 --> 对指定的信号量减1
    void P(sem_t &sem)
    {
        // sem_wait() 对指定的信号量减1,使对应的线程拥有空间资源
        int n = sem_wait(&sem);
        assert(n == 0); 
        (void)n;
    }

    // V操作
    // 对指定的信号量加1
    void V(sem_t &sem)
    {
        // sem_post() 对指定的信号量加1,
        // 数据被消费,则对应的空间资源可再利用
        int n = sem_post(&sem);
        assert(n == 0);
        (void)n;
    }

public:
    // 构造函数
    RingQueue(const int &cap = gcap)
    : _queue(cap)
    , _cap(cap)
    {
        // 对生产对应的信号量,空间资源做初始化
        int n = sem_init(&_spaceSem, 0, _cap);
        assert(n == 0);
        // 对消费者对应的信号量,数据资源做初始化
        n = sem_init(&_dataSem, 0, 0);
        assert(n == 0);

        _productorStep = _consumerStep = 0;

        pthread_mutex_init(&_pmutex, nullptr);
        pthread_mutex_init(&_cmutex, nullptr);
    }

    // 生产者
    void Push(const T &in)
    {
        // 申请到了空间信号量,意味着,我一定能进行正常的生产
        P(_spaceSem); 

        // 因为有多个线程都会申请到信号量,而多个生产线程是互斥的,因此要进行加锁
        // 单个生产线程,则不需要加锁
        // 加锁
        pthread_mutex_lock(&_pmutex);   

        // 将生产的数据放入到队列中,并对生产者的下标进行++     
        _queue[_productorStep++] = in;

        // 因为是环形队列,所以要进行这样的操作才可以让下标在_cap循环
        _productorStep %= _cap;

        // 解锁
        pthread_mutex_unlock(&_pmutex);

        // 数据资源++
        V(_dataSem);
    }

    // 消费者
    void Pop(T *out)
    {
        // 思考:现加锁,后申请信号量,还是现申请信号量,在加锁?
        // 我们应该先申请信号量,再进行加锁
        // 因为申请信号量的操作是原子的,所以不需要对其进行加锁
        P(_dataSem);
        pthread_mutex_lock(&_cmutex);

        // 将数据消费之后,将数据消费之后,将消费者的下标进行++
        *out = _queue[_consumerStep++];
        _consumerStep %= _cap;
        pthread_mutex_unlock(&_cmutex);
        V(_spaceSem);
    }

    // 析构函数
    ~RingQueue()
    {
        // 销毁信号量
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);

        // 销毁锁
        pthread_mutex_destroy(&_pmutex);
        pthread_mutex_destroy(&_cmutex);
    }
private:
    // 环形队列用vector
    std::vector<T> _queue; 
    // 环形队列的容量 
    int _cap;

    // 生产者信号量
    // 生产者 想生产,看中的是什么资源呢? 空间资源
    sem_t _spaceSem; 
    // 消费者信号量
    // 消费者 想消费,看中的是什么资源呢? 数据资源
    sem_t _dataSem;  

    // 生产者在环形队列中的位置,也就是生产者在队列中的下标
    int _productorStep;
    // 消费者在环形队列中的位置,也就是消费者在队列中的下标
    int _consumerStep;

    // 锁,生产者的锁和消费者进行生产或者消费时需要用到的锁
    pthread_mutex_t _pmutex;
    pthread_mutex_t _cmutex;
};

main.cc

#include "RingQueue.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>

std::string SelfName()
{
    char name[128];
    snprintf(name, sizeof(name), "thread[0x%x]", pthread_self());
    return name;
}

// 生产
void *ProductorRoutine(void *rq)
{
    RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);
    while(true)
    {
        int data = rand() % 10 + 1;
        ringqueue->Push(data);
        std::cout << "生产完成,生产的数据是:" << data << std::endl;
    }
}

// 消费
void *ConsumerRoutine(void *rq)
{
    RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);

    while(true)
    {
        int data;
        ringqueue->Pop(&data);
        std::cout << "消费完成,消费的数据是:" << data << std::endl;
    }
}

int main()
{
    // 随机数种子
    srand((unsigned int)time(nullptr) ^ getpid() ^ pthread_self() ^ 0x71727374);
    RingQueue<int> *rq = new RingQueue<int>();

    pthread_t p, c;
    pthread_create(&p, nullptr, ProductorRoutine, rq);
    pthread_create(&c, nullptr, ConsumerRoutine, rq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);

    delete rq;
    return 0;
}

环形队列代码演示_2(生产的数据为一个任务,多线程)

makefile

ringqueue:main.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f ringqueue

Task.hpp

#pragma once

#include <iostream>
#include <string>
#include <cstdio>
#include <functional>

class Task
{
    using func_t = std::function<int(int,int,char)>;
    // typedef std::function<int(int,int)> func_t;
public:
    Task()
    {}
    Task(int x, int y, char op, func_t func)
    :_x(x)
    , _y(y)
    , _op(op)
    , _callback(func)
    {}
    
    std::string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }
    
    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};

const std::string oper = "+-*/%";

int mymath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
        break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
        break;
    default:
        // do nothing
        break;
    }

    return result;
}

RingQueue.hpp

#pragma once

#include <iostream>
#include <vector>
#include <cassert>
#include <semaphore.h>
#include <pthread.h>

// 给定的环形队列的空间大小,作为_cap的缺省值
static const int gcap = 5;

template<class T>
class RingQueue
{
private:
    // 针对信号量的PV操作
    // P操作 --> 对指定的信号量减1
    void P(sem_t &sem)
    {
        // sem_wait() 对指定的信号量减1,使对应的线程拥有空间资源
        int n = sem_wait(&sem);
        assert(n == 0); 
        (void)n;
    }

    // V操作
    // 对指定的信号量加1
    void V(sem_t &sem)
    {
        // sem_post() 对指定的信号量加1,
        // 数据被消费,则对应的空间资源可再利用
        int n = sem_post(&sem);
        assert(n == 0);
        (void)n;
    }

public:
    // 构造函数
    RingQueue(const int &cap = gcap)
    : _queue(cap)
    , _cap(cap)
    {
        // 对生产对应的信号量,空间资源做初始化
        int n = sem_init(&_spaceSem, 0, _cap);
        assert(n == 0);
        // 对消费者对应的信号量,数据资源做初始化
        n = sem_init(&_dataSem, 0, 0);
        assert(n == 0);

        _productorStep = _consumerStep = 0;

        pthread_mutex_init(&_pmutex, nullptr);
        pthread_mutex_init(&_cmutex, nullptr);
    }

    // 生产者
    void Push(const T &in)
    {
        // 申请到了空间信号量,意味着,我一定能进行正常的生产
        P(_spaceSem); 

        // 因为有多个线程都会申请到信号量,而多个生产线程是互斥的,因此要进行加锁
        // 加锁
        pthread_mutex_lock(&_pmutex);   

        // 将生产的数据放入到队列中,并对生产者的下标进行++     
        _queue[_productorStep++] = in;

        // 因为是环形队列,所以要进行这样的操作才可以让下标在_cap循环
        _productorStep %= _cap;

        // 解锁
        pthread_mutex_unlock(&_pmutex);

        // 数据资源++
        V(_dataSem);
    }

    // 消费者
    void Pop(T *out)
    {
        // 思考:现加锁,后申请信号量,还是现申请信号量,在加锁?
        // 我们应该先申请信号量,再进行加锁
        // 因为申请信号量的操作是原子的,所以不需要对其进行加锁
        P(_dataSem);
        pthread_mutex_lock(&_cmutex);

        // 将数据消费之后,将数据消费之后,将消费者的下标进行++
        *out = _queue[_consumerStep++];
        _consumerStep %= _cap;
        pthread_mutex_unlock(&_cmutex);
        V(_spaceSem);
    }

    // 析构函数
    ~RingQueue()
    {
        // 销毁信号量
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);

        // 销毁锁
        pthread_mutex_destroy(&_pmutex);
        pthread_mutex_destroy(&_cmutex);
    }
private:
    // 环形队列用vector
    std::vector<T> _queue; 
    // 环形队列的容量 
    int _cap;

    // 生产者信号量
    // 生产者 想生产,看中的是什么资源呢? 空间资源
    sem_t _spaceSem; 
    // 消费者信号量
    // 消费者 想消费,看中的是什么资源呢? 数据资源
    sem_t _dataSem;  

    // 生产者在环形队列中的位置,也就是生产者在队列中的下标
    int _productorStep;
    // 消费者在环形队列中的位置,也就是消费者在队列中的下标
    int _consumerStep;

    // 锁,生产者的锁和消费者进行生产或者消费时需要用到的锁
    pthread_mutex_t _pmutex;
    pthread_mutex_t _cmutex;
};

main.cc

#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>

std::string SelfName()
{
    char name[128];
    snprintf(name, sizeof(name), "thread[0x%x]", pthread_self());
    return name;
}

// 生产
void *ProductorRoutine(void *rq)
{
    RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);
    while(true)
    {
        // 构建or获取任务 --- 这个是要花时间的
        int x = rand() % 10;
        int y = rand() % 5;
        // 随机选择操作符
        char op = oper[rand()%oper.size()];

        // 构建生产任务
        Task t(x, y, op, mymath);
        // 生产任务
        ringqueue->Push(t);
        // 输出提示
        std::cout <<  SelfName() << ", 生产者派发了一个任务: " << t.toTaskString() << std::endl;

        sleep(1);
    }
}

// 消费
void *ConsumerRoutine(void *rq)
{
    RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);

    while(true)
    {
        Task t;
        //消费任务
        ringqueue->Pop(&t);
        // 消费也是要花时间的,如果任务很复杂,则需要花费大量的时间
        std::string result = t();
        std::cout <<  SelfName() << ", 消费者消费了一个任务: " << result << std::endl;

        // sleep(1);
    }
}

int main()
{
    // 随机数种子
    srand((unsigned int)time(nullptr) ^ getpid() ^ pthread_self() ^ 0x71727374);
    RingQueue<Task> *rq = new RingQueue<Task>();

    // 单生产,单消费,多生产,多消费 
    // --> 只要保证,最终进入临界区的是一个生产,一个消费就行!
    // 多生产,多消费的意义??
    // 不管是生产任务还是消费任务都是需要花费时间的,如果是复杂的任务,则需要花费大量的时间
    // 而生产任务和消费任务是可以多线程并发执行的,只是说放入环形队列时,
    // 多个生产线程或者多个消费线程是串联执行进环形队列的
    pthread_t p[4], c[8];
    for(int i = 0; i < 4; i++) 
        pthread_create(p+i, nullptr, ProductorRoutine, rq);

    for(int i = 0; i < 8; i++) 
        pthread_create(c+i, nullptr, ConsumerRoutine, rq);

    for(int i = 0; i < 4; i++) 
        pthread_join(p[i], nullptr);

    for(int i = 0; i < 8; i++) 
        pthread_join(c[i], nullptr);

    delete rq;
    return 0;
}

线程池

/*threadpool.h*/ 

// 线程池的作用:避免不断的创建新线程,提高效率

/* 线程池: 

\* 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 

\* 线程池的应用场景: 

\* 1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个 Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。 

\* 2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。 

\* 3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误. 

\* 线程池的种类: 

\* 线程池示例: 

\* 1. 创建固定数量线程池,循环从任务队列中获取任务对象, 

\* 2. 获取到任务对象后,执行任务对象中的任务接口 

*/ 

演示代码

makefile

threadpool:main.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f threadpool

LockGuard.hpp

#pragma once

#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    // 构造函数
    Mutex(pthread_mutex_t *lock_p = nullptr)
    :lock_p_(lock_p)
    {}

    // 加锁
    void lock()
    {
        // 如果lock_p_不是空指针,这说明已经传递进来了一把锁了
        // 此时对调用的线程进行加锁
        if(lock_p_)
            pthread_mutex_lock(lock_p_);   // 进行加锁
    }

    // 解锁
    void unlock()
    {
        if(lock_p_)
             pthread_mutex_unlock(lock_p_);   // 进行解锁
    }

    // 析构函数
    ~Mutex()
    {}

private:
    pthread_mutex_t *lock_p_;
};


class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex)
    :mutex_(mutex)
    {
        mutex_.lock(); // 在构造函数中进行加锁
    }

    // 我们自己封装的锁,一旦出了其作用域,系统就会调用析构函数对调用的线程进行解锁
    ~LockGuard()
    {
        mutex_.unlock(); // 在析构函数中进行解锁
    }

private:
    Mutex mutex_;
};

Task.hpp

#pragma once

#include <iostream>
#include <string>
#include <cstdio>
#include <functional>

class Task
{
    using func_t = std::function<int(int,int,char)>;
    // typedef std::function<int(int,int)> func_t;
public:

    Task()
    {}

    Task(int x, int y, char op, func_t func)
    :_x(x)
    , _y(y)
    , _op(op)
    , _callback(func)
    {}

    std::string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }

    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }
    
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};

const std::string oper = "+-*/%";

int mymath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
        break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
        break;
    default:
        // do nothing
        break;
    }

    return result;
}

Thread.hpp

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <functional>
#include <pthread.h>

namespace ThreadNs
{
    typedef std::function<void *(void *)> func_t;
    const int num = 1024;

    class Thread
    {
    private:
        // 在类内创建线程,想让线程执行对应的方法,需要将方法设置成为static
        // 类内成员,有缺省参数; 不仅需要传递当前函数的地址,还要传递this指针
        // 而我们只能够传递一个参数(当前函数的地址),因此必须将当前成员函数设置为静态
        static void *start_routine(void *args) 
        {
            Thread *_this = static_cast<Thread *>(args);
            return _this->callback();
        }

    public:

        // 构造函数
        Thread()
        {
            // 给创建的线程命名
            char namebuffer[num];
            snprintf(namebuffer, sizeof namebuffer, "thread-%d", threadnum++);
            name_ = namebuffer;
        }

        // 创建线程
        void start(func_t func, void *args = nullptr)
        {
            func_ = func;  // 线程要调用的方法
            args_ = args;  // args_是func_的参数

            // 使用系统接口来创建线程
            // 由于start_routine是类内成员函数,因此其必须为静态成员函数
            int n = pthread_create(&tid_, nullptr, start_routine, this); 
            assert(n == 0);                                            
            (void)n;
        }

        // 等待回收进程
        void join()
        {
            int n = pthread_join(tid_, nullptr);
            assert(n == 0);
            (void)n;
        }

        // 取调用线程的线程名
        std::string threadname()
        {
            return name_;
        }

        // 析构函数
        ~Thread()
        {
            // do nothing
        }

        // 执行调用线程的计算任务
        void *callback() 
        { 
            return func_(args_);
        }

    private:
        std::string name_;  // 线程名
        func_t func_;       // 线程要执行的方法
        void *args_;        // func_的参数
        pthread_t tid_;     // 线程id

        static int threadnum;  // 线程名的后缀序号
    };
     
    int Thread::threadnum = 1;  // 对静态变量,在全局处进行初始化
} // end namespace ThreadNs

ThreadPool.hpp

#pragma once

#include "Thread.hpp"
#include "LockGuard.hpp"
#include <vector>
#include <queue>
#include <mutex>
#include <pthread.h>
#include <unistd.h>

using namespace ThreadNs;

const int gnum = 3;


// 在ThreadData用到了ThreadPool,因此需要对其先进行声明
template <class T>
class ThreadPool;

template <class T>
class ThreadData
{
public:
    ThreadPool<T> *threadpool;  // ThreadPool的类指针
    std::string name;           // 调用线程的线程名

public:
    // 构造函数
    ThreadData(ThreadPool<T> *tp, const std::string &n) 
    : threadpool(tp)
    , name(n)
    {}

};


template <class T>
class ThreadPool
{
private:
    // 当线程池中的所有线程都运行起来时,都会执行handlerTask
    // 而handlerTask的作用就是检测任务队列中是否存在消费任务
    // 有,则对消费任务进行计算
    // 没有,则消费线程在对应的条件变量下进行等待
    // handlerTask是静态成员函数,因此这个成员函数是没有this指针的
    // 没有this指针就不可以访问类内非静态成员函数和成员方法,除非单独给这个静态方法传递this指针
    static void *handlerTask(void *args)
    {
        // 参数args是ThreadPool的类对象指针this
        ThreadData<T> *td = (ThreadData<T> *)args;
        while (true)
        {
            T t;
            {
                // 多个消费线程访问任务队列时是互斥的,因此需要进行加锁
                // 使用的锁是我们自己进行封装的锁
                LockGuard lockguard(td->threadpool->mutex());
                while (td->threadpool->isQueueEmpty())
                {
                    // 如果任务队列为空,则线程在对应的条件变量下进行等待
                    td->threadpool->threadWait();
                }

                // pop的本质,是将任务从公共队列中,拿到当前线程自己独立的栈中
                t = td->threadpool->pop(); 
            }

            // t.toTaskString() 返回所要执行的任务
            // t()返回任务计算的结果
            // 不将t()放入LockGuard lockguard(td->threadpool->mutex())的生命周期内
            // 是因为在加锁时,只能有一个消费线程执行计算任务(计算任务,有可能会花费大量的时间)
            // 而出了作用域,就可以有多个消费线程并发执行计算任务
            std::cout << td->name << " 获取了一个任务: " << t.toTaskString() << " 并处理完成,结果是:" << t() << std::endl;
        }

        // 释放ThreadPool的类对象指针this
        delete td;
        return nullptr;
    }

public:
    // 判断任务队列是否为空
    bool isQueueEmpty() { return _task_queue.empty(); }

    // 让线程在对应的条件变量下进行等待
    void threadWait() { pthread_cond_wait(&_cond, &_mutex); }
    
    // 从任务队列中拿出消费任务
    T pop()
    {
        // 拿出消费任务
        T t = _task_queue.front();
        // 从任务队列中将其弹出
        _task_queue.pop();
        return t;
    }

    // 将私有成员变量锁,进行封装成成员函数
    pthread_mutex_t *mutex()
    {
        return &_mutex;
    }

public:
    // 线程池的构造函数
    ThreadPool(const int &num = gnum) 
    : _num(num)
    {
        // 对锁和条件变量进行初始化
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);

        // 创建新线程,并将新线程放入到vector容器当中
        for (int i = 0; i < _num; i++)
        {
            _threads.push_back(new Thread());
        }
    }

    // 启动运行这个线程
    void run()
    {
        for (const auto &t : _threads)
        {
            // 将ThreadPool的this指针和对应线程的线程名封装为一个类作为handlerTask()的参数
            ThreadData<T> *td = new ThreadData<T>(this, t->threadname());
            t->start(handlerTask, td);
            std::cout << t->threadname() << " start ..." << std::endl;
        }
    }

    // 生产线程向任务队列放置任务
    void push(const T &in)
    {
        // 多个生产线程在进入任务队列时,他们之间是互斥的,因此需要进行加锁
        // LockGuard是我们自己封装的锁对象,当其作用域被销毁,其会调用析构函数释放锁
        LockGuard lockguard(&_mutex);
        // 将生产的任务放入到任务队列之中
        _task_queue.push(in);

        // 当任务队列中有任务之后,我们就可以唤醒消费线程来进行消费了
        pthread_cond_signal(&_cond);
    }

    ~ThreadPool()
    {
        // 销毁锁和条件变量
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);

        // 将vector容器中的线程对象所占据的空间进行释放
        // 我们自己封装的线程,因此每一个线程对象都需要进行释放
        for (const auto &t : _threads)
            delete t;
    }

private:
    // 线程池中线程的个数
    int _num;  
    // 将创建好的线程放入vector中,vector就是线程池的容器
    std::vector<Thread *> _threads;
    // 生产线程将生产的任务放入到任务队列中,消费线程从队列中拿任务
    std::queue<T> _task_queue;

    // 定义一把锁,作为我们封装的锁对象的参数
    pthread_mutex_t _mutex;

    // 条件变量
    pthread_cond_t _cond;
};

main.cc

#include "ThreadPool.hpp"
#include "Task.hpp"
#include <memory>
#include <unistd.h>

int main()
{
    // 创建线程池,并将其交给智能指针进行管理
    std::unique_ptr<ThreadPool<Task> > tp(new ThreadPool<Task>());
    // 启动线程池中的所有线程
    tp->run();

    int x, y;
    char op;
    while(1)
    {
        std::cout << "请输入数据1# ";
        std::cin >> x;
        std::cout << "请输入数据2# ";
        std::cin >> y;
        std::cout << "请输入你要进行的运算#";
        std::cin >> op;
        Task t(x, y, op, mymath);

        // 将构建的任务放入到任务队列当中
        tp->push(t);
        sleep(1);
    }
}

线程安全的单例模式

  • 什么是单例模式

    • 单例模式是一种 “经典的, 常用的” 设计模式
    • 一个类,只能够创建一个对象,来对其中的数据进行管理
  • 什么是设计模式

针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式

单例模式线程池(懒汉模式)

makefile LockGuard.hpp Task.hpp 同上

ThreadPool.hpp

#pragma once

#include "Thread.hpp"
#include "LockGuard.hpp"
#include <vector>
#include <queue>
#include <mutex>
#include <pthread.h>
#include <unistd.h>

using namespace ThreadNs;

const int gnum = 3;


// 在ThreadData用到了ThreadPool,因此需要对其先进行声明
template <class T>
class ThreadPool;

template <class T>
class ThreadData
{
public:
    ThreadPool<T> *threadpool;  // ThreadPool的类指针
    std::string name;           // 调用线程的线程名

public:
    // 构造函数
    ThreadData(ThreadPool<T> *tp, const std::string &n) 
    : threadpool(tp)
    , name(n)
    {}

};


template <class T>
class ThreadPool
{
private:
    // 当线程池中的所有线程都运行起来时,都会执行handlerTask
    // 而handlerTask的作用就是检测任务队列中是否存在消费任务
    // 有,则对消费任务进行计算
    // 没有,则消费线程在对应的条件变量下进行等待
    // handlerTask是静态成员函数,因此这个成员函数是没有this指针的
    static void *handlerTask(void *args)
    {
        // 参数args是ThreadPool的类对象指针this
        ThreadData<T> *td = (ThreadData<T> *)args;
        while (true)
        {
            T t;
            {
                // 多个消费线程访问任务队列时是互斥的,因此需要进行加锁
                // 使用的锁是我们自己进行封装的锁
                LockGuard lockguard(td->threadpool->mutex());
                while (td->threadpool->isQueueEmpty())
                {
                    // 如果任务队列为空,则线程在对应的条件变量下进行等待
                    td->threadpool->threadWait();
                }

                // pop的本质,是将任务从公共队列中,拿到当前线程自己独立的栈中
                t = td->threadpool->pop(); 
            }

            // t.toTaskString() 返回所要执行的任务
            // t()返回任务计算的结果
            // 不将t()放入LockGuard lockguard(td->threadpool->mutex())的生命周期内
            // 是因为在加锁时,只能有一个消费线程执行计算任务(计算任务,有可能会花费大量的时间)
            // 而出了作用域,就可以有多个消费线程并发执行计算任务
            std::cout << td->name << " 获取了一个任务: " << t.toTaskString() << " 并处理完成,结果是:" << t() << std::endl;
        }

        // 释放ThreadPool的类对象指针this
        delete td;
        return nullptr;
    }

    // 线程池的构造函数
    ThreadPool(const int &num = gnum) 
    : _num(num)
    {
        // 对锁和条件变量进行初始化
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);

        // 创建新线程,并将新线程放入到vector容器当中
        for (int i = 0; i < _num; i++)
        {
            _threads.push_back(new Thread());
        }
    }

    // 单例模式,只允许有一个对象来对这个类的数据进行管理
    // 因此,需要防止赋值拷贝和拷贝构造
    void operator=(const ThreadPool &) = delete;
    ThreadPool(const ThreadPool &) = delete;

public:
    // 判断任务队列是否为空
    bool isQueueEmpty() { return _task_queue.empty(); }

    // 让线程在对应的条件变量下进行等待
    void threadWait() { pthread_cond_wait(&_cond, &_mutex); }
    
    // 从任务队列中拿出消费任务
    T pop()
    {
        // 拿出消费任务
        T t = _task_queue.front();
        // 从任务队列中将其弹出
        _task_queue.pop();
        return t;
    }

    // 将私有成员变量锁,进行封装成成员函数
    pthread_mutex_t *mutex()
    {
        return &_mutex;
    }


    // 启动运行这个线程
    void run()
    {
        for (const auto &t : _threads)
        {
            // 将ThreadPool的this指针和对应线程的线程名封装为一个类作为handlerTask()的参数
            ThreadData<T> *td = new ThreadData<T>(this, t->threadname());
            t->start(handlerTask, td);
            std::cout << t->threadname() << " start ..." << std::endl;
        }
    }

    // 生产线程向任务队列放置任务
    void push(const T &in)
    {
        // 多个生产线程在进入任务队列时,他们之间是互斥的,因此需要进行加锁
        // LockGuard是我们自己封装的锁对象,当其作用域被销毁,其会调用析构函数释放锁
        LockGuard lockguard(&_mutex);
        // 将生产的任务放入到任务队列之中
        _task_queue.push(in);

        // 当任务队列中有任务之后,我们就可以唤醒消费线程来进行消费了
        pthread_cond_signal(&_cond);
    }

    ~ThreadPool()
    {
        // 销毁锁和条件变量
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);

        // 将vector容器中的线程对象所占据的空间进行释放
        // 我们自己封装的线程,因此每一个线程对象都需要进行释放
        for (const auto &t : _threads)
            delete t;
    }

    // getInstance() 获取实例
    // 此处getInstance()需要设置为静态成员函数
    // 如果是非静态成员函数,那么获取的实例即属于这个类又属于对象
    // 我们想要达到的效果是一个实例只属于类(即一个类一个实例)

	// 静态成员函数,不需要创建类对象,指定类域就可以直接进行调用
    // 静态成员函数,在类内进行传递,是不需要传递this指针的(因为静态成员函数是没有this指针的)
    static ThreadPool<T> *getInstance()
    {
        if (nullptr == tp)
        {
            _singlock.lock();
            if (nullptr == tp)
            {
                tp = new ThreadPool<T>();
            }
            _singlock.unlock();
        }
        return tp;
    }

private:
    // 线程池中线程的个数
    int _num;  
    // 将创建好的线程放入vector中,vector就是线程池的容器
    std::vector<Thread *> _threads;
    // 生产线程将生产的任务放入到任务队列中,消费线程从队列中拿任务
    std::queue<T> _task_queue;

    // 定义一把锁,作为我们封装的锁对象的参数
    pthread_mutex_t _mutex;

    // 条件变量
    pthread_cond_t _cond;

    // 线程池的单例,需要设置为静态成员变量,这样这个单例才只属于这个类,和类对象无关
    static ThreadPool<T> *tp;

    // 创建单例时,可能有多个线程都要创建,因此需要对其进行加锁
    static std::mutex _singlock;
};

// 对静态的成员变量做初始化
template <class T>
ThreadPool<T> *ThreadPool<T>::tp = nullptr;

// 对静态的成员变量做初始化
template <class T>
std::mutex ThreadPool<T>::_singlock;

main.cc

#include "ThreadPool.hpp"
#include "Task.hpp"
#include <memory>
#include <unistd.h>

int main()
{
    // 创建类的实例 ThreadPool<Task>::getInstance()
    ThreadPool<Task>::getInstance()->run();
    
    int x, y;
    char op;
    while(1)
    {
        std::cout << "请输入数据1# ";
        std::cin >> x;
        std::cout << "请输入数据2# ";
        std::cin >> y;
        std::cout << "请输入你要进行的运算#";
        std::cin >> op;
        Task t(x, y, op, mymath);

        // 将构建的任务放入到任务队列当中
        ThreadPool<Task>::getInstance()->push(t);

        sleep(1);
    }
}

tp = new ThreadPool();
}
_singlock.unlock();
}
return tp;
}

private:
// 线程池中线程的个数
int _num;
// 将创建好的线程放入vector中,vector就是线程池的容器
std::vector<Thread *> _threads;
// 生产线程将生产的任务放入到任务队列中,消费线程从队列中拿任务
std::queue _task_queue;

// 定义一把锁,作为我们封装的锁对象的参数
pthread_mutex_t _mutex;

// 条件变量
pthread_cond_t _cond;

// 线程池的单例,需要设置为静态成员变量,这样这个单例才只属于这个类,和类对象无关
static ThreadPool<T> *tp;

// 创建单例时,可能有多个线程都要创建,因此需要对其进行加锁
static std::mutex _singlock;

};

// 对静态的成员变量做初始化
template
ThreadPool *ThreadPool::tp = nullptr;

// 对静态的成员变量做初始化
template
std::mutex ThreadPool::_singlock;


## main.cc

```c
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <memory>
#include <unistd.h>

int main()
{
    // 创建类的实例 ThreadPool<Task>::getInstance()
    ThreadPool<Task>::getInstance()->run();
    
    int x, y;
    char op;
    while(1)
    {
        std::cout << "请输入数据1# ";
        std::cin >> x;
        std::cout << "请输入数据2# ";
        std::cin >> y;
        std::cout << "请输入你要进行的运算#";
        std::cin >> op;
        Task t(x, y, op, mymath);

        // 将构建的任务放入到任务队列当中
        ThreadPool<Task>::getInstance()->push(t);

        sleep(1);
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到