Linux 多线程-生产消费者模型&线程池&线程单例模式&其他

发布于:2025-03-23 ⋅ 阅读:(30) ⋅ 点赞:(0)

目录

一、生产消费者模型

1、模型概念

2、基于 BlockingQueue 的生产者消费者模型

3、C++ queue 模拟阻塞队列的生产消费模型【代码】

(1)单纯生产数据1.0版本

(2)加入任务结构体2.0版本

 (3)wait误唤醒

(4)多生产多消费3.0版本

4、POSIX信号量

(1)初始化信号量

(2)销毁信号量

(3)等待信号量

(4)发布信号量 

5、基于环形队列的生产消费模型并入信号量

(1)架构

(2)单生产单消费代码1.0

(3)多生产多消费代码2.0

(4)结论

二、线程池

1、概念

2、线程池的应用场景

3、代码

三、线程安全单例模式

1、单例模式

2、单例模式的特点

3、饿汉实现方式和懒汉实现方式

4、饿汉方式实现单例模式

5、懒汉方式实现单例模式 

6、懒汉方式实现单例模式(线程安全版本)

 四、STL、智能指针和线程安全

1、STL 中的容器是否是线程安全的?

2、智能指针是否是线程安全的?

五、其他常见的各种锁

六、读者写者问题(了解) 

1、读写锁接口

2、伪代码实现


一、生产消费者模型

1、模型概念

如图,在生活中,学生就是消费者角色,工厂是真正的生产者角色,那么超市是什么呢?为什么需要超市?超市是交易场所。我们的家附近不一定有工厂,而且工厂的定位是大规模生产,我们也不可能找工厂生产 5 包方便面,如果工厂也承担了超市的角色,它就不仅要考虑生产的任务,还要考虑并收集消费者的需求,实际对工厂是一种负担,有了超市就可以将生产环节和消费环节进行了解耦(这里的解耦是一种松耦合关系,比如你找工厂生产 5 包方便面,工厂立马生产,预计 1 个小时,此时你正在等待,当工厂生产好后,你拿到了方便面,此时当没有人来消费时,工厂正在等待,这是串行。换而言之,工厂永远是学生来了需求就满足需求,没有需求就等待不生产,工厂和学生互相等待,这就叫做强耦合关系。而超市出现后,工厂就一点也不关心学生要几包方便面了,而只要把方便面生产出来放进超市任务就完成了,学生要方便面的时候直接去超市买就行了。也就是说,就算没有学生消费,工厂也会生产;就算没有工厂生产,学生也会消费;学生也不用在等工厂生产了;工厂也不用在等学生消费了;工厂生产的很快,学生消费的很慢,超市就可以让工厂生产的慢些;学生消费的很快,工厂生产的很慢,超市就可以让工厂生产的快些,此时消费和生产就可以同时进行了,这叫并行,此时学生和工厂就叫做松耦合关系)。


这里方便我们记忆这个模型,我们提出“321”原则(方便记忆)

3种关系:

  1. 生产者 vs 生产者(互斥)
  2. 消费者 vs 消费者(互斥)
  3. 生产者 vs 消费者(互斥&同步)

