Linux之线程同步与互斥

发布于:2025-06-21 ⋅ 阅读:(15) ⋅ 点赞:(0)

目录

一、线程互斥

1.1、进程线程间的互斥相关背景概念

1.2、互斥量mutex

1.2.1、互斥量的接⼝

1.3、互斥量实现原理探究

1.4、互斥量的封装

二、线程同步

2.1、条件变量

2.2、同步概念与竞态条件

2.3、条件变量函数

2.4、⽣产者消费者模型

2.4.1、为何要使⽤⽣产者消费者模型

2.4.2、⽣产者消费者模型优点

2.5、基于BlockingQueue的⽣产者消费者模型

2.5.1、BlockingQueue

2.5.2、C++ queue模拟阻塞队列的⽣产消费模型

2.6、为什么 pthread_cond_wait 需要互斥量?

2.7、条件变量使⽤规范

2.8、条件变量的封装

2.9、POSIX信号量

2.9.1、基于环形队列的⽣产消费模型

三、线程池

3.1、⽇志与策略模式

3.2、线程池设计

3.3、线程安全的单例模式

3.3.1、什么是单例模式

3.3.2、饿汉实现⽅式和懒汉实现⽅式的理解

3.3.3、饿汉⽅式实现单例模式

3.3.4、懒汉⽅式实现单例模式

3.3.6、懒汉⽅式实现单例模式(线程安全版本)

3.4、单例式线程池

四、线程安全和重入问题

五、常见锁概念

5.1、死锁

5.2、死锁四个必要条件

5.3、避免死锁

六、STL,智能指针和线程安全

6.1、STL中的容器是否是线程安全的?

6.2、智能指针是否是线程安全的?


一、线程互斥

1.1、进程线程间的互斥相关背景概念

  • 临界资源:多线程执⾏流共享的资源就叫做临界资源。
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区。
  • 互斥:任何时刻,互斥保证有且只有⼀个执⾏流进⼊临界区,访问临界资源,通常对临界资源起保护作⽤。
  • 原⼦性(后⾯讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成, 要么未完成。

1.2、互斥量mutex

  • ⼤部分情况,线程使⽤的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量 归属单个线程,其他线程⽆法获得这种变量。
  • 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完 成线程之间的交互。
  • 多个线程并发的操作共享变量,会带来⼀些问题。

售票系统:

ticket.cc:

#include <iostream>
#include <vector>
#include "Thread.hpp"

using namespace ThreadModule;

#define NUM 4

int ticketnum = 10000;

void Ticket()
{
    while(true)
    {
        if(ticketnum > 0)
        {
            usleep(1000);
            printf("get a new ticket,id: %d\n", ticketnum--);
        }
        else
        {
            break;
        }
    }
}

int main()
{
    //构建线程对象
    std::vector<Thread> threads;
    for(int i = 0; i < NUM; i++)
    {
        threads.emplace_back(Ticket);
    }

    //启动线程
    for(auto &thread : threads)
    {
        thread.Start();
    }

    //等待线程
    for(auto &thread : threads)
    {
        thread.Join();
    }

    return 0;
}

pthread.hpp:

#ifndef _THREAD_HPP__
#define _THREAD_HPP__

#include <iostream>
#include <string>
#include <pthread.h>
#include <functional>
#include <sys/types.h>
#include <unistd.h>

namespace ThreadModule
{
    using func_t = std::function<void()>;
    static int number = 1;
    enum class TSTATUS
    {
        NEW,
        RUNNING,
        STOP
    };

    class Thread
    {
    private:
        // 成员方法!
        static void *Routine(void *args)
        {
            Thread *t = static_cast<Thread *>(args);
            t->_status = TSTATUS::RUNNING;
            t->_func();
            return nullptr;
        }
        void EnableDetach() { _joinable = false; }

    public:
        Thread(func_t func) : _func(func), _status(TSTATUS::NEW), _joinable(true)
        {
            _name = "Thread-" + std::to_string(number++);
            _pid = getpid();
        }
        bool Start()
        {
            if (_status != TSTATUS::RUNNING)
            {
                int n = ::pthread_create(&_tid, nullptr, Routine, this); // TODO
                if (n != 0)
                    return false;
                return true;
            }
            return false;
        }
        bool Stop()
        {
            if (_status == TSTATUS::RUNNING)
            {
                int n = ::pthread_cancel(_tid);
                if (n != 0)
                    return false;
                _status = TSTATUS::STOP;
                return true;
            }
            return false;
        }
        bool Join()
        {
            if (_joinable)
            {
                int n = ::pthread_join(_tid, nullptr);
                if (n != 0)
                    return false;
                _status = TSTATUS::STOP;
                return true;
            }
            return false;
        }
        void Detach()
        {
            EnableDetach();
            pthread_detach(_tid);
        }
        bool IsJoinable() { return _joinable; }
        std::string Name() {return _name;}
        ~Thread()
        {
        }

    private:
        std::string _name;
        pthread_t _tid;
        pid_t _pid;
        bool _joinable; // 是否是分离的,默认不是
        func_t _func;
        TSTATUS _status;
    };
}

#endif

Makefile:

bin=ticket
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)

$(bin):$(obj)
	$(cc) -o $@ $^ -lpthread
%.o:%.cc
	$(cc) -c $< -std=c++17

.PHONY:clean
clean:
	rm -f $(bin) $(obj)

.PHONY:test
test:
	echo $(src)
	echo $(obj)

效果:

为什么可能⽆法获得正确结果?

  • --ticket 操作本⾝就不是⼀个原⼦操作。
  • if 语句不是原子操作。
  • 线程间的调度切换执行。
    • 线程或者进程什么时候会切换?
      • 时间片耗尽
      • 更高优先级的进程要调度
      • 通过sleep,然后从内核返回用户时,会进行时间片是否到达的检测,进而导致切换
  • usleep 这个模拟漫⻓业务的过程,在这个漫⻓的业务过程中,可能有很多个线程会进⼊该代码段。

具体解释:假设当前票数只剩一个,出现两个或以上线程执行以下操作,将1(剩余票数)放到寄存器后,进行 if 的条件判断之前,线程切换了,当这些线程再次被调度时,恢复进程上下文信息,因为切换前寄存器中是1,恢复后寄存器的内容就还是1,所以恢复执行后的这些线程都会认为还有一张票,if 判断为真,都会执行抢票工作,但实际上票数可以已经没了或者只剩一张,但多个线程执行了抢票代码,所以票数出现负数问题。

取出ticket--部分的汇编代码

objdump   -d    a.out   >      test.objdump

152      40064b:       8b 05 e3 04 20 00       mov       0x2004e3(%rip),%eax #

600b34 <ticket>

153     400651:        83 e8 01                      sub        $0x1,%eax

154    400654:         89 05 da 04 20 00       mov       %eax,0x2004da(%rip) #

600b34 <ticket>

--(减减) 操作并不是原⼦操作,⽽是对应三条汇编指令:

  • load :将共享变量ticket从内存加载到寄存器中。
  • update :更新寄存器⾥⾯的值,执⾏-1操作。
  • store :将新值,从寄存器写回共享变量ticket的内存地址。

