【Linux】线程封装与互斥(万字)

发布于:2024-07-04 ⋅ 阅读:(20) ⋅ 点赞:(0)

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

目录

文章目录

前言

C++多线程的用法

对原生线程进行一次封装

理解pthread线程

Linux线程互斥

进程线程间的互斥相关背景概念

互斥量mutex

操作共享变量会有问题的售票系统代码

互斥量的接口

初始化互斥量

销毁互斥量

互斥量加锁和解锁

改进上面的售票系统:

方法一:定义一个静态或全局的锁变量gmutex

方法二:定义一个局部的锁

方法三: 临时对象, RAII风格的加锁和解锁(构造加锁,析构解锁)

互斥量实现原理探究

可重入VS线程安全

概念

常见的线程不安全的情况

常见的线程安全的情况

常见不可重入的情况

常见可重入的情况

可重入与线程安全联系

可重入与线程安全区别

总结



前言

世上有两种耀眼的光芒,一种是正在升起的太阳,一种是正在努力学习编程的你!一个爱学编程的人。各位看官,我衷心的希望这篇博客能对你们有所帮助,同时也希望各位看官能对我的文章给与点评,希望我们能够携手共同促进进步,在编程的道路上越走越远!


提示:以下是本篇文章正文内容,下面案例可供参考

C++多线程的用法

#include <thread> // C++多线程所对应的头文件
#include <unistd.h>

void threadrun(int num)
{
    while(num)
    {
        std::cout << "I am a thread, num: " << num << std::endl;
        sleep(1);
    }
}
int main()
{
    std::thread t1(threadrun, 10);
    std::thread t2(threadrun, 10);
    std::thread t3(threadrun, 10);
    std::thread t4(threadrun, 10);
    std::thread t5(threadrun, 10);

    while(true)
    {
        std::cout << "I am a main thread "<< std::endl;
        sleep(1);
    }

    t1.join();
    t2.join();
    t3.join();
    t4.join();
    t5.join();

    return 0;
}

对原生线程进行一次封装

C++11的多线程,是对原生线程的封装,所以在编译时,要链接上 -lpthread原生线程库。

为什么要封装呢?

  • 语言的跨平台性。在Linux当中,我们所使用的C++11的多线程,用的是Linux的pthread库;如果是在Windows当中,用的是Windows的原始对应的系统调用创建线程的接口,C++在给Linux和Windows当中提供的标准库是不一样的,C++给我们提供的标准库编译出来,在Windows中是Windows版本的,在Linux中是Linux版本的,所以对应的库不一样,但是代码是一样的。

Windows当中还要不要包含pthread库呢?

  • 不需要。语言具有跨平台性。

其它语言呢?

  • 大部分的语言要在Linux下跑多线程,必须要用原生线程库,因为pthread库是Linux提供多线程的底层唯一方式。
thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>// C++线程的头文件

namespace ThreadModule
{
    // using就相当于typedef,using定义了一个新类型std::function<void(T&)>,是一个新语法
    template<typename T>
    using func_t = std::function<void(T)>;
    // typedef std::function<void(const T&)> func_t;

    template<typename T>
    class Thread
    {
    public:
        void Excute()
        {
            _func(_data);
        }
    public:
        Thread(func_t<T> func, T data, const std::string& name = "none-name")
            : _func(func), _data(data), _threadname(name), _stop(true)
        {}

        // 方法:static修饰的函数中的参数是没有this指针的
        // 因为没有this指针,所以该函数里面也无法调用该类的成员对象了
        static void* threadroutine(void* args) // 类成员函数,形参是有this指针的!!
        {
            Thread<T>* self = static_cast<Thread<T> *>(args);
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            // pthread_create()函数中的参数3的函数指针,要求的参数类型是void*,
            // 而threadroutine()函数是类成员函数,有一个this指针,所以调不了该函数
            int n = pthread_create(&_tid, nullptr, threadroutine, this);// 把当前对象this传threadroutine()
            if (!n)
            {
                _stop = false;
                return true;
            }
            else
            {
                return false;
            }
        }
        void Detach()
        {
            if (!_stop)
            {
                pthread_detach(_tid);
            }
        }
        void Join()
        {
            if (!_stop)
            {
                pthread_join(_tid, nullptr);
            }
        }
        std::string name()
        {
            return _threadname;
        }
        void Stop()
        {
            _stop = true;
        }
        ~Thread() {}