超市可以被生产者和消费者同时访问,所以超市是一种临界资源。消费者和生产者都可以有多个,所以多个生产者之间一定是竞争关系,多个消费者之间也是竞争关系(假设世界末日当天超市只有一包方便面,你和你的室友都想买,这就是竞争关系,而在计算机中表述竞争关系就是互斥。工厂和学生是同步和互斥的关系(比如学生去超市问售货员买方便面,售货员说卖完了,过了十分钟又去问售货员买方便面,售货员又说卖完了,反反复复,因为没有人主动通知学生,所以学生只能通过轮询检测的方式去检测超市里有没有方便面,这样没错,但是不合理。相反,工厂生产的特别快,去超市补货,因为工厂并不知道超市还有没有空间,所以就反反复复的检测,这样没问题,只是不合理。一定是有了商品让消费者过来,有了空间让生产者过来。所以最合理的状态是工厂供货完,让学生过来消费,学生消费完,让工厂过来供货,这就是同步。当然光有同步还不够,因为有可能出现工厂正在供货,学生就过来消费了,此时工厂可能是供货前 / 中 / 后,所以同前面苹果的例子,可能会出现二义性问题,所以还需要互斥)。

2中角色 :

  1. 生产者
  2. 消费者

一般进行生产和消费的是线程或进程。像一个线程给另一个线程发送任务。

1个交易场所:

  1. 超市

在编码上通常指的是一段内存空间,它是临界资源(它一定是在内存中开辟的,一个线程往其放数据,另一个线程往其拿数据,所以它可能是你自己定义的数组、队列、链表、管道…)。

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

2、基于 BlockingQueue 的生产者消费者模型

BlockingQueue 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

Thread1 是生产者,Thread2 是消费者。如果在生活中来回搬运的是方便面产品,那么在计算机中被进行搬运最多的一定是数据,而这里只有一个生产者和一个消费者,所以不用维护生产者和生产者、消费者和消费者之间的互斥关系。而所谓的阻塞队列,队列是有上限的,当队列不满足生产或者消费条件的时候,对应的线程就应该阻塞,具体在多线程编程中阻塞队列是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。简单来说就是 BlockingQueue 空就不让消费者消费,满就不让生产者生产。 (以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时也会被阻塞)。

3、C++ queue 模拟阻塞队列的生产消费模型【代码】

实现条件变量:生产者应该关注 Empty 条件变量,消费者应该关注 Full 条件变量,然后在构造函数内初始化,析构函数内销毁。这里对生产和消费的过程修改:生产过程中若队列满就不应该生产,而应该在 Empty_ 条件变量下等待,而一旦等待就会被挂起阻塞,可是当前是在临界区内部,消费线程不可能去消费,更不可能去唤醒生产线程中等待的线程(pthread_cond_wait 为什么要设计第二个参数互斥锁 mutex),因为它是带着锁挂起阻塞的,那么如果这样执行,程序被会卡住在这,所以 pthread_cond_wait 在设计时就加上了互斥锁,这样阻塞挂起时会自动释放锁,唤醒时会自动获取锁。也就是说pthread_cond_wait会阻塞挂起的同时,然后把锁释放,当你唤醒时,会自动帮你竞争锁,这也就是为什么pthread_cond_wait 第二个参数需要带一把锁的原因。

此时生产过程或消费过程中条件满足就阻塞挂起等待,那么是由谁来唤醒呢?

当然不能是自己唤醒自己,因为你并不知道自己还要不要生产或者消费,而应该是当消费者挂起时,只有生产者才知道有没有数据;当生产者挂起时,只有消费者才知道有没有空间。所以还需要 pthread_cond_signal 来唤醒对方,那么同步最理想的状态就应该是生产完后唤醒消费者,消费完后唤醒生产者。对于 pthread_cond_signal 写在锁内锁外都可以(比如生产者中在锁内唤醒消费者,消费者就从条件变量下醒来 (pthread_cond_wait(&have_data, &lock)),这样它就会直接在锁内等待了,然后会自动释放锁,唤醒时又自动获取锁。同理锁外也是一样)

(1)单纯生产数据1.0版本

代码如下:

Blockqueue.hpp

#include <queue>
#include <pthread.h>

int max = 5;
template <class T>
class BlockQueue
{
private:
    bool IsEmpty()
    {
        return bq_.size() == 0;
    }
    bool IsFull()
    {
        return bq_.size() == capacity_;
    }
public:
    BlockQueue(int capacity = 5):capacity_(capacity)
    {
        pthread_mutex_init(&p_mux,nullptr);
        pthread_mutex_init(&c_mux,nullptr);
        pthread_cond_init(&Empty,nullptr);
        pthread_cond_init(&Full,nullptr);
    }
    void push(const T& in)
    {
        pthread_mutex_lock(&p_mux);
        while(IsFull())  pthread_cond_wait(&Full,&p_mux);
        bq_.push(in);
        pthread_mutex_unlock(&p_mux);
        pthread_cond_signal(&Empty);
    }
    void pop(T* out)
    {
        pthread_mutex_lock(&c_mux);
        while(IsEmpty())  pthread_cond_wait(&Empty,&c_mux);
        *out = bq_.front();
        bq_.pop();
        pthread_mutex_unlock(&c_mux);
        pthread_cond_signal(&Full);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&c_mux);
        pthread_mutex_destroy(&p_mux);
        pthread_cond_destroy(&Empty);
        pthread_cond_destroy(&Full);
    }
private:
    std::queue<T> bq_;   //阻塞队列
    int capacity_;  //阻塞队列容量上限
    pthread_mutex_t c_mux;    //消费者的锁
    pthread_mutex_t p_mux;   //生产者的锁
    pthread_cond_t Empty;  //判断阻塞队列是否为空
    pthread_cond_t Full;   //判断阻塞队列是否为满

};