要解决以上问题,需要做到三点:

  • 代码必须要有互斥⾏为:当代码进⼊临界区执⾏时,不允许其他线程进⼊该临界区。
  • 如果多个线程同时要求执⾏临界区的代码,并且临界区没有线程在执⾏,那么只能允许⼀个线程进⼊该临界区。
  • 如果线程不在临界区中执⾏,那么该线程不能阻⽌其他线程进⼊临界区。

要做到这三点,本质上就是需要⼀把锁。Linux上提供的这把锁叫互斥量。

1.2.1、互斥量的接⼝

初始化互斥量有两种⽅法:

⽅法1,静态分配:(静态和全局的锁一般使用这种方式)

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER

⽅法2,动态分配:(局部的锁一般使用这种方式)

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const

pthread_mutexattr_t *restrict attr);

    参数:

            mutex:要初始化的互斥量

            attr:NULL

销毁互斥量需要注意:

  • 使⽤ PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁。
  • 不要销毁⼀个已经加锁的互斥量。
  • 已经销毁的互斥量,要确保后⾯不会有线程再尝试加锁。

int pthread_mutex_destroy(pthread_mutex_t *mutex);

互斥量加锁和解锁:

int pthread_mutex_lock(pthread_mutex_t *mutex);

int pthread_mutex_unlock(pthread_mutex_t *mutex);

返回值:成功返回0,失败返回错误号

调⽤ pthread_ lock 时,可能会遇到以下情况:

  • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功。
  • 发起函数调⽤时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_lock调⽤会陷⼊阻塞(执⾏流被挂起),等待互斥量解锁。

注意:

  • 锁本身也只是一种共享资源,不过加锁和解锁操作是原子的。
  • 如果申请锁的时候,锁已经被别人拿走了,其他线程要进行阻塞等待。
  • 线程在加锁情况下访问临界区代码的时候,是可以被切换走的。只不过锁还没有释放,其他线程仍无法进入临界区。

改进的售票系统:

全局锁:(makefile,Thread.hpp同上,只需要修改ticket.cc)ticket.cc如下:

#include <iostream>
#include <vector>
#include "Thread.hpp"

using namespace ThreadModule;

#define NUM 4

//创建锁  加锁和解锁操作是原子的
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

int ticketnum = 10000;//共享资源

void Ticket()
{
    while(true)
    {
        //加锁
        pthread_mutex_lock(&lock);
        if(ticketnum > 0)
        {
            usleep(1000);
            printf("get a new ticket,id: %d\n", ticketnum--);
            
            //解锁
            pthread_mutex_unlock(&lock);
        }
        else
        {
            pthread_mutex_unlock(&lock); //解锁
            break;
        }
    }
}

int main()
{
    //构建线程对象
    std::vector<Thread> threads;
    for(int i = 0; i < NUM; i++)
    {
        threads.emplace_back(Ticket);
    }

    //启动线程
    for(auto &thread : threads)
    {
        thread.Start();
    }

    //等待线程
    for(auto &thread : threads)
    {
        thread.Join();
    }

    return 0;
}

局部锁:(makefile同上)

Thread.hpp:(增加了模版,使其可以为线程执行的方法传递参数)

#ifndef _THREAD_HPP__
#define _THREAD_HPP__

#include <iostream>
#include <string>
#include <pthread.h>
#include <functional>
#include <sys/types.h>
#include <unistd.h>

// v2
namespace ThreadModule
{
    static int number = 1;
    enum class TSTATUS
    {
        NEW,
        RUNNING,
        STOP
    };

    template <typename T>
    class Thread
    {
        using func_t = std::function<void(T&)>;
    private:
        // 成员方法!
        static void *Routine(void *args)
        {
            Thread<T> *t = static_cast<Thread<T> *>(args);
            t->_status = TSTATUS::RUNNING;
            t->_func(t->_data);
            return nullptr;
        }
        void EnableDetach() { _joinable = false; }

    public:
        Thread(func_t func, T &data) : _func(func), _data(data), _status(TSTATUS::NEW), _joinable(true)
        {
            _name = "Thread-" + std::to_string(number++);
            _pid = getpid();
        }
        bool Start()
        {
            if (_status != TSTATUS::RUNNING)
            {
                int n = ::pthread_create(&_tid, nullptr, Routine, this); // TODO
                if (n != 0)
                    return false;
                return true;
            }
            return false;
        }
        bool Stop()
        {
            if (_status == TSTATUS::RUNNING)
            {
                int n = ::pthread_cancel(_tid);
                if (n != 0)
                    return false;
                _status = TSTATUS::STOP;
                return true;
            }
            return false;
        }
        bool Join()
        {
            if (_joinable)
            {
                int n = ::pthread_join(_tid, nullptr);
                if (n != 0)
                    return false;
                _status = TSTATUS::STOP;
                return true;
            }
            return false;
        }
        void Detach()
        {
            EnableDetach();
            pthread_detach(_tid);
        }
        bool IsJoinable() { return _joinable; }
        std::string Name() { return _name; }
        ~Thread()
        {
        }

    private:
        std::string _name;
        pthread_t _tid;
        pid_t _pid;
        bool _joinable; // 是否是分离的,默认不是
        func_t _func;
        TSTATUS _status;
        T &_data;
    };
}
#endif

ticket.cc:

#include <iostream>
#include <vector>
#include "Thread.hpp"

using namespace ThreadModule;

#define NUM 4

int ticketnum = 10000;//共享资源

class ThreadData
{
public:
    std::string name;
    pthread_mutex_t *lock_ptr;
};

void Ticket(ThreadData &td)
{
    while(true)
    {
        //加锁
        pthread_mutex_lock(td.lock_ptr);
        if(ticketnum > 0)
        {
            usleep(1000);
            printf("get a new ticket,who get it: %s,id: %d\n", td.name.c_str(), ticketnum--);
            
            //解锁
            pthread_mutex_unlock(td.lock_ptr);
        }
        else
        {
            pthread_mutex_unlock(td.lock_ptr);
            break;
        }
    }
}

int main()
{
    pthread_mutex_t lock;  //创建锁
    pthread_mutex_init(&lock, nullptr); //初始化锁

    //构建线程对象
    std::vector<Thread<ThreadData>> threads;
    for(int i = 0; i < NUM; i++)
    {
        ThreadData *td = new ThreadData();
        td->lock_ptr = &lock;
        threads.emplace_back(Ticket, *td);
        td->name = threads.back().Name();
    }

    //启动线程
    for(auto &thread : threads)
    {
        thread.Start();
    }

    //等待线程
    for(auto &thread : threads)
    {
        thread.Join();
    }

    pthread_mutex_destroy(&lock); //销毁锁

    return 0;
}

1.3、互斥量实现原理探究

  • 经过上⾯的例⼦,⼤家已经意识到单纯的 i++ 或者 ++i 都不是原⼦的,有可能会有数据⼀致性问题。
  • 为了实现互斥锁操作,⼤多数体系结构都提供了swap或exchange指令,该指令的作⽤是把寄存器和 内存单元的数据相交换,由于只有⼀条指令,保证了原⼦性,即使是多处理器平台,访问内存的总线周 期也有先后,⼀个处理器上的交换指令执⾏时另⼀个处理器的交换指令只能等待总线周期。现在 我们把lock和unlock的伪代码改⼀下。