    private:
        pthread_t _tid;
        std::string _threadname;
        T _data;  // 模板的参数类型T就直接是指针了
        func_t<T> _func;
        bool _stop;
    };
} 

#endif
testThread.cc
using namespace ThreadModule;

void print(int &cnt)
{
    while (cnt)
    {
        std::cout << "hello I am myself thread, cnt: " << cnt-- << std::endl;
        sleep(1);
    }
}

const int num = 10;

int main()
{
    std::vector<Thread<int> > threads;
    // 1. 创建一批线程
    for (int i = 0; i < num; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1);
        threads.emplace_back(print, 10, name);
    }

    // 2. 启动 一批线程
    for (auto &thread : threads)
    {
        thread.Start();
    }

    // 3. 等待一批线程
    for (auto &thread : threads)
    {
        thread.Join();
        std::cout << "wait thread done, thread is: " << thread.name() << std::endl;
    }

    // Thread<int> t1(print, 10);
    // t1.Start();
    // std::cout << "name: " << t1.name() << std::endl;
    // t1.Join();
    return 0;
}

理解pthread线程

  • 二进制代码刚运行时,pthread_t tid;只有一个进程,等执行到pthread_create()代码时,才创建出了新线程,从操作层面上:创建、终止、等待线程的接口都是在库当中实现的,库是将轻量级进程做了封装,所以给上层用户提供的就是库当中的方法,所以把我们用的线程叫做用户级线程。
  • 线程库首先要映射到当前进程的地址空间中(堆栈之间的共享区)!
  • 线程的管理工作要由库来进行管理!

那么库要如何管理线程呢?

  • 先描述,再组织!
  • 库里面要有描述线程的结构体,以及把所有的线程都组织在一起,线程的控制块:struct_pthread,一般我们喜欢将线程的控制块叫做struct_tcb,只不过Linux不提供struct_tcb,创建一个线程就为我们在库当中维护一个控制块结构,而每一个控制块结构的起始地址,就叫做线程的tid。tid的本质就是一个堆栈之间共享区的线程库中的控制块结构的起始地址(虚拟地址)。

  • 线程的整体结构:struct_pthread、线程局部存储、线程栈。
  • 动态库是共享库,多个进程,每一个进程都创建多个线程,每个进程的进程地址空间堆栈之间的共享区都是同一个动态库。
  • 全局变量在进程地址空间当中的已初始化数据区。
  • 线程局部存储:不能用来存储stl的容器数据,只能用来存储内置类型,因为它是一个C语言的库,不认识其它容器的数据。

线程库是不是磁盘当中的一个普通文件呢?

  • 线程库是一个动态库,也是磁盘当中的一个文件。

执行流(task_struct)是如何找到线程的线程栈的?

man clone

轻量级进程是Linux当中线程实现的底层方案,但真正线程实现是在库当中实现的。

Linux线程互斥

进程线程间的互斥相关背景概念

  • 临界资源:多线程执行流共享的资源就叫做临界资源
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
  • 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
  • 原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
  • 当共享资源做了保护,就叫做临界资源。

互斥量mutex

  • 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
  • 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
  • 多个线程并发的操作共享变量,会带来一些问题。

操作共享变量会有问题的售票系统代码

#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <unistd.h> 
#include <pthread.h> 
int ticket = 100;
void* route(void* arg)
{
    char* id = (char*)arg;
    while (1) 
    {
        if (ticket > 0) 
        {
            usleep(1000);
            printf("%s sells ticket:%d\n", id, ticket);
            ticket--;
        }
        else 
        {
            break;
        }
    }
}
int main(void)
{
    pthread_t t1, t2, t3, t4;
    pthread_create(&t1, NULL, route, "thread 1");
    pthread_create(&t2, NULL, route, "thread 2");
    pthread_create(&t3, NULL, route, "thread 3");
    pthread_create(&t4, NULL, route, "thread 4");
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
}

一次执行结果:
thread 4 sells ticket : 100
...
thread 4 sells ticket : 1
thread 2 sells ticket : 0
thread 1 sells ticket : -1
thread 3 sells ticket : -2