main.cpp

我们让生产者生产一个后休眠一秒,看看消费者会不会在没有商品的时候一直消费

#include <iostream>
#include <unistd.h>
#include "BlockQueue.hpp"

using namespace std;
void* consumer(void* args)
{
    BlockQueue<int>* bq = (BlockQueue<int>*)args;
    while(true)
    {
        int a;
        bq->pop(&a);
        cout << "消费者消费一个数据:" << a << endl;
    }
    return nullptr;
}
void* productor(void* args)
{
    BlockQueue<int>* bq = (BlockQueue<int>*)args;
    int a = 0;
    while(true)
    {
        bq->push(a);
        cout << "生产者生产了一个数据:" << a++ << endl;
        sleep(1);
    }
    return nullptr;

}
int main()
{
    BlockQueue<int>* bq = new BlockQueue<int>();

    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,(void*)bq);
    pthread_create(&p,nullptr,productor,(void*)bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    
    return 0;
}

运行结果如下:

我们能看到生产者生产一个数据,消费者就消费一个

 

我们再更改一下代码,我们让生产者1秒钟生产一次,消费者2秒钟消费一次

 我们看到生产者发现满了就不再生产了。


(2)加入任务结构体2.0版本

我们加入一个任务结构体,让生产者生产一个任务给消费者,让消费者做数据加工

Task.hpp

#include <iostream>
#include <string>

std::string opers="+-*/%";

enum{
    DivZero=1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task(){}
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {
    }
    void run()
    {
        switch (oper_)
        {
        case '+':
            result_ = data1_ + data2_;
            break;
        case '-':
            result_ = data1_ - data2_;
            break;
        case '*':
            result_ = data1_ * data2_;
            break;
        case '/':
            {
                if(data2_ == 0) exitcode_ = DivZero;
                else result_ = data1_ / data2_;
            }
            break;
        case '%':
           {
                if(data2_ == 0) exitcode_ = ModZero;
                else result_ = data1_ % data2_;
            }            break;
        default:
            exitcode_ = Unknown;
            break;
        }
    }
    void operator ()()
    {
        run();
    }
    std::string GetResult()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";

        return r;
    }
    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=?";
        return r;
    }
    ~Task()
    {
    }

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};

main.cpp

 

#include <iostream>
#include <unistd.h>
#include "BlockQueue.hpp"
#include <ctime>
#include "Task.hpp"