1.4、互斥量的封装

Mutex.hpp:

#pragma once

#include <iostream>
#include <pthread.h>

namespace LockMoudle
{
    class Mutex
    {
    public:
        Mutex(const Mutex&) = delete;
        const Mutex& operator=(const Mutex&) = delete;

        Mutex()
        {
            int n = pthread_mutex_init(&lock, nullptr);
            //下面可以对n进行判断,判断是否初始化成功
            //这里就不判断了,直接强转一下,防止报warning
            (void)n;
        }

        ~Mutex()
        {
            int n = pthread_mutex_destroy(&lock);
            (void)n; //和上面同理
        }

        void Lock()
        {
            int n = pthread_mutex_lock(&lock);
            (void)n;
        }

        void Unlock()
        {
            int n = pthread_mutex_unlock(&lock);
            (void)n;
        }

    private:
        pthread_mutex_t lock;
    };

    //使用时可以创建局部的LockGuard对象
    //这样创建时,通过构造方法自动加锁
    //因为是局部的,出作用域自动销毁,通过析构函数释放锁
    //这样我们就只需要创建LockGuard对象,申请和释放锁都不需要我们管
    //这种代码风格叫做RAII风格
    class LockGuard
    {
    public:
        LockGuard(Mutex &mtx)   
            :_mtx(mtx)
        {
            _mtx.Lock();
        }

        ~LockGuard()
        {
            _mtx.Unlock();
        }
    private:
        Mutex &_mtx;
    };
}

二、线程同步

2.1、条件变量

  • 当⼀个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
  • 例如⼀个线程访问队列时,发现队列为空,它只能等待,只到其它线程将⼀个节点添加到队列 中。这种情况就需要⽤到条件变量。

2.2、同步概念与竞态条件

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从⽽有效避免饥饿问题,叫做同步。
  • 竞态条件:因为时序问题,⽽导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解。

2.3、条件变量函数

初始化:

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);

参数:

        cond:要初始化的条件变量

        attr:NULL

销毁:

int pthread_cond_destroy(pthread_cond_t *cond)

等待条件满⾜:

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);

参数:

        cond:要在这个条件变量上等待

        mutex:互斥量,后⾯详细解释

注意:使用该方法,线程等待时会自动释放拥有的锁,被唤醒时会自动申请锁。

唤醒等待:

唤醒某条件变量下所有等待线程

int pthread_cond_broadcast(pthread_cond_t *cond);

唤醒某条件变量下等待的第一个线程

int pthread_cond_signal(pthread_cond_t *cond);

简单案例:

  • 我们先使⽤PTHREAD_COND/MUTEX_INITIALIZER进⾏测试,对其他细节暂不追究。
#include <iostream>
#include <cstdio>
#include <string>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void *active(void *args)
{
    std::string name = static_cast<const char *>(args);
    while(true)
    {
        pthread_mutex_lock(&mutex);
        //等待时会先自动释放锁再去等待
        //被唤醒时会自动拥有锁
        pthread_cond_wait(&cond, &mutex);
        printf("%s is active\n", name.c_str());
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    pthread_t tid1, tid2, tid3;
    pthread_create(&tid1, nullptr, active, (void *)"thread-1");
    pthread_create(&tid2, nullptr, active, (void *)"thread-1");
    pthread_create(&tid3, nullptr, active, (void *)"thread-1");

    sleep(1);
    printf("Main thread ctrl begin...\n");
    while(true)
    {
        printf("main wakeup thread...\n");
        pthread_cond_signal(&cond);
        // pthread_cond_broadcast(&cond);
        sleep(1);
    }

    pthread_join(tid1, nullptr);
    pthread_join(tid2, nullptr);
    pthread_join(tid3, nullptr);

    return 0;
}

2.4、⽣产者消费者模型

2.4.1、为何要使⽤⽣产者消费者模型

⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区, 平衡了⽣产者和消费者的处理能⼒。这个阻塞队列就是⽤来给⽣产者和消费者解耦的。

2.4.2、⽣产者消费者模型优点

  • 解耦
  • ⽀持并发
  • ⽀持忙闲不均

2.5、基于BlockingQueue的⽣产者消费者模型

2.5.1、BlockingQueue

在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

2.5.2、C++ queue模拟阻塞队列的⽣产消费模型

示例代码:(单生产者,单消费者)

BlockQueue:

#pragma once

#include <iostream>
#include<queue>
#include <pthread.h>

namespace BlockQueueModule
{
    static const int gcap = 10;

    template<typename T>
    class BlockQueue
    {
    private:
        bool IsFull(){ return _q.size() == _cap; }
        bool IsEmpty(){ return _q.empty(); }
    public:
        BlockQueue(int cap = gcap)
            :_cap(cap),
             _cwait_num(0),
             _pwait_num(0)
        {
            pthread_mutex_init(&_mutex, nullptr);
            pthread_cond_init(&_productor_cond, nullptr);
            pthread_cond_init(&_consumer_cond, nullptr);
        }

        void Equeue(const T& in) //生产者
        {
            pthread_mutex_lock(&_mutex);
            //生产数据是有条件的,需要先判断是否为满,满了就不能放数据了
            while(IsFull()) //用while对条件进行判断,防止伪唤醒,这样唤醒后会多判断一次是否满足条件,如果不满足就继续等待
            {
                std::cout << "生产者进入等待..." << std::endl; //打印信息,方便调试

                _pwait_num++;
                pthread_cond_wait(&_productor_cond, &_mutex); //wait时会先释放锁
                _pwait_num--;
                //线程真正被唤醒需要重新申请并持有锁(他会在临界区内醒来)

                std::cout << "生产者被唤醒..." << std::endl;
            }   
            //队列没满或者线程被唤醒了
            _q.push(in); //生产
            
            //当前一定有数据
            //判断是否有消费者等待,如果有就唤醒一个
            if(_cwait_num)
            {
                std::cout << "叫醒消费者" << std::endl;
                pthread_cond_signal(&_consumer_cond);
            }
            //解锁
            pthread_mutex_unlock(&_mutex);
        }

        void Pop(T *out) //消费者
        {
            pthread_mutex_lock(&_mutex);

            while(IsEmpty())
            {
                std::cout << "消费者进入等待..." << std::endl;

                _cwait_num++;
                pthread_cond_wait(&_consumer_cond, &_mutex);
                _cwait_num--;
                std::cout << "消费者被唤醒..." << std::endl;
            }

            *out = _q.front();
            _q.pop();

            if(_pwait_num)
            {
                std::cout << "叫醒生产者..." << std::endl;
                pthread_cond_signal(&_productor_cond);
            }

            pthread_mutex_unlock(&_mutex);
        }

        ~BlockQueue()
        {
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_productor_cond);
            pthread_cond_destroy(&_consumer_cond);
        }
    private:
        std::queue<T> _q;   //保存数据的容器,临界资源
        int _cap;           //容器的最大容量
        pthread_mutex_t _mutex;  //互斥锁
        pthread_cond_t _productor_cond; //生产者条件变量
        pthread_cond_t _consumer_cond;  //消费者条件变量

        int _cwait_num;     //消费者等待数量
        int _pwait_num;     //生产者等待数量
    };
}