抢票的结构最终出现了负数,造成了数据不一致,为什么?

  • 因为g_tickets是一个全局的变量,这个全局的变量是没有被保护起来的,并且对全局变量_tickets的判断不是原子的。
  • 当_tickets == 1时,多个线程并发的判断,让很多线程都进入抢票逻辑。
  • 全局变量g_tickets是在内存当中的,当线程1执行if语句时,要进行票数的判断,判断是逻辑运算,必须在CPU内部运行,此时内存中的票数数据拷贝到CPU的寄存器当中,执行到usleep语句时,线程1被切换了出去,因为寄存器只有一套,所以为了保存线程1的上下文数据,数据被线程1带走了;
  • 此时切换到线程2执行判断逻辑,与线程1的情况一样也被切换走了,线程3和4都是如此;
  • 那么当线程1再次切换回来的时候,要重新在内存中读取数据,打印并--操作;
  • _tickets--(不是原子的)等价于_tickets = _tickets - 1;--操作数据改变,会影响原生内存中的数据,因为会写回内存;
  • 那么其它3个线程也都进入了抢票逻辑,所以会读取内存中的数据,打印并--操作,所以会打印出了负数的情况。

-- 操作并不是原子操作,而是对应三条汇编指令:

  • load :将共享变量ticket从内存加载到寄存器中
  • update : 更新寄存器里面的值,执行-1操作
  • store :将新值,从寄存器写回共享变量ticket的内存地址

要解决以上问题,需要做到三点:

  • 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
  • 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
  • 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。

要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。

互斥量的接口

初始化互斥量

初始化互斥量有两种方法:

  • 方法1,如果你定义的锁是静态的或者是全局的:那么这个锁可以不用init初始化和destroy销毁;你可以直接定义一个锁,并用PTHREAD_ MUTEX_ INITIALIZER宏对其进行初始化。
 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER 
  • 方法2,动态分配:

如果这把锁是一个局部的:建议init初始化和destroy销毁

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict 
attr); 
参数: 
mutex:要初始化的互斥量 
attr:NULL 

尝试的去申请锁:

int pthread_mutex_trylock(pthread_mutex_t *mutex);
尝试的去申请锁,跟申请锁成功和函数调用失败是一样的,但是申请锁失败了,不会阻塞,会立马出错返回。

销毁互斥量

销毁互斥量需要注意:

  • 使用 PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要销毁
  • 不要销毁一个已经加锁的互斥量
  • 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex); 

互斥量加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex); 
int pthread_mutex_unlock(pthread_mutex_t *mutex); 
返回值:成功返回0,失败返回错误号 

调用 int pthread_mutex_lock(pthread_mutex_t *mutex) 时,可能会遇到以下情况:

  • 申请成功:函数就会返回,允许你继续向后运行;
  • 申请锁失败:函数就会阻塞,不允许你继续向后运行;
  • 函数调用失败:出错返回,比如:申请锁的对象已经被释放了

改进上面的售票系统:

方法一:定义一个静态或全局的锁变量gmutex

// 抢票逻辑
#include <iostream>
#include <vector>
#include <mutex> // C++11里面锁的头文件
#include "Thread.hpp"

using namespace ThreadModule;

// 数据不一致
int g_tickets = 10000; // 共享资源,没有保护的
// 线程的数据类型
class ThreadData
{
public:
    ThreadData(int& tickets, const std::string& name)
        : _tickets(tickets), _name(name), _total(0))
    {
    }
    ~ThreadData()
    {
    }

public:
    int& _tickets; // 所有的线程,最后都会引用同一个全局的g_tickets
    std::string _name;
    int _total;
};

// 方法一:定义一个静态或全局的锁变量gmutex
// gmutex中的g表示globle的意思
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;// 定义一个锁

 // 线程执行的方法
 void route(ThreadData *td)
 {
     // 加锁
     while (true)
     {
         // 访问临界资源的代码,叫做临界区!
         // 我们加锁,本质就是把多线程的并行执行变为串行执行 --- 加锁的力度要越细越好
         pthread_mutex_lock(&gmutex); // 加锁 : 竞争锁是自由竞争的,竞争锁的能力太强的线程,会导致其他线程抢不到锁 --- 造成了其他线程的饥饿问题!
         
         if (td->_tickets > 0)     
         {
             // 模拟一次抢票的逻辑
             usleep(1000);
             printf("%s running, get tickets: %d\n", td->_name.c_str(), td->_tickets);
             td->_tickets--;  
             pthread_mutex_unlock(&gmutex); // 解锁 法一:
                                          
             td->_total++;
         }
         else
         {
             pthread_mutex_unlock(&gmutex); // 解锁 法一:
             break;
         }
     }
     // 解锁
 }