using namespace std;
void *consumer(void *args)
{
    BlockQueue<Task> *bq = (BlockQueue<Task> *)args;
    while (true)
    {
        Task t;
        bq->pop(&t);
        t();
        cout << "处理任务: " << t.GetTask() << " 运算结果是: " << t.GetResult() << endl;
        sleep(1);
    }
    return nullptr;
}
void *productor(void *args)
{
    int len = opers.size();
    BlockQueue<Task> *bq = (BlockQueue<Task> *)args;
    while (true)
    {
        int data1 = rand() % 10 + 1; // [1,10]
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand() % len];
        Task t(data1, data2, op);
        bq->push(t);
        cout << "生产者生产了一个任务:" << t.GetTask() << endl;
        sleep(1);
    }
    return nullptr;
}
int main()
{
    srand(time(nullptr));
    BlockQueue<Task> *bq = new BlockQueue<Task>();

    pthread_t c, p;
    pthread_create(&c, nullptr, consumer, bq);
    pthread_create(&p, nullptr, productor, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    return 0;
}

BlockQueue.hpp代码不变

运行结果如下:

 (3)wait误唤醒

这里大家有没有注意到我们在判断阻塞队列是否为空或者是否为满的时候我们用的是while循环,为什么不用if呢只用判断一次,因为用if会造成wait误唤醒问题。

这里给大家推荐一篇博客 给大家具体的讲解wait误唤醒。

条件变量的虚假唤醒(spurious wakeups)问题 - (!物喜)&(!己悲) - 博客园

(4)多生产多消费3.0版本

只更改了main.cpp的代码

4、POSIX信号量

我们之前给大家介绍过信号量,那是SystemV版本,具体介绍的POSIX信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX 可以用于线程间同步。

我们写的阻塞队列实际是共享资源,queue被当作整体,我们增加信号量的概念,可以看成多份,信号量是保持PV操作原子性的计数器。

下面的伪代码,申请信号量资源(sem--,预订资源)叫做 p 操作,释放信号量资源(sem++,释放资源)叫做 v 操作。既然每个进程都得先申请信号量,前提是每个进程都得先看到信号量,此时信号量就是一种临界资源,而系统在设计的时候当然有考虑到,所以 pv 操作其本身就是原子的,所以它就被称为 pv 原语。所以系统也提供了一个信号量 sem_t 类型,我们用的也是 POSIX 下 pthread 库提供的信号量。 

(1)初始化信号量

  • sem:初始化的信号量。
  • pshared:即线程之间共享,还有其它选项,一般设置为 0。
  • value:信号量中计数器的初始值。

(2)销毁信号量

  • sem:释放的信号量。

(3)等待信号量

等待信号量会将信号量的值减 1。

sem:p 操作就相当于对值进行 -- 操作,如果信号量的值 >0,程序就立即返回,如果信号量的值 <=0,sem_wait 将会阻塞,直到值大于 0。

(4)发布信号量 

发布信号量表示资源使用完毕,可以归还资源了,将信号量值加 1。

sem:v 操作就相当于对值进行 ++ 操作。

5、基于环形队列的生产消费模型并入信号量

  • 环形队列采用数组模拟,用模运算来模拟环状特性。
  • 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空(另外也可以预留一个空的位置作为满的状态)。
  • 现在有了信号量这个计数器,就可以很简单的进行多线程间的同步过程。

front指向填入数据二点下一个位置real消费一个数据,指向下一个位置。我们能看到这张图片,环形队列满了的时候和空的时候,ront和real指向同一个位置。

所以在实现的时候,front指向的下一个位置如果为real,则不再生产数据。

整合原则:

1、两个指针指向同一个位置的时候,只允许一个指针能访问

(1)为空的时候,只能由生产者访问(front)

(2)为满的时候,只能由消费者访问(real)

2、你不能超过我,我不能套圈

在环形队列不空和不满的时候,我们一定在不同的位置,我们可以同时进行访问,但是real不能超过front,在front指向下一个位置为real时,front不能再继续生产了。

所以生产者关注的时环形队列中还有多少的剩余空间。

消费者关注环形队列还有多少数据。

(1)架构

假设环形队列最大容量为10。

我们将定义两个信号量

SpaceSem:初始值为10。

DataSem:初始值为0。

SpaceSem信号量减到0,生产者的P操作挂起。

DataSem信号量减到0,消费者的P操作挂起。

(2)单生产单消费代码1.0

RingQueue.hpp

#include <vector>
#include <pthread.h>
#include <semaphore.h>
using namespace std;

template <class T>
class RingQueue
{
private:
    void P(sem_t &s)
    {
        sem_wait(&s);
    }
    void V(sem_t &s)
    {
        sem_post(&s);
    }
public:
    RingQueue(int capacity = 10)
    :capacity_(capacity)
    ,c_step(0)
    ,p_step(0)
    ,rq_(capacity)
    {
        pthread_mutex_init(&p_mux,nullptr);
        pthread_mutex_init(&c_mux,nullptr);
        sem_init(&SpaceSem,0,capacity);
        sem_init(&DataSem,0,0);
    }
    void push(const T& in)
    {
        P(SpaceSem);
        pthread_mutex_lock(&p_mux);
        rq_[p_step] = in;
        p_step++;
        p_step %= capacity_;
        pthread_mutex_unlock(&p_mux);
        V(DataSem);
    }
    void pop(T* out)
    {
        P(DataSem);
        pthread_mutex_lock(&c_mux);
        *out = rq_[c_step];
        c_step++;
        c_step %= capacity_;
        pthread_mutex_unlock(&c_mux);
        V(SpaceSem);
    }
    ~RingQueue()
    {
        pthread_mutex_destroy(&c_mux);
        pthread_mutex_destroy(&p_mux);
        sem_destroy(&SpaceSem);
        sem_destroy(&DataSem);
    }
private:
    std::vector<T> rq_;   //阻塞队列
    int capacity_;  //阻塞队列容量上限
    pthread_mutex_t c_mux;    //消费者的锁
    pthread_mutex_t p_mux;   //生产者的锁
    sem_t SpaceSem;  //剩余空间信号量
    sem_t DataSem;     //有多少数据信号量

    int c_step; //消费者下标
    int p_step; //生产者下标

};

main.cpp

#include <iostream>
#include <unistd.h>
#include "RingQueue.hpp"
#include <ctime>
#include "Task.hpp"

using namespace std;
void *consumer(void *args)
{
    RingQueue<Task> *rq= (RingQueue<Task> *)args;
    while (true)
    {
        Task t;
        rq->pop(&t);
        t();
        cout << "处理任务: " << t.GetTask() << " 运算结果是: " << t.GetResult() << endl;
        sleep(1);
    }
    return nullptr;
}
void *productor(void *args)
{
    int len = opers.size();
    RingQueue<Task> *rq= (RingQueue<Task> *)args;
    while (true)
    {
        int data1 = rand() % 10 + 1; // [1,10]
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand() % len];
        Task t(data1, data2, op);
        rq->push(t);
        cout << "生产者生产了一个任务:" << t.GetTask() << endl;
        sleep(1);
    }
    return nullptr;
}
int main()
{
    RingQueue<Task> *rq = new RingQueue<Task>(10);

    pthread_t c, p;
    pthread_create(&c, nullptr, consumer, rq);
    pthread_create(&p, nullptr, productor, rq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    return 0;
}

Task.hpp

#include <iostream>
#include <string>

std::string opers="+-*/%";

enum{
    DivZero=1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task(){}
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {
    }
    void run()
    {
        switch (oper_)
        {
        case '+':
            result_ = data1_ + data2_;
            break;
        case '-':
            result_ = data1_ - data2_;
            break;
        case '*':
            result_ = data1_ * data2_;
            break;
        case '/':
            {
                if(data2_ == 0) exitcode_ = DivZero;
                else result_ = data1_ / data2_;
            }
            break;
        case '%':
           {
                if(data2_ == 0) exitcode_ = ModZero;
                else result_ = data1_ % data2_;
            }            break;
        default:
            exitcode_ = Unknown;
            break;
        }
    }
    void operator ()()
    {
        run();
    }
    std::string GetResult()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";

        return r;
    }
    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=?";
        return r;
    }
    ~Task()
    {
    }

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};