Main.cc:

#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>

using namespace BlockQueueModule;

void *Consumer(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        int data;
        // 1. 从bq拿到数据
        bq->Pop(&data);
        
        // 2.做处理
        printf("Consumer, 消费了一个数据: %d\n", data);
    }
}

void *Productor(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    int data = 10;
    while (true)
    {
        sleep(2);

        // 2. 生产到bq中
        bq->Equeue(data);

        printf("producter 生产了一个数据: %d\n", data);
        data++;
    }
}

int main()
{
    // 交易场所,不仅仅可以用来进行传递数据
    BlockQueue<int> *bq = new BlockQueue<int>(5); // 共享资源 -> 临界资源
    // 单生产,单消费
    pthread_t c1, p1; //,c2, , p2, p3;
    pthread_create(&c1, nullptr, Consumer, bq);
    // pthread_create(&c2, nullptr, Consumer, bq);
    pthread_create(&p1, nullptr, Productor, bq);
    // pthread_create(&p2, nullptr, Productor, bq);
    // pthread_create(&p3, nullptr, Productor, bq);

    pthread_join(c1, nullptr);
    // pthread_join(c2, nullptr);
    pthread_join(p1, nullptr);
    // pthread_join(p2, nullptr);
    // pthread_join(p3, nullptr);

    delete bq;

    return 0;
}

2.6、为什么 pthread_cond_wait 需要互斥量?

  • 条件等待是线程间同步的⼀种⼿段,如果只有⼀个线程,条件不满⾜,⼀直等下去都不会满⾜, 所以必须要有⼀个线程通过某些操作,改变共享变量,使原先不满⾜的条件变得满⾜,并且友好 的通知等待在条件变量上的线程。
  • 条件不会⽆缘⽆故的突然变得满⾜了,必然会牵扯到共享数据的变化。所以⼀定要⽤互斥锁来保 护。没有互斥锁就⽆法安全的获取和修改共享数据。

按照上⾯的说法,我们设计出如下的代码:先上锁,发现条件不满⾜,解锁,然后等待在条件变 量上不就⾏了,如下代码:

// 错误的设计

pthread_mutex_lock(&mutex);

while (condition_is_false)

{

        pthread_mutex_unlock(&mutex);

         pthread_cond_wait(&cond,&mutex);

        pthread_mutex_lock(&mutex);

        //等待这里不需要自己释放锁和申请锁,它会自动释放和申请

}

pthread_mutex_unlock(&mutex);

  • 由于解锁和等待不是原⼦操作。调⽤解锁之后, pthread_cond_wait 之前,如果已经有其他线程获取到互斥量,摒弃条件满⾜,发送了信号,那么 pthread_cond_wait 将错过这个信号,可能会导致线程永远阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是⼀个原⼦操作。
  • int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t * mutex); 进⼊该函数后,会去看条件量等于0不?等于,就把互斥量变成1,直到cond_wait返 回,把条件量改成1,把互斥量恢复成原样。

2.7、条件变量使⽤规范

等待条件代码:

pthread_mutex_lock(&mutex);

while (条件为假)

        pthread_cond_wait(cond, mutex);

修改条件

pthread_mutex_unlock(&mutex);

给条件发送信号代码:

pthread_mutex_lock(&mutex);

设置条件为真

pthread_cond_signal(cond);

pthread_mutex_unlock(&mutex);

2.8、条件变量的封装

Cond.hpp:

#pragma once

#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"

namespace CondModule
{
    using namespace LockModule;
    
    class Cond
    {
    public:
        Cond()
        {
            int n = ::pthread_cond_init(&_cond, nullptr);
            (void)n;
        }

        void Wait(Mutex &mutex) 
        {
            int n = ::pthread_cond_wait(&_cond, mutex.LockPtr());
            (void)n;
        }

        void Notify()
        {
            int n = ::pthread_cond_signal(&_cond);
            (void)n;
        }

        void NotifyAll()
        {
            int n = ::pthread_cond_broadcast(&_cond);
            (void)n;
        }

        ~Cond()
        {
            int n = ::pthread_cond_destroy(&_cond);
        }
    private:
        pthread_cond_t _cond;
    };
}

Mutex.hpp:

#pragma once
#include <iostream>
#include <pthread.h>

namespace LockModule
{
    class Mutex
    {
    public:
        Mutex(const Mutex&) = delete;
        const Mutex& operator = (const Mutex&) = delete;

        Mutex()
        {
            int n = ::pthread_mutex_init(&_lock, nullptr);
            (void)n;
        }

        ~Mutex()
        {
            int n = ::pthread_mutex_destroy(&_lock);
            (void)n;
        }

        void Lock()
        {
            int n = ::pthread_mutex_lock(&_lock);
            (void)n;
        }

        pthread_mutex_t *LockPtr()
        {
            return &_lock;
        }
        
        void Unlock()
        {
            int n = ::pthread_mutex_unlock(&_lock);
            (void)n;
        }

    private:
        pthread_mutex_t _lock;
    };

    class LockGuard
    {
    public:
        LockGuard(Mutex &mtx):_mtx(mtx)
        {
            _mtx.Lock();
        }
        ~LockGuard()
        {
            _mtx.Unlock();
        }
    private:
        Mutex &_mtx;
    };
}

注意:为了让条件变量更具有通⽤性,建议封装的时候,不要在Cond类内部引⽤对应的封装互斥 量,要不然后⾯组合的时候,会因为代码耦合的问题难以初始化,因为⼀般⽽⾔Mutex和 Cond基本是⼀起创建的。

2.9、POSIX信号量

POSIX信号量和SystemV信号量作⽤相同,都是⽤于同步操作,达到⽆冲突的访问共享资源⽬的。但 POSIX可以⽤于线程间同步。

初始化信号量:

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

        pshared:0表⽰线程间共享,⾮零表⽰进程间共享

        value:信号量初始值

销毁信号量:

int sem_destroy(sem_t *sem);

等待信号量:

功能:等待信号量,会将信号量的值减1

int sem_wait(sem_t *sem);   //P()

发布信号量:

功能:发布信号量,表⽰资源使⽤完毕,可以归还资源了。将信号量值加1。

int sem_post(sem_t *sem);  //V()

2.9.1、基于环形队列的⽣产消费模型

  • 环形队列采⽤数组模拟,⽤模运算来模拟环状特性

  • 环形结构起始状态和结束状态都是⼀样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留⼀个空的位置,作为满的状态

但是我们现在有信号量这个计数器,就很简单的进⾏多线程间的同步过程。示例代码:

Main.cc:

#include "RingBuffer.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>

using namespace RingBufferModule;

void *Consumer(void *args)
{
    RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);
    while(true)
    {
        sleep(1);
        // sleep(1);
        // 1. 消费数据
        int data;
        ring_buffer->Pop(&data);

        // 2. 处理:花时间
        std::cout << "消费了一个数据: " << data << std::endl;
    }
}

void *Productor(void *args)
{
    RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);
    int data = 0;
    while (true)
    {
        // 1. 获取数据:花时间
        // sleep(1);

        // 2. 生产数据
        ring_buffer->Equeue(data);
        std::cout << "生产了一个数据: " << data << std::endl;
        data++;
    }
}