const int num = 4;
int main()
{
    std::cout << "main: &tickets: " << &g_tickets << std::endl;

    // std::mutex mutex;// C++11的做法,不用初始化,因为它有构造函数

    std::vector<Thread<ThreadData*>> threads;
    std::vector<ThreadData*> datas;// 每个线程抢了多少张票
    // 1. 创建一批线程
    for (int i = 0; i < num; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1);
        ThreadData* td = new ThreadData(g_tickets, name);
        threads.emplace_back(route, td, name);
        datas.emplace_back(td);
    }

    // 2. 启动 一批线程
    for (auto& thread : threads)
    {
        thread.Start();
    }

    // 3. 等待一批线程
    for (auto& thread : threads)
    {
        thread.Join();
        std::cout << "wait thread done, thread is: " << thread.name() << std::endl;
    }
    sleep(1);
    // 4. 输出统计数据
    for (auto data : datas)
    {
        std::cout << data->_name << " : " << data->_total << std::endl;
        delete data;
    }

    // pthread_mutex_destroy(&mutex);

    return 0;
}

方法二:定义一个局部的锁

// 抢票逻辑
#include <iostream>
#include <vector>
#include <mutex> // C++11里面锁的头文件
#include "Thread.hpp"

using namespace ThreadModule;

// 数据不一致
int g_tickets = 10000; // 共享资源,没有保护的
// 线程的数据类型
class ThreadData
{
public:
    ThreadData(int& tickets, const std::string& name, pthread_mutex_t &mutex)
        : _tickets(tickets), _name(name), _total(0), _mutex(mutex)
    {
    }
    ~ThreadData()
    {
    }

public:
    int& _tickets; // 所有的线程,最后都会引用同一个全局的g_tickets
    std::string _name;
    int _total;
    pthread_mutex_t& _mutex;
};

 // 线程执行的方法
 void route(ThreadData *td)
 {
     // 加锁
     while (true)
     {       
         // 方法二:加锁
         pthread_mutex_lock(&td->_mutex);
         if (td->_tickets > 0)      
         {
             // 模拟一次抢票的逻辑
             usleep(1000);
             printf("%s running, get tickets: %d\n", td->_name.c_str(), td->_tickets); 
             td->_tickets--;  
                                          
             pthread_mutex_unlock(&td->_mutex); // 解锁 法二:
             td->_total++;
         }
         else
         {
             pthread_mutex_unlock(&td->_mutex); // 解锁 法二:
             break;
         }
     }
     // 解锁
 }

const int num = 4;
int main()
{
    // 方法二:定义一个局部的锁
    pthread_mutex_t mutex;
    pthread_mutex_init(&mutex, nullptr);// 锁的属性设为nullptr

    // std::mutex mutex;// C++11的做法,不用初始化,因为它有构造函数

    std::vector<Thread<ThreadData*>> threads;
    std::vector<ThreadData*> datas;// 每个线程抢了多少张票
    // 1. 创建一批线程
    for (int i = 0; i < num; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1);
        ThreadData* td = new ThreadData(g_tickets, name, mutex);// 把局部的锁,以参数的形式传递到线程内部,而不是以全局的形式
        threads.emplace_back(route, td, name);
        datas.emplace_back(td);
    }

    // 2. 启动 一批线程
    for (auto& thread : threads)
    {
        thread.Start();
    }

    // 3. 等待一批线程
    for (auto& thread : threads)
    {
        thread.Join();
        std::cout << "wait thread done, thread is: " << thread.name() << std::endl;
    }
    sleep(1);
    // 4. 输出统计数据
    for (auto data : datas)
    {
        std::cout << data->_name << " : " << data->_total << std::endl;
        delete data;
    }

    pthread_mutex_destroy(&mutex);
    return 0;
}

方法三: 临时对象, RAII风格的加锁和解锁(构造加锁,析构解锁)

LockGuard.hpp
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__

#include <iostream>
#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t* mutex) :_mutex(mutex)
    {
        pthread_mutex_lock(_mutex); // 构造加锁
    }
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }
private:
    pthread_mutex_t* _mutex;
};