运行结果如下:

 

(3)多生产多消费代码2.0

其余代码不变

 

运行结果如下:

 

(4)结论

信号量本质是一把计数器,那计数器的意义是什么?
计数器使用来表征数据的临界资源的。

  • 申请锁 -> 判断与访问 -> 释放锁 --> 本质是我们并不清楚临界资源的情况。
  • 信号量要提前预设资源的情况,而且在 pv 变化过程中,我们能够在外部就能知晓临界资源的情况。
  • 可以不用进入临界区就可以得知资源情况,甚至可以减少临界区内部的判断。

信号量是资源的预订机制 ,它用来表明当前环形队列空间的情况。只要能够成功申请信号量,内部不需要做判断,就一定能够保证可以访问到对应的资源。换而言之,如果条件不满足被挂起,是在加锁之外被挂起的,并不会受影响。

二、线程池

1、概念

线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。

线程池是为了以空间换时间来预先申请一批线程,当有任务到来时可以指派线程。

2、线程池的应用场景

  • 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB 服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个 Telnet 连接请求,线程池的优点就不明显了。因为 Telnet 会话时间比线程的创建时间大多了。
  • 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  • 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

3、代码

PthreadPool.hpp

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <cstring>
#include <unistd.h>
#include "Task.hpp"

struct PthreadInfo
{
    std::string name;
    pthread_t tid;
};
template <class T>
class PthreadPool
{
public:
    bool IsEmpty()
    {
        return Tasks.empty();
    }
    std::string GetThreadName(pthread_t tids)
    {
        for (auto &e : thread_)
        {
            if (e.tid == tids)
            {
                return e.name;
            }
        }
        return nullptr;
    }
    void Lock()
    {
        pthread_mutex_lock(&mux_);
    }
    void Unlock()
    {
        pthread_mutex_unlock(&mux_);
    }
    void Wait()
    {
        pthread_cond_wait(&cond_, &mux_);
    }
public:
    static void *MakeTasks(void *agrs) // 每一个成员函数都会有默认的this
    {
        PthreadPool<T> *tp = (PthreadPool<T> *)agrs;
        std::string name = tp->GetThreadName(pthread_self());
        while (true)
        {
            tp->Lock();
            while(tp->IsEmpty())
            {
                tp->Wait();
            }
            T t = tp->pop();
            tp->Unlock();
            t();
            std::cout << name << " run, "
                      << "result: " << t.GetResult() << std::endl;
        }
    }
    void push(const T &in)
    {
        Lock();
        Tasks.push(in);
        Unlock();
        pthread_cond_signal(&cond_);
        
    }
    T pop()
    {
        T t = Tasks.front();
        Tasks.pop();
        return t;
    }
    void start()
    {
        int num = thread_.size();
        for (int i = 0; i < num; i++)
        {
            thread_[i].name = "pthread-" + std::to_string(i);
            pthread_create(&(thread_[i].tid), nullptr, MakeTasks, this);
        }
    }
    PthreadPool(int num = 10) : thread_(num)
    {
        pthread_mutex_init(&mux_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }
    ~PthreadPool()
    {
        pthread_cond_destroy(&cond_);
        pthread_mutex_destroy(&mux_);
    }

private:
    
    std::vector<PthreadInfo> thread_;
    std::queue<T> Tasks;
    pthread_mutex_t mux_;
    pthread_cond_t cond_;
};

main.cc

#include "PthreadPool.hpp"
#include <ctime>
using namespace std;

int main()
{
    PthreadPool<Task>* pd = new PthreadPool<Task>(5);
    pd->start();
    srand(time(nullptr) ^ getpid());

    while(1)
    {
        int x = rand() % 10 + 1;
        usleep(10);
        int y = rand() % 5;
        char op = opers[rand()%opers.size()];

        Task t(x, y, op);
        pd->push(t);
        std::cout << "main thread make task: " << t.GetTask() << std::endl;
        sleep(1);
    }
    return 0;
}

运行结果如下:

三、线程安全单例模式

1、单例模式

单例模式是一种 “经典的,常用的,常考的” 设计模式

2、单例模式的特点

某些类只应该具有一个对象(实例),这个对象被保存于一个静态变量中,并提供一个全局访问点来获取这个对象,称之为单例。例如,一个男人只能有一个妻子。

在很多服务器开发场景中,经常需要让服务器加载上百 G 的数据到内存中,此时往往要用一个单例的类来管理这些数据,否则没有单例,就可能出现大量冗余的数据,所以一般单例对象是很大的。单例模式的实现有多种,其中最常见的就是懒汉式和饿汉式。

3、饿汉实现方式和懒汉实现方式

饿汉和懒汉的本质区别就是你这个单例什么时候被创建,也就是什么时候加载单例对象到内存,一般一个单例对象是很大的,是当服务一启动就创建单例,还是等需要的时候再创建单例,这就是饿汉和懒汉存在的意义。