int main()
{
    RingBuffer<int> *ring_buffer = new RingBuffer<int>(5); // 共享资源 -> 临界资源
    // 单生产,单消费
    pthread_t c1, p1, c2,c3,p2;
    pthread_create(&c1, nullptr, Consumer, ring_buffer);
    pthread_create(&c2, nullptr, Consumer, ring_buffer);
    pthread_create(&c3, nullptr, Consumer, ring_buffer);
    pthread_create(&p1, nullptr, Productor, ring_buffer);
    pthread_create(&p2, nullptr, Productor, ring_buffer);


    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);


    delete ring_buffer;

    return 0;
}

RingBuffer.hpp:

#pragma once

#include <iostream>
#include <vector>
#include <pthread.h>

#include "Sem.hpp"
#include "Mutex.hpp"

namespace RingBufferModule
{
    using namespace SemModule;
    using namespace LockModule;

    template<typename T>
    class RingBuffer
    {
    public:
        RingBuffer(int cap)
            :_ring(cap),
             _cap(cap),
             _p_step(0),
             _c_step(0),
             _datasem(0),
             _spacesem(cap)
        {}

        //生产者
        void Equeue(const T& in)
        {   
            _spacesem.P();

            {
                LockGuard(_p_lock);
                _ring[_p_step] = in;
                _p_step++;
                _p_setp %= _cap;
            }

            _datasem.V();
        }

        //消费者
        void Pop(T *out)
        {
            _datasem.P();
            
            {
                LockGuard(_c_lock);
                *out = _ring[_c_step];
                _c_step++;
                _c_step %= _cap;
            }

            _spacesem.V();
        }

        ~RingBuffer()
        {}
    private:
        std::vector<T> _ring;  //用数组实现环形队列  临界资源
        int _cap;       //总容量
        int _p_step;    //生产者位置
        int _c_step;    //消费者位置

        Sem _datasem;   //数据信号量
        Sem _spacesem;  //空间信号量

        Mutex _p_lock;
        Mutex _c_lock;
    };
}

Mutex.hpp:

#pragma once
#include <iostream>
#include <pthread.h>

namespace LockModule
{
    class Mutex
    {
    public:
        Mutex(const Mutex&) = delete;
        const Mutex& operator = (const Mutex&) = delete;
        Mutex()
        {
            int n = ::pthread_mutex_init(&_lock, nullptr);
            (void)n;
        }
        ~Mutex()
        {
            int n = ::pthread_mutex_destroy(&_lock);
            (void)n;
        }
        void Lock()
        {
            int n = ::pthread_mutex_lock(&_lock);
            (void)n;
        }
        pthread_mutex_t *LockPtr()
        {
            return &_lock;
        }
        void Unlock()
        {
            int n = ::pthread_mutex_unlock(&_lock);
            (void)n;
        }

    private:
        pthread_mutex_t _lock;
    };

    class LockGuard
    {
    public:
        LockGuard(Mutex &mtx):_mtx(mtx)
        {
            _mtx.Lock();
        }
        ~LockGuard()
        {
            _mtx.Unlock();
        }
    private:
        Mutex &_mtx;
    };
}

Sem.hpp:

#pragma once

#include <semaphore.h>

namespace SemModule
{
    int defaultsemval = 1;
    class Sem
    {
    public:
        Sem(int value = defaultsemval)
            :_init_value(value)
        {
            int n = ::sem_init(&_sem, 0, _init_value);
            (void)n;
        }

        void P()
        {
            int n = ::sem_wait(&_sem);
            (void)n;
        }

        void V()
        {
            int n = ::sem_post(&_sem);
            (void)n;
        }

        ~Sem()
        {
            int n = ::sem_destroy(&_sem);
            (void)n;
        }
    private:
        sem_t _sem;
        int _init_value;
    };
}

三、线程池

3.1、⽇志与策略模式

什么是设计模式

IT⾏业这么⽕,涌⼊的⼈很多。俗话说林⼦⼤了啥⻦都有。⼤佬和菜鸡们两极分化的越来越严重。为了让菜鸡们不太拖⼤佬的后腿,于是⼤佬们针对⼀些经典的常⻅的场景,给定了⼀些对应的解决⽅案,这个就是设计模式。

⽇志认识

计算机中的⽇志是记录系统和软件运⾏中发⽣事件的⽂件,主要作⽤是监控运⾏状态、记录异常信 息,帮助快速定位问题并⽀持程序员进⾏问题修复。它是系统维护、故障排查和安全管理的重要⼯ 具。

⽇志格式以下⼏个指标是必须得有的

  • 时间戳
  • ⽇志等级
  • ⽇志内容

以下⼏个指标是可选的

  • ⽂件名⾏号
  • 进程,线程相关id信息等

⽇志有现成的解决⽅案,如:spdlog、glog、Boost.Log、Log4cxx等等,我们依旧采⽤⾃定义⽇志的⽅式。 这⾥我们采⽤设计模式-策略模式来进⾏⽇志的设计。我们想要的⽇志格式如下:

[可读性很好的时间]  [⽇志等级]  [进程pid]  [打印对应⽇志的⽂件名] [⾏号]  -  消息内容, ⽀持可变参数