#endif
void route(ThreadData* td)
{
    while (true)
    {
        {   // 担心就用这个
            LockGuard guard(&td->_mutex); // 临时对象, RAII风格的加锁和解锁(构造加锁,析构解锁)

            // td->_mutex.lock();C++11的做法
            // std::lock_guard<std::mutex> lock(td->_mutex);C++11中也封装了lock_guard
            if (td->_tickets > 0) // 1
            {
                usleep(1000);
                printf("%s running, get tickets: %d\n", td->_name.c_str(), td->_tickets); // 2
                td->_tickets--;                                                           // 3
                td->_total++;
                // td->_mutex.unlock();C++11
            }
            else
            {
                // td->_mutex.unlock();
                break;
            }
        }
    }
}

什么是原子的?

  • 一条语句将来被汇编之后,只有一条汇编。

互斥量实现原理探究

  • 经过上面的例子,大家已经意识到单纯的 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题
  • 为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一 个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 现在我们把lock和unlock的伪代码改一下

互斥的底层实现:

  1. 假设CPU中有一个寄存器%al,锁相当于内存当中的整型变量;
  2. 假设刚开始把锁初始化为1,线程1此时要申请锁,它把0放入%al的寄存器中,再把寄存器中的值和锁变量中的值进行交换,就完成了加锁,若线程1加锁成功,那么线程1就执行它的代码;
  3. 假设线程1在执行完第二条语句时,线程1被切换成线程2,线程1被切换走时,会把寄存器中的数据带走;
  4. 线程2开始申请锁,线程2调用pthread_mutex_lock()函数从0开始申请锁,将0放入寄存器中,再与内存中的值做交换,因为都是0,所以申请锁失败,挂起等待,将数据带走;
  5. 等线程1回来时,将之前带走的数据恢复到寄存器中,加锁成功;
  6. 成功了之后,还要解锁,将mutex变量重新置为1。

  • 寄存器内部的数据不属于CPU,它属于当前线程的硬件上下文
  • 临界区内部,正在访问临界区的线程,可以被OS切换调度,被切换出去的时候,把锁也带走了。申请锁成功的线程1正在访问临界区,即使线程1被挂起了,其它任何线程都进不来临界区,那么临界区对于其它的线程来说就是原子的,该线程是安全的。
  • 互斥是为了解决数据安全的问题;同步是为了解决资源被充分利用的问题。
  • 线程被切换的时机是随机的。
  • 交换的本质:不是拷贝到寄存器,而是所有线程在争锁的时候,只有一个1。
  • 交换的时候,只有一条汇编 --- 原子的。
  • CPU寄存器硬件只有一套,但是CPU寄存器内部的数据是线程的硬件上下文。
  • 数据在内存里,所有线程都能访问,属于共享的。但是如果转移到CPU内部寄存器中,就属于一个线程私有了。
  • 互斥:任何时刻只允许一个线程进行访问。

线程互斥:

  • 保护并不是把临界资源怎么样,而是保护多个线程都会执行访问临界资源的代码,我们要保护的是临界区。

可重入VS线程安全

概念

  • 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作, 并且没有锁保护的情况下,会出现该问题。
  • 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。

常见的线程不安全的情况

  • 不保护共享变量的函数
  • 函数状态随着被调用,状态发生变化的函数
  • 返回指向静态变量指针的函数
  • 调用线程不安全函数的函数

常见的线程安全的情况

  • 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
  • 类或者接口对于线程来说都是原子操作
  • 多个线程之间的切换不会导致该接口的执行结果存在二义性

常见不可重入的情况

  • 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
  • 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
  • 可重入函数体内使用了静态的数据结构

常见可重入的情况

  • 不使用全局变量或静态变量
  • 不使用用malloc或者new开辟出的空间
  • 不调用不可重入函数
  • 不返回静态或全局数据,所有数据都有函数的调用者提供
  • 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据

可重入与线程安全联系

  • 函数是可重入的,那就是线程安全的
  • 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
  • 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的

可重入与线程安全区别

  • 可重入函数是线程安全函数的一种
  • 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
  • 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生 死锁,因此是不可重入的。


总结

好了,本篇博客到这里就结束了,如果有更好的观点,请及时留言,我会认真观看并学习。
不积硅步,无以至千里;不积小流,无以成江海。