  • 吃完饭后立刻洗碗,这种就是饿汉方式。因为下一顿吃的时候可以立刻拿着碗就能吃饭。
  • 吃完饭后先把碗放下,然后下一顿饭用到这个碗了再洗碗,就是懒汉方式。

最常用的是懒汉,懒汉方式最核心的思想是 “延时加载”,从而能够优化服务器的启动速度。

4、饿汉方式实现单例模式

class Singleton
{
private:
	Singleton(){} //构造函数私有化,防止外部new对象.
	Singleton(const Singleton&) = delete; //禁止拷贝构造
	Singleton& operator = (const Singleton&) = delete; //禁止赋值重载
public:
	static Singleton* GetInstance() //生成唯一的实例对象.
	{
		static Singleton instance; //成功创建单例
		return instance;
	}	
};

5、懒汉方式实现单例模式 

class Singleton
{
private:
	static Singleton* instance; //全局访问点
	Singleton(){} //构造函数私有化,防止外部new对象.
	Singleton(const Singleton&) = delete; //禁止拷贝构造
	Singleton& operator = (const Singleton&) = delete; //禁止赋值重载
public:
	static Singleton* GetInstance() //根据全局访问点,生成唯一的实例对象.
	{
		if(nullptr == instance)
			instance = new Singleton(); //成功创建单例
		return instance;
	}	
};
Singleton* Singleton::instance = nullptr; //定义初始化静态成员变量

6、懒汉方式实现单例模式(线程安全版本)

class Singleton
{
private:
	static Singleton* instance; //全局访问点
	static pthread_mutex_t lock; //互斥锁
	Singleton(){} //构造函数私有化,防止外部new对象.
	Singleton(const Singleton&) = delete; //禁止拷贝构造
	Singleton& operator = (const Singleton&) = delete; //禁止赋值重载
public:
	static Singleton* GetInstance() //根据全局访问点,生成唯一的实例对象.
	{
		if(nullptr == instance)
		{
			pthread_mutex_lock(&lock);
			if(nullptr == instance) //双重if判定,避免不必要的锁竞争.
			{
				instance = new Singleton();//成功创建单例
			}
			pthread_mutex_unlock(&lock);
		}
		return instance;
	}	
};
 
Singleton* Singleton::instance = nullptr; //定义初始化静态成员变量
pthread_t lock = PTHREAD_MUTEX_INITIALIZER; //定义初始化静态成员变量
  1. 加锁解锁的位置。
  2. 双重 if 判定避免不必要的锁竞争。
  3. volatile 关键字防止过度优化