[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [16] - hello world

[2024-08-04 12:27:03] [WARNING] [202938] [main.cc] [23] - hello world

示例代码:(注意Log.hpp中使用的Mutex和上面的自己封装的Mutex.hpp是同一份代码,这里就不重复展示了)

Log.hpp:

#include <fstream>
#include <sstream>
#include <memory>
#include <filesystem> //C++17
#include <unistd.h>
#include <time.h>
#include "Mutex.hpp"

namespace LogModule
{
    using namespace LockModule;

    // 获取当前系统的时间
    std::string CurrentTime()
    {
        time_t time_stamp = ::time(nullptr); // 获取时间戳
        struct tm curr;                      // 用作输出型参数
        localtime_r(&time_stamp, &curr);

        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d:%02d:%02d",
                 curr.tm_year + 1900,
                 curr.tm_mon + 1,
                 curr.tm_mday,
                 curr.tm_hour,
                 curr.tm_min,
                 curr.tm_sec);

        return buffer;
    }

    // 日志文件的默认路径和文件名
    const std::string defaultlogpath = "./log/";
    const std::string defaultlogname = "log.txt";

    // 日志等级
    enum class LogLevel
    {
        DEBUG = 1,
        INFO,
        WARNING,
        ERROR,
        FATAL
    };

    std::string Level2String(LogLevel level)
    {
        switch (level)
        {
        case LogLevel::DEBUG:
            return "DEBUG";
        case LogLevel::INFO:
            return "INFO";
        case LogLevel::WARNING:
            return "WARNING";
        case LogLevel::ERROR:
            return "ERROR";
        case LogLevel::FATAL:
            return "FATAL";
        default:
            return "None";
        }
    }

    //刷新策略
    class LogStrategy
    {
    public:
        virtual ~LogStrategy() = default;
        virtual void SyncLog(const std::string &message) = 0;
    };

    //控制台策略
    class ConsoleLogStrategy : public LogStrategy
    {
    public:
        ConsoleLogStrategy()
        {}

        ~ConsoleLogStrategy()
        {}

        void SyncLog(const std::string &message)
        {
            LockGuard lockguard(_lock);
            std::cout << message << std::endl;
        }
    private:
        Mutex _lock;
    };

    //文件级策略
    class FileLogStrategy : public LogStrategy
    {
    public:
        FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname)
            :_logpath(logpath),
             _logname(logname)
        {
            //确认_logpath是存在的
            LockGuard lockguard(_lock);

            if(std::filesystem::exists(_logpath))
            {
                return ;
            }

            try
            {
                std::filesystem::create_directories(_logpath);
            }
            catch(std::filesystem::filesystem_error &e)
            {
                std::cerr << e.what() << "\n";
            }
        }

        ~FileLogStrategy()
        {}

        void SyncLog(const std::string &message)
        {
            LockGuard lockguard(_lock);
            std::string log = _logpath + _logname;  //默认是 ./log/log.txt
            std::ofstream out(log, std::ios::app); //日志写入,一定是追加
            if(!out.is_open())
            {
                return;
            }
            out << message << "\n";
            out.close();
        }

    private:
        std::string _logpath;
        std::string _logname;

        Mutex _lock;
    };

    class Logger
    {
    public:
        Logger()
        {
            //默认采用ConsoleLogStrategy策略
            _strategy = std::make_shared<ConsoleLogStrategy>();
        }

        void EnableConsoleLog()
        {
            _strategy = std::make_shared<ConsoleLogStrategy>();
        }

        void EnableFileLog()
        {
            _strategy = std::make_shared<FileLogStrategy>();
        }

        ~Logger(){}
        // 一条完整的信息: [2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [16] + 日志的可变部分(<< "hello world" << 3.14 << a << b;)

        class LogMessage
        {
        public:
            LogMessage(LogLevel level, const std::string &filename, int line, Logger &logger)
                :_currtime(CurrentTime()),
                 _level(level),
                 _pid(getpid()),
                 _filename(filename),
                 _line(line),
                 _logger(logger)
            {
                std::stringstream ssbuffer;
                ssbuffer << "[" << _currtime << "] "
                         << "[" << Level2String(_level) << "] "
                         << "[" << _pid << "] "
                         << "[" << _filename << "] "
                         << "[" << _line << "] - ";
                _loginfo = ssbuffer.str();
            }

            template<typename T>
            LogMessage &operator<<(const T &info)
            {
                std::stringstream ss;
                ss << info;
                _loginfo += ss.str();
                return *this;
            }

            ~LogMessage()
            {
                if (_logger._strategy)
                {
                    _logger._strategy->SyncLog(_loginfo);
                }
            }
        private:
            std::string _currtime; // 当前日志时间
            LogLevel _level;       // 日志等级
            pid_t _pid;            // 进程pid
            std::string _filename; // 源文件名称
            int _line;             // 日志所在行号
            Logger &_logger;       // 负责根据不同的策略进行刷新
            std::string _loginfo;  // 一条完整的日志记录
        };

        //这里就是需要返回拷贝的临时对象
        LogMessage operator()(LogLevel level, const std::string &filename, int line)
        {
            return LogMessage(level, filename, line, *this);
        }

    private:
        std::shared_ptr<LogStrategy> _strategy; //日志刷新的策略方案
    };

    Logger logger;

#define LOG(Level) logger(Level, __FILE__, __LINE__)
#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()
#define ENABLE_FILE_LOG() logger.EnableFileLog()
}

Main.cc:

#include "Log.hpp"

using namespace LogModule;

int main()
{
    ENABLE_FILE_LOG();

    LOG(LogLevel::DEBUG) << "hello file";
    LOG(LogLevel::DEBUG) << "hello file";
    LOG(LogLevel::DEBUG) << "hello file";
    LOG(LogLevel::DEBUG) << "hello file";


    ENABLE_CONSOLE_LOG();
    LOG(LogLevel::DEBUG) << "hello world";
    LOG(LogLevel::DEBUG) << "hello world";
    LOG(LogLevel::DEBUG) << "hello world";
    LOG(LogLevel::DEBUG) << "hello world";
    return 0;
}

3.2、线程池设计

线程池:

⼀种线程使⽤模式。线程过多会带来调度开销,进⽽影响缓存局部性和整体性能。⽽线程池维护着多 个线程,等待着监督管理者分配可并发执⾏的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利⽤,还能防⽌过分调度。可⽤线程数量应该取决于可⽤的并发处理器、处理器内核、内存、⽹络sockets等的数量。

线程池的应用场景:

需要⼤量的线程来完成任务,且完成任务的时间⽐较短。⽐如WEB服务器完成⽹⻚请求这样的任 务,使⽤线程池技术是⾮常合适的。因为单个任务⼩,⽽任务数量巨⼤,你可以想象⼀个热⻔⽹站 的点击次数。但对于⻓时间的任务,⽐如⼀个Telnet连接请求,线程池的优点就不明显了。因为 Telnet会话时间⽐线程的创建时间⼤多了。

对性能要求苛刻的应⽤,⽐如要求服务器迅速响应客⼾请求。

接受突发性的⼤量请求,但不⾄于使服务器因此产⽣⼤量线程的应⽤。突发性⼤量客⼾请求,在没 有线程池情况下,将产⽣⼤量线程,虽然理论上⼤部分操作系统线程数⽬最⼤值不是问题,短时间 内产⽣⼤量线程可能使内存到达极限,出现错误。

线程池的种类:

  • a. 创建固定数量线程池,循环从任务队列中获取任务对象,获取到任务对象后,执⾏任务对象中的任务接⼝。
  • b. 浮动线程池,其他同上。

此处,我们选择固定线程个数的线程池。

示例代码:(注意:ThreadPool.hpp中使用的锁,条件变量,日志均来自于上面的示例中封装的代码,即Mutex.hpp,Cond.hpp,Log.hpp,使用的线程来自于线程概念与控制文章中的线程封装,即Thread.hpp,这里就不在重复展示了)

Task.hpp:

#pragma once

#include <iostream>
#include <string>
#include <functional>
#include "Log.hpp"

using namespace LogMudule;

using task_t = std::function<void(std::string name)>;

void Push(std::string name)
{
    LOG(LogLevel::DEBUG) << "我是一个推送数据到服务器的一个任务, 我正在被执行" << "[" << name << "]";
}

ThreadPool.hpp:

#pragma once

#include <iostream>
#include <string>
#include <queue>
#include <vector>
#include <memory>
#include "Log.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"

namespace ThreadPoolModule
{
    using namespace LogMudule;
    using namespace ThreadModule;
    using namespace LockModule;
    using namespace CondModule;

    //用来测试的线程方法
    void DefaultTest()
    {
        while(true)
        {
            LOG(LogLevel::DEBUG)<<"我是一个测试方法";
            sleep(1);
        }
    }

    using thread_t = std::shared_ptr<Thread>;

    const static int defaultnum = 5;

    template<typename T>
    class ThreadPool
    {
    private:
        bool IsEmpty(){ return _taskq.empty(); }

        void HandlerTask(std::string name)
        {
            LOG(LogLevel::INFO)<<"线程"<<name<<",进入HandlerTask的逻辑";
            while(true)
            {
                //拿任务
                T t;
                {
                    LockGuard lockguard(_lock);
                    while(IsEmpty() && _isrunning)
                    {
                        _wait_num++;
                        _cond.Wait(_lock);
                        _wait_num--;
                    }
                    //任务队列为空,并且线程池处于退出状态
                    if(IsEmpty() && !_isrunning)
                        break;

                    t = _taskq.front();
                    _taskq.pop();
                }

                //处理任务
                t(name); //假设规定未来所有任务都必须提供重载()方法
            }
            LOG(LogLevel::INFO)<<"线程:"<<name<<"退出";
        }
    public:
        ThreadPool(int num = defaultnum)
            :_num(num),
             _wait_num(0),
             _isrunning(false)
        {
            for(int i = 0; i < _num; i++)
            {
                _threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
                LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象 ... 成功";
            }
        }

        void Equeue(T &&in)
        {
            LockGuard lockguard(_lock);
            if(!_isrunning) return;
            _taskq.push(std::move(in));
            if(_wait_num > 0)
                _cond.Notify();
        }

        void Start()
        {
            if(_isrunning) return;
            _isrunning = true;
            for(auto &thread_ptr : _threads)
            {
                LOG(LogLevel::INFO)<<"启动线程"<<thread_ptr->Name()<<"...成功";
                thread_ptr->Start();
            }
        }

        void Wait()
        {
            for(auto &thread_ptr : _threads)
            {
                thread_ptr->Join();
                LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << " ... 成功";
            }
        }

        void Stop()
        {
            LockGuard lockguard(_lock);
            if(_isrunning)
            {
                //不能在入任务了
                _isrunning = false; //不工作
                //让线程自己退出
                //将休眠线程都唤醒,确保历史人任务被处理完了
                if(_wait_num > 0)
                {
                    _cond.NotifyAll();
                }
            }
        }

        ~ThreadPool()
        {}

    private:
        std::vector<thread_t> _threads; //管理所有线程的容器
        int _num; //线程总数量
        int _wait_num;  //休眠状态的线程数量
        std::queue<T> _taskq; //存放任务的队列,临界资源

        Mutex _lock;
        Cond _cond;

        bool _isrunning;
    };
}

ThreadPool.cc:

#include "ThreadPool.hpp"
#include "Task.hpp"
#include <memory>

using namespace ThreadPoolModule;

int main()
{
    ENABLE_CONSOLE_LOG();
    // ENABLE_FILE_LOG();

    std::unique_ptr<ThreadPool<task_t>> tp = std::make_unique<ThreadPool<task_t>>();
    tp->Start();

    int cnt = 10;
    // char c;
    while (true)
    {
        // std::cin >> c;
        tp->Equeue(Push);
        cnt--;
        sleep(1);
    }

    tp->Stop();

    sleep(3);

    tp->Wait();

    return 0;
}

3.3、线程安全的单例模式

3.3.1、什么是单例模式

某些类,只应该具有⼀个对象(实例),就称之为单例。

在很多服务器开发场景中,经常需要让服务器加载很多的数据(上百G)到内存中。此时往往要⽤⼀个单例的类来管理这些数据。

3.3.2、饿汉实现⽅式和懒汉实现⽅式的理解

[洗碗的例⼦]

吃完饭, ⽴刻洗碗, 这种就是饿汉⽅式. 因为下⼀顿吃的时候可以⽴刻拿着碗就能吃饭。

吃完饭, 先把碗放下, 然后下⼀顿饭⽤到这个碗了再洗碗, 就是懒汉⽅式。

懒汉⽅式最核⼼的思想是"延时加载"。从⽽能够优化服务器的启动速度。

3.3.3、饿汉⽅式实现单例模式

template<typename T>

class Singleton

{

        static T data;

public:

        static T* GetInstance()

        {

                 return &data;

        }

};

3.3.4、懒汉⽅式实现单例模式

template<typename T>

class Singleton

{

        static T* inst;

public:

        static T* GetInstance()

        {

                if (inst == NULL)

                {

                        inst = new T();

                }

                return inst;

        }

};

注意:这里存在⼀个严重的问题,线程不安全。第⼀次调⽤ GetInstance 的时候,如果两个线程同时调⽤,可能会创建出两份 T 对象的实例。但是后续再次调⽤,就没有问题了。

3.3.6、懒汉⽅式实现单例模式(线程安全版本)

template<typename T>

class Singleton

{

        volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.

        static std::mutex lock;

public:

        static T* GetInstance()

        {

                if (inst == NULL)

                {        // 双重判定空指针, 降低锁冲突的概率, 提⾼性能.

                        lock.lock(); // 使⽤互斥锁, 保证多线程情况下也只调⽤⼀次 new.

                        if (inst == NULL)

                        {

                                inst = new T();

                        }

                        lock.unlock();

                }

                return inst;

        }

};

注意事项:

  • 加锁解锁的位置
  • 双重 if 判定,避免不必要的锁竞争
  • volatile关键字防⽌过度优化

3.4、单例式线程池

注意:Mutex.hpp,Cond.hpp,Log.hpp,Thread.hpp,Task.hpp同上

ThreadPool.hpp:

#pragma once

#include <iostream>
#include <string>
#include <queue>
#include <vector>
#include <memory>
#include "Log.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"

namespace ThreadPoolModule
{
    using namespace LogMudule;
    using namespace ThreadModule;
    using namespace LockModule;
    using namespace CondModule;

    // 用来做测试的线程方法
    void DefaultTest()
    {
        while (true)
        {
            LOG(LogLevel::DEBUG) << "我是一个测试方法";
            sleep(1);
        }
    }

    using thread_t = std::shared_ptr<Thread>;

    const static int defaultnum = 5;

    template <typename T>
    class ThreadPool
    {
    private:
        bool IsEmpty() { return _taskq.empty(); }

        void HandlerTask(std::string name)
        {
            LOG(LogLevel::INFO) << "线程: " << name << ", 进入HandlerTask的逻辑";
            while (true)
            {
                // 1. 拿任务
                T t;
                {
                    LockGuard lockguard(_lock);
                    while (IsEmpty() && _isrunning)
                    {
                        _wait_num++;
                        _cond.Wait(_lock);
                        _wait_num--;
                    }
                    // 2. 任务队列为空 && 线程池退出了
                    if (IsEmpty() && !_isrunning)
                        break;

                    t = _taskq.front();
                    _taskq.pop();
                }

                // 2. 处理任务
                t(name); // 规定,未来所有的任务处理,全部都是必须提供()方法!
            }
            LOG(LogLevel::INFO) << "线程: " << name << " 退出";
        }
        ThreadPool(const ThreadPool<T> &) = delete;
        ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;

        ThreadPool(int num = defaultnum) : _num(num), _wait_num(0), _isrunning(false)
        {
            for (int i = 0; i < _num; i++)
            {
                _threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
                LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象 ... 成功";
            }
        }

    public:
        static ThreadPool<T> *getInstance()
        {
	    if(instance == NULL)
            {
	        LockGuard lockguard(mutex);
                if(instance == NULL)
                {
                    LOG(LogLevel::INFO) << "单例首次被执行,需要加载对象...";
                    instance = new ThreadPool<T>();
                }
	    }

            return instance;
        }