 四、STL、智能指针和线程安全

1、STL 中的容器是否是线程安全的?

不是。

STL 中的容器并不是线程安全的,因为 STL 的设计初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,就会对性能造成巨大的影响,而且对于不同的容器,加锁方式的不同,性能可能也不同,例如 hash 表的锁表和锁桶。因此 STL 默认不是线程安全,如果需要在多线程环境下使用,往往需要调用者自行保证线程安全。

2、智能指针是否是线程安全的?

智能指针的线程安全性因类型而异:

对于 unique_ptr,由于只是在当前代码块范围内生效,因此不涉及线程安全的问题。
对于 shared_ptr,其读操作是线程安全的,即多个线程可以同时读取同一个 shared_ptr 指向的对象,这是安全的。然而,其写操作(修改 shared_ptr 指向)和引用计数的加减操作则是非线程安全的。这就意味着当多个线程尝试写入同一个 shared_pt r 或同时对一个对象的引用计数进行加减操作时,可能会引发竞态条件和数据竞争。因此,在使用智能指针时,需要特别注意并发访问和修改的情况,避免出现线程安全问题。若需在多线程环境下使用,可以考虑使用互斥锁或其他同步机制来保护共享数据。

五、其他常见的各种锁

一般在数据库中会经常的听到这些词,而在 Linux 中就是具体的一把锁,之前学过的二元信号量、互斥量其实都是悲观锁。

  • 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
  • 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和 CAS 操作(版本号机制就是一个线程对应一个版本,你修改,我也不怕你修改,这样通过版本来区分每一个线程的修改)。
  • CAS 操作: CAS 操作是 JAVA 上面的概念,当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等,则失败,失败则重试,一般是一个自旋的过程,即不断重试。
  • 自旋锁,读写锁、公平锁,非公平锁。

自旋锁的核心机制是利用 CPU 的自旋(忙等待)来实现加锁和解锁。当一个线程想要获取已经被其它线程持有的锁时,该线程不会进入阻塞状态,而是持续检测锁是否可用。只有在能够获取到锁的情况下,线程才会停止自旋并继续执行。一旦锁被释放,正在自旋的线程会最优先获得该锁。比如,你打电话叫小丽吃饭,小丽说我正在写作业,你先在楼下等着,此时若小丽只需要 5 分钟就可以把作业写完,那你一定会等,期间可以不断的轮询检测小丽的状态,这就是自旋锁自旋检测的过程。但若小丽需要 1 个小时才能写完作业,你一定不可能等,而是先去打打篮球,直到小丽做完时才打电话叫你去吃饭,这样也是有成本的,本来肚子就饿,对应锁在阻塞时也是有成本的,阻塞时线程要被放在等待队列中,而当条件满足时还需要把这个线程从等待队列放到运行队列中,所以也不是任何场景都适合阻塞的。此时小丽需要花多长时间写完作业才去吃饭,决定了你要不要干等。

什么时候使用自旋锁 / 互斥锁? 

这取决于资源就绪的时间问题。换而言之,就是当多个线程竞争同一份资源时,看竞争成功的这个线程在临界区会待多长时间。如果时间长可以采用挂起等待锁,时间短可以采用自旋锁。

所以锁是要阻塞还是要自旋取决于持有锁的线程在临界区中执行的时长,而究竟是使用阻塞还是自旋,是取决于用户的,因为只有用户知道他自己写的代码,像之前的抢票,采用自旋会更适合。所以自旋锁适用于短时间内的资源互斥,可以避免线程阻塞和消耗,从而具有较高的效率。然而锁被长时间持有或者资源冲突频繁,自旋锁可能会导致 CPU 资源的浪费,甚至可能引发系统崩溃的风险。因此,自旋锁通常用于锁持有时间较短的场景。

 

可以看到这几个接口和上面所认识的接口很类似。实际当我们在调用 spin 自旋锁时,如果没有申请到,虽然它在自旋,但在调用者看来还是在阻塞的现象。

六、读者写者问题(了解) 

在编写多线程的时候,有一种情况是十分常见的。有些公共数据修改的机会比较少,相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率,所以读写锁就可以专门处理这种多读少写的情况。

那么是否有一种方法可以专门处理这种多读少写的情况呢?
有,那就是读写锁。

就比如我们小时候教室里做黑板报一样,写黑板报的人可能就一个小组,读黑板报的人就是一个班。有人在写黑板报的时候,无法观看。

通常写者有一个特征是读者特别多,写者特别少。在生活中的黑板报,它也符合 “321” 原则:

  • 三种关系:读者和读者(共享)、写者和写者(互斥)、读者和写者(互斥与同步)。

相比之前的生产者消费者模型,这里很意外的是读者和读者是没有关系,因为在写者出完黑板报时,不可能说只能你一个读者看,其它读者都闭上眼睛,想看的后面排队去,其实本质是因为读者和读者不会把数据取走,而消费者和消费者会把数据取走,这也就是这两种模型的区别,所以读者和读者没有互斥和同步关系。

  • 两种角色:读者和写者。
  • 一个交易场所:黑板。

1、读写锁接口

A. 设置读写优先

int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
/*
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和
PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
*/

B. 初始化  

C. 销毁

 D. 加锁和解锁 

2、伪代码实现

首先读者可能有多个,实际需要有一个 reader_count 计数器,而写者之间是互斥关系,所以不需要计数器,多了一个读者就 ++,然后访问临界资源,少了一个读者就 --,++ -- 都需要锁来保护。写者中加锁判断是否有读者,如果有就解锁,然后 goto 继续加锁判断是否有读者,如果没有读者就访问临界资源,此时并没有解锁,访问结束后再解锁。这样就维护了写者与写者、读者与写者之间的关系。

上面说读者很多,写者很少,这是根据场景的,这就意味着读者长时间在读,写者可能会写饥饿了,相反写者长时间在写,读者也可能读饥饿了。实际读写锁在处理这种问题时,有两种策略:读者优先和写者优先。

上面在谈生产者消费者模型时并没有提及谁优先的原因是生产者和消费者它们的地位是对等的、信赖的,就是说生产者把缓冲区生产满了就不能生产了,必须依靠消费者,同样,消费者把缓冲区消费完了就不能再消费了,必须依靠生产者。生产者消费者模型没有优先的问题本质上是因为不管是谁优先,节奏一定会被对方拖慢,所以一定是保证生产和消费的节奏一致,效率才是最高的,所以不谈优先。而读者写者模型就不一样了,读者和写者之间没有像生产者消费者之间那样太大的信赖性,读者和写者本身是互斥的,那么你想让读者读到老的数据还是新的数据,比如写者写好文章后,文章中有错误,然后想让写者赶快更新文章,让读者看到正确的文章,这叫做写者优先。当然也有一些场景想让读者看到老的数据,比如公司中一些新代码不太稳定,所以想让读者先读老的数据,这叫做读者优先。所以读者优先是指读者和写者一起到来时优先让读者申请到锁,注意,“一起到来” 很重要,因为读者早就比写者到来了,你不可能让写者还优先,一定要明白的是优先的本质是不是总是让你先,而是在某一件事情上有冲突,就像常说的女士优先一定是男士和女士在某一件事情上有冲突;而写者优先是指当写者到来的时候,后续读者就暂时不能进入临界资源进行读取了,所有正在读取的线程执行完毕,写者再进入。 


网站公告

今日签到

点亮在社区的每一天
去签到