        void Equeue(T &&in)
        {
            LockGuard lockguard(_lock);
            if (!_isrunning)
                return;
            _taskq.push(std::move(in));
            if (_wait_num > 0)
                _cond.Notify();
        }
        void Start()
        {
            if (_isrunning)
                return;
            _isrunning = true; // bug fix
            for (auto &thread_ptr : _threads)
            {
                LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << " ... 成功";
                thread_ptr->Start();
            }
        }
        void Wait()
        {
            for (auto &thread_ptr : _threads)
            {
                thread_ptr->Join();
                LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << " ... 成功";
            }
        }
        void Stop()
        {
            LockGuard lockguard(_lock);
            if (_isrunning)
            {
                // 3. 不能在入任务了
                _isrunning = false; // 不工作
                // 1. 让线程自己退出(要唤醒) && // 2. 历史的任务被处理完了
                if (_wait_num > 0)
                    _cond.NotifyAll();
            }
        }
        ~ThreadPool()
        {
        }

    private:
        std::vector<thread_t> _threads;
        int _num;
        int _wait_num;
        std::queue<T> _taskq; // 临界资源

        Mutex _lock;
        Cond _cond;

        bool _isrunning;

        static ThreadPool<T> *instance;
	static Mutex mutex; //只用来保护单例
    };

    template<typename T>
    ThreadPool<T> *ThreadPool<T>::instance = NULL;
    template<typename T>
    Mutex ThreadPool<T>::mutex; //只用来保护单例
}

ThreadPool.cc:

#include "ThreadPool.hpp"
#include "Task.hpp"
#include <memory>

using namespace ThreadPoolModule;

int main()
{
    ENABLE_CONSOLE_LOG();
    ThreadPool<task_t>::getInstance()->Start();
    char c;
    int cnt = 5;
    while (cnt)
    {
        // std::cin >> c;
        ThreadPool<task_t>::getInstance()->Equeue(Push);
        cnt--;
        sleep(1);
    }

    ThreadPool<task_t>::getInstance()->Stop();
    ThreadPool<task_t>::getInstance()->Wait();

    return 0;
}

四、线程安全和重入问题

概念:

  • 线程安全:就是多个线程在访问共享资源时,能够正确地执⾏,不会相互⼲扰或破坏彼此的执⾏结 果。⼀般⽽⾔,多个线程并发同⼀段只有局部变量的代码时,不会出现不同的结果。但是对全局变量或者静态变量进⾏操作,并且没有锁保护的情况下,容易出现该问题。
  • 重⼊:同⼀个函数被不同的执⾏流调⽤,当前⼀个流程还没有执⾏完,就有其他的执⾏流再次进⼊, 我们称之为重⼊。⼀个函数在重⼊的情况下,运⾏结果不会出现任何不同或者任何问题,则该函数被 称为可重⼊函数,否则,是不可重⼊函数。

重⼊其实可以分为两种情况:

  • 多线程重⼊函数
  • 信号导致⼀个执⾏流重复进⼊函数

常⻅的线程不安全的情况:

  • 不保护共享变量的函数
  • 函数状态随着被调⽤,状态发⽣变化的函数
  • 返回指向静态变量指针的函数
  • 调⽤线程不安全函数的函数

常⻅的线程安全的情况:

  • 每个线程对全局变量或者静态变量只有读取 的权限,⽽没有写⼊的权限,⼀般来说这些 线程是安全的。
  • 类或者接⼝对于线程来说都是原⼦操作。
  • 多个线程之间的切换不会导致该接⼝的执⾏结果存在⼆义性。

常⻅不可重⼊的情况:

  • 调⽤了malloc/free函数,因为malloc函数 是⽤全局链表来管理堆的。
  • 调⽤了标准I/O库函数,标准I/O库的很多实 现都以不可重⼊的⽅式使⽤全局数据结构。
  • 可重⼊函数体内使⽤了静态的数据结构。

常⻅可重⼊的情况:

  • 不使⽤全局变量或静态变量
  • 不使⽤ malloc或者new开辟出的空间
  • 不调⽤不可重⼊函数
  • 不返回静态或全局数据,所有数据都有函数 的调⽤者提供
  • 使⽤本地数据,或者通过制作全局数据的本 地拷⻉来保护全局数据

可重⼊与线程安全联系:

  • 函数是可重⼊的,那就是线程安全的(其实知道这⼀句话就够了)。
  • 函数是不可重⼊的,那就不能由多个线程使⽤,有可能引发线程安全问题
  • 如果⼀个函数中有全局变量,那么这个函数既不是线程安全也不是可重⼊的。

可重⼊与线程安全区别:

  • 可重⼊函数是线程安全函数的⼀种
  • 线程安全不⼀定是可重⼊的,⽽可重⼊函数则⼀定是线程安全的。
  • 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重⼊函数若锁还未释放则会产⽣死锁,因此是不可重⼊的。

注意:如果不考虑信号导致⼀个执⾏流重复进⼊函数这种重⼊情况,线程安全和重⼊在安全⻆度不做区分。但是线程安全侧重说明线程访问公共资源的安全情况,表现的是并发线程的特点。可重⼊描述的是⼀个函数是否能被重复进⼊,表⽰的是函数的特点。

五、常见锁概念

5.1、死锁

  • 死锁是指在⼀组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站⽤不会释放的资源⽽处于的⼀种永久等待状态。

为了⽅便表述,假设现在线程A,线程B必须同时持有锁1和锁2,才能进⾏后续资源的访问。

申请⼀把锁是原⼦的,但是申请两把锁就不⼀定了。如图:

造成的结果是:

5.2、死锁四个必要条件

互斥条件:⼀个资源每次只能被⼀个执⾏流使⽤

请求与保持条件:⼀个执⾏流因请求资源⽽阻塞时,对已获得的资源保持不放

不剥夺条件:⼀个执⾏流已获得的资源,在末使⽤完之前,不能强⾏剥夺

循环等待条件:若⼲执⾏流之间形成⼀种头尾相接的循环等待资源的关系

5.3、避免死锁

破坏死锁的四个必要条件

  • 破坏循环等待条件问题:资源⼀次性分配,使⽤超时机制、加锁顺序⼀致
  • 避免锁未释放的场景

六、STL,智能指针和线程安全

6.1、STL中的容器是否是线程安全的?

不是

原因:STL的设计初衷是将性能挖掘到极致,⽽⼀旦涉及到加锁保证线程安全,会对性能造成巨⼤的影响。⽽且对于不同的容器,加锁⽅式的不同,性能可能也不同(例如hash表的锁表和锁桶)。因此STL默认不是线程安全。如果需要在多线程环境下使⽤,往往需要调⽤者⾃⾏保证线程安全。

6.2、智能指针是否是线程安全的?

对于unique_ptr,由于只是在当前代码块范围内⽣效,因此不涉及线程安全问题。 对于shared_ptr,多个对象需要共⽤⼀个引⽤计数变量,所以会存在线程安全问题。但是标准库实现的时候考虑到了这个问题,基于原⼦操作(CAS)的⽅式保证shared_ptr能够⾼效,原⼦的操作引⽤计数。


网站公告

今日签到

点亮在社区的每一天
去签到