Linux线程

发布于:2024-10-11 ⋅ 阅读:(8) ⋅ 点赞:(0)

文章目录

1. 线程概念

1.1 什么是线程

线程:是进程内的一个执行分支。线程的执行粒度要比进程细

image-20240909221657177

1.2 Linux的进程

看上图,我们把那一个一个的task_struct称为执行流。第一个进程创建的线程称为主线程,随后被创建的线程称为次线程

  1. 在Linux中,线程在进程“内部”执行,线程在进程的地址空间内运行(为什么?)

    ​ 因为任何执行流要执行,都要有资源,而==地址空间是进程的资源窗口==

  2. 在Linux中,什么叫做线程的执行粒度要比进程要更细?

    ​ 线程执行进程代码的一部分

线程一多,OS也需要进行管理,先描述,在组织。实际上,在Window上,不仅有struct pcb也有struct tcb(thread control block)。
而在Linux上,直接复用了进程数据结构和管理算法,即使用struct task_struct来模拟线程。所以我们会说:Linux上没有真正意义上的线程
而是使用“进程”(的内核数据结构)来模拟线程。

所以Linux中的线程一般叫做轻量级的进程

1.3 重新定义进程和线程

之前理解的进程:进程 = 内核数据结构(task_struct)+ 代码和数据

  1. 什么叫做线程? 我们认为,线程操作系统调度的基本单位
  2. 重新理解进程? Linux内核观点:进程是承担分配系统资源的基本实体,即操作系统分配资源,是以进程为单位进行分配的。线程是进程内部的执行流资源

image-20240909224553336

1.4 重谈地址空间

如何理解资源分配给线程:线程分配资源的本质,就是分配地址空间范围。

相关概念:

  1. “页框”(Page Frame)和"页帧"(Page Frame)通常是指相同的概念,只是不同的文献或资料中可能使用不同的术语。它们都指的是物理内存中的一个固定大小的单元这个单元是内存分页管理中的最小分配单位。(通常是4KB、2MB或1GB等大小,这是由操作系统的内存管理单元(MMU)和处理器的内存分页机制决定的)这里使用4KB,4*2^10 Byte = 2^12 Byte
  2. CR3 寄存器在 x86 体系结构中用于存储当前正在使用的页目录表的物理地址,这是进行虚拟内存管理时的关键信息。CR2寄存器储存的是引起缺页中断异常的地址

image-20240910173249297

一个二级页表可以映射4MB,一共有1024个二级页表,可以映射1024*4MB=4GB。
一个二级页表的大小是4KB,一共有1024个二级页表,页表大小是1024*4KB=4MB。

二级页表在大部分情况下都是不全的,但必须要有页目录

1.5 Linux线程周边概念

1.5.1 线程 VS 进程

  • 线程比进程更加轻量化
    a. 创建和释放更加轻量化(生死,创建线程只需要创建pcb。创建进程需要pcb,地址空间,页表,加载到内存,构建映射关系,信号的三张表,文件描述符表……。释放同理)
    b. 切换更加轻量化(运行,线程切换时不需要切换页表,地址空间,只是局部在切换,所以效率更高。cpu中的cache在运行一个进程中的多个线程时,虽然不同进程的上下文在一直切换,但是chche缓存中的热数据(经常被访问的数据)更新更少,因为进程中的很多数据是多个线程共享的
  • 进程是资源分配的基本单位
  • 线程是调度的基本单位
  • 线程共享进程数据,但也拥有自己的一部分数据,如 线程ID、一组寄存器(线程有独立的的上下文)栈(独立的栈结构保证了不同线程运行时不会出现执行流错乱的问题)、errno、信号屏蔽字、调度优先级……
  • 进程的多个线程共享 同一地址空间,因此Text Segment、Data Segment都是共享的, 如果定义一个函数, 在各线程中都可以调用, 如果定义一个全局变量,在各线程中都可以访问到, 除此之外,各线程还共享以下进程资源和环境: 文件描述符表 、每种信号的处理方式(SIG_ IGN、SIG_ DFL或者自定义的信号处理函数) 、当前工作目录 、用户id和组id

进程和线程的关系如下图:
image-20240910214910089

1.5.2 线程的优点缺点

优点:

  • 创建一个新线程的代价要比创建一个新进程小得多
  • 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
  • 线程占用的资源要比进程少很多
  • 能充分利用多处理器的可并行数量
  • 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
  • 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
  • I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作

缺点:

  • 性能损失

    一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变

  • 健壮性降低

    编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的

  • 缺乏访问控制

    进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响

  • 编程难度提高

    编写与调试一个多线程程序比单线程程序困难得多

1.5.3 线程的异常和用途

线程异常:

  • 单个线程如果出现除零,野指针问题导致线程崩溃,进程也会随着崩溃
  • 线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,进程终止,该进程内的所有线程也就随即退出

线程用途:

  • 合理的使用多线程,能提高CPU密集型程序的执行效率
  • 合理的使用多线程,能提高IO密集型程序的用户体验(如生活中我们一边写代码一边下载开发工具,就是多线程运行的一种表现)

2. Linux 线程控制

首先,**内核中有没有很明确的线程的概念**,而有**轻量级进程的概念**。当我们想写多线程代码时,可以使用**POSIX线程库**,这是一个
处于应用层位置的库,几乎所有的Linux发行版都默认带这个库,使用时需要引入头文件`pthread.h`

2.1 创建线程

2.1.1 基础创建

功能:创建一个新的线程
原型:
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg);
参数:
thread:输出型参数,返回线程ID
attr:设置线程的属性,attr为NULL表示使用默认属性
start_routine:是个函数地址,线程启动后要执行的函数
arg:传给线程启动函数的参数
返回值:成功返回0;失败返回错误码
  • 传统的一些函数是,成功返回0,失败返回-1,并且对全局变量errno赋值以指示错误
  • pthreads函数出错时不会设置全局变量errno(而大部分其他POSIX函数会这样做)。而是将错误代码通过返回值返回
  • pthreads同样也提供了线程内的errno变量,以支持其它使用errno的代码。对于pthreads函数的错误,建议通过返回值来判定,因为读取返回值要比读取线程内的errno变量的开销更小

由于是第三方库,所以编译时要加上-lpthread

myThread : myThread.cc
	g++ -o $@ $^ -std=c++11 -lpthread

.PHONY : clean
clean:
	rm -f myThread

image-20240914203245933

可以看到,这不同的线程拥有同一个进程的pid

当我们想观察线程状态时,可以使用ps -aL命令,LWP的时ligtht weight process的缩写,PID等于LWP的线程是主线程。当我们使用kill -9命令时,杀掉任意一个线程,整个进程都会被kill掉

image-20240914203630015

下面这个可以持续观察线程状态

while :; do ps -aL | head -1 && ps -aL | grep 'myThread' | grep -v ps; echo "##############################################"; sleep 1; done

2.1.2 让一个函数被重入,让多个执行流同时调用

// 可以被多个执行流执行
void Show(const string& s) 
{
    cout << s << " say: hello!" << endl;
}

void* ThreadRoutine(void* args) 
{
    while(true) {
        // cout << "new thread, pid is " << getpid() << endl;
        Show("[new thread]");
        sleep(1); 
    }
}

int main()
{
    pthread_t tid;
    pthread_create(&tid, nullptr, ThreadRoutine, nullptr);
    while(true) {
        // cout << "main thread, pid is " << getpid() << endl;
        Show("[main thread]");
        sleep(1); 
    }
    return 0;
}

image-20240914204817506

虽然打印出现了错行的情况,但不影响现象的产生


2.1.3 线程之间进行通信很容易

int g_val = 0;

void* ThreadRoutine(void* args) 
{
    while(true) {
        // cout << "new thread, pid is " << getpid() << endl;
        // Show("[new thread]");
        printf("I am new thread, g_val: %d, &g_val: %p\n", g_val, &g_val);
        sleep(1); 
    }
}

int main()
{
    pthread_t tid;
    pthread_create(&tid, nullptr, ThreadRoutine, nullptr);
    while(true) {
        // cout << "main thread, pid is " << getpid() << endl;
        // Show("[main thread]");
        g_val++;
        printf("I am main thread, g_val: %d, &g_val: %p\n", g_val, &g_val);
        sleep(1); 
    }
    return 0;
}

image-20240914205310595


2.1.4 线程任意一个出现异常,进程都会退出

void* ThreadRoutine(void* args) 
{
    while(true) {
        cout << "new thread, pid is " << getpid() << endl;
        sleep(5);
        int *p =nullptr;
        *p = 1;
    }
}

int main()
{
    pthread_t tid;
    pthread_create(&tid, nullptr, ThreadRoutine, nullptr);
    while(true) {
        cout << "main thread, pid is " << getpid() << endl;
        sleep(1); 
    }
    return 0;
}

image-20240914205647889


2.1.5 观察类型是pthread_t 的 tid

定义 pthread_t 类型是 无符号长整型

typedef unsigned long int pthread_t;

可以打印查看

int main()
{
    pthread_t tid;
    pthread_create(&tid, nullptr, ThreadRoutine, nullptr);
    while(true) {
        printf("I am main thread, creat new thread tid is %lu\n");
        sleep(1); 
    }
    return 0;
}

image-20240914210917302

可以看到,打印出来的tid的值与LWP的值完全不一样,这是因为tid是给用户使用的的,而LWP是给OS使用的。
实际上,tid充当的是地址,在2.4中会介绍

2.1.6 给子线程传递参数

void* ThreadRoutine(void* args) 
{
    const char* name = (const char*)args;
    while(true) {
        printf("%s is running\n", name);
        sleep(1); 
    }
}

int main()
{
    pthread_t tid;
    pthread_create(&tid, nullptr, ThreadRoutine, (void*)"new thread");
    while(true) {
        printf("main thread is running\n");
        sleep(1); 
    }
    return 0;
}

image-20240914211629840

2.2 线程等待

2.2.1 为什么需要线程等待?

  • 已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。
  • 创建新的线程不会复用刚才退出线程的地址空间。
  • 如果需要,可以获取子线程的返回值
  • 如果不调用pthread_join,主线程可能会在子线程完成之前就退出,这可能导致子线程的输出不可见,或者在某些情况下,程序可能在子线程完成之前就结束了,导致资源没有被正确释放。

2.2.2 基础使用

功能:等待线程结束
原型
int pthread_join(pthread_t thread, void **value_ptr);
参数
thread:线程ID
value_ptr:它指向一个指针,后者指向线程的返回值
返回值:成功返回0;失败返回错误码
  1. 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。
  2. 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_ CANCELED,其值为-1。
  3. 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数。
  4. 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。
void* ThreadRoutine(void* args) 
{
    const char* name = (const char*)args;
    int cnt = 5;
    while(cnt--) {
        printf("%s is running\n", name);
        sleep(1); 
    }
    return nullptr;     // 线程走到这里后,会默认退出
}

int main()
{
    pthread_t tid;
    pthread_create(&tid, nullptr, ThreadRoutine, (void*)"new thread");
    pthread_join(tid, nullptr);     // 主线程等待的时候,默认是阻塞等待
    cout << "main thread quit" << endl;
    return 0;
}

image-20240914213150340

2.2.3 获取子线程的返回值

image-20240914214809868

void* ThreadRoutine(void* args) 
{
    const char* name = (const char*)args;
    int cnt = 5;
    while(cnt--) {
        printf("%s is running\n", name);
        sleep(1); 
    }
    return (void*)1;     // 线程走到这里后,会默认退出
}

int main()
{
    pthread_t tid;
    pthread_create(&tid, nullptr, ThreadRoutine, (void*)"new thread");
    void* ret;
    pthread_join(tid, &ret);     // 主线程等待的时候,默认是阻塞等待
    printf("main thread quit, ret: %d\n", (long long)ret);  // 由于void* 是8字节,所以这里要用8字节的long long 来进行强转
    return 0;
}

image-20240914220539475


如果想要多返回些信息,可以使用类

struct Request
{
    Request(int s, int e, string n) 
    : _start(s)
    , _end(e)
    , _name(n) {}

    // 累加 [start, end]
    int Run()
    {
        int tmp = 0;
        for(int i = _start; i<=_end; ++i) {
            tmp += i;
        }
        return tmp;
    }

    int _start;
    int _end;
    string _name;
};

struct Response
{
    Response(int r, int e) 
    : _res(r)
    , _exitCode(e) {}

    int _res;
    int _exitCode;
};


void* GetSum(void* args)
{
    Request* rq = static_cast<Request*>(args);
    Response* rs = new Response(0, 0);
    // 计算
    rs->_res = rq->Run();
    delete rq;
    return rs;
}
 
int main()
{
    pthread_t tid;
    Request* rq = new Request(1, 100, "new Thread");
    pthread_create(&tid, nullptr, GetSum, rq);

    void* ret;
    pthread_join(tid, &ret);
    Response* rs = static_cast<Response*>(ret);
    printf("res: %d, exitCode: %d\n", rs->_res, rs->_exitCode);
    delete rs;
    return 0;
}

image-20240914225356363

以这个为例子,如果一个计算任务很大,比如1-100000,就可以拆分,让不同的线程执行不同的范围,最后主线程再将子线程的结果进行汇总

2.3 线程终止

2.3.1 直接使用return

上面的所有例子都是使用这个方法,当线程走到return后,会默认退出。注意:exit()是进程退出函数,不是线程,当线程函数使用它时会使整个进程退出

2.3.2 pthread_exit

功能:线程终止
原型
void pthread_exit(void *value_ptr);
参数
value_ptr:value_ptr不要指向一个局部变量。
返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)
void* ThreadRoutine(void* args) 
{
    const char* name = (const char*)args;
    int cnt = 5;
    while(cnt--) {
        printf("%s is running\n", name);
        sleep(1); 
    }
    pthread_exit((void*) 123);
}

int main()
{
    pthread_t tid;
    pthread_create(&tid, nullptr, ThreadRoutine, (void*)"new thread");
    void* ret;
    pthread_join(tid, &ret);     // 主线程等待的时候,默认是阻塞等待
    printf("main thread quit, ret: %d\n", (long long)ret);  // 由于void* 是8字节,所以这里要用8字节的long long 来进行强转
    return 0;
}

image-20240914221801170

2.3.3 pthread_cancel

功能:取消一个执行中的线程
原型
int pthread_cancel(pthread_t thread);
参数
thread:线程ID
返回值:成功返回0;失败返回错误码
void* ThreadRoutine(void* args) 
{
    const char* name = (const char*)args;
    while(true) {
        printf("%s is running\n", name);
        sleep(1); 
    } 
	return nullptr;
}

int main()
{
    pthread_t tid;
    pthread_create(&tid, nullptr, ThreadRoutine, (void*)"new thread");
    // 主进程取消掉子进程
    sleep(1);
    pthread_cancel(tid);
    void* ret;
    pthread_join(tid, &ret);     // 主线程等待的时候,默认是阻塞等待
    printf("main thread quit, ret: %d\n", (long long)ret);  // 由于void* 是8字节,所以这里要用8字节的long long 来进行强转
    return 0;
}

可以看到ret为-1,那是因为PTHREAD_ CANCELED,其值为-1。见2.2.2

2.4 线程ID及进程地址空间布局

  • pthread_ create函数会产生一个线程ID,存放在第一个参数指向的地址中。该线程ID和前面说的线程ID不是一回事,(pid和LWP)
  • 前面讲的线程ID属于进程调度的范畴。因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要一个数值来唯一表示该线程
  • pthread_ create函数第一个参数指向一个虚拟内存单元,该内存单元的地址即为新创建线程的线程ID,属于NPTL线程库的范畴。线程库的后续操作,就是根据该线程ID来操作线程的
  • 线程库NPTL提供了pthread_ self函数,可以获得线程自身的ID:
功能:
       pthread_self - 获取调用线程的ID

原型:
       pthread_t pthread_self(void);
描述:
       pthread_self()函数返回调用线程的ID。
	与在创建this的pthread_create(3)调用中的*thread中返回的值
	线程是一样的。
返回值:
       这个函数总是成功,返回调用线程的ID。
// 将tid以地址的格式打出来,即16进制
string ToHex(pthread_t tid)
{
    char hex[64];
    snprintf(hex, sizeof(hex), "%p", tid);
    return hex;
}

void* ThreadRoutine(void* args)
{
    while(true) {
        // 打印出进程id,将它转成16进程
        // cout << "thread id: " << ToHex(pthread_self()) << endl;
        printf("thread id: %s\n", ToHex(pthread_self()).c_str());
        sleep(1);
    }
}

int main()
{
    pthread_t tid;
    pthread_create(&tid, nullptr, ThreadRoutine, nullptr);
    while(true) {
        printf("child tid: %s\n", ToHex(tid).c_str());
        sleep(1);
    }
    pthread_join(tid, nullptr);
    return 0;
}

image-20240915094954153

pthread_t到底是什么类型呢?取决于实现。对于Linux目前实现的NPTL实现而言,pthread_t类型的线程ID,本质就是一个进程地址空间上的一个地址

image-20240915100849785

可以看到,除了主线程,所有其他线程的独立栈,都在共享区,具体来讲是在pthread库中,tid指向的用户tcb中,这样,线程在调度运行的时候就不会互相干扰了。

2.5 其它问题

2.5.1 创建多个线程

#include <iostream>
#include <vector>
#include <string>
#include <unistd.h>
#include <pthread.h>
#define NUM 3
using namespace std;

struct ThreadData
{   
    ThreadData(string name) : _threadName(name) {}
    string _threadName;
};

void* ThreadRoutine(void* args)
{
    ThreadData* td = static_cast<ThreadData*>(args);
    int cnt = 5;
    while(cnt--) {
        printf("%s, tid: %p, pid: %d\n", td->_threadName.c_str(), pthread_self(), getpid());
        sleep(1);
    }
    delete td;
    return nullptr;
}

int main()
{
    // 创建多个线程
    vector<pthread_t> tids;
    for (size_t i = 0; i < NUM; i++) {
        pthread_t tid;
        ThreadData* td = new ThreadData("Thread-" + to_string(i+1));
        // 给子线程传递数据,堆空间是共享的
        pthread_create(&tid, nullptr, ThreadRoutine, td);
        tids.push_back(tid);
    }
    
    // 等待线程
    for(const auto& t : tids) {
        pthread_join(t, nullptr);
    }
    
    return 0;
}

image-20240918115709458

2.5.2 每一个线程自己独立的栈结构

修改ThreadRoutine()函数

void* ThreadRoutine(void* args)
{
    ThreadData* td = static_cast<ThreadData*>(args);
    int cnt = 5, x = 0;
    while(cnt--) {
        printf("%s, tid: %p, pid: %d, x: %d, &x: %p\n", td->_threadName.c_str(), pthread_self(), getpid(), x, &x);
        x++;
        sleep(1);
    }
    delete td;
    return nullptr;
}

image-20240918195740301

可以看到,每一个线程都独享一个x,即独立的栈空间。注意这里是独立,并不是私有,其它线程想访问还是可以的,比如主线程想访问Thread-2的x值

int *p = nullptr;

void* ThreadRoutine(void* args)
{
    ThreadData* td = static_cast<ThreadData*>(args);
    int cnt = 5, x = 0;
    while(cnt--) {
        printf("%s, tid: %p, pid: %d, x: %d, &x: %p\n", td->_threadName.c_str(), pthread_self(), getpid(), x, &x);
        x++;
        // 获取该线程的x
        if(td->_threadName == "Thread-2")   p = &x;
        sleep(1);
    }
    delete td;
    return nullptr;
}

int main()
{
    // 创建多个线程
    vector<pthread_t> tids;
    for (size_t i = 0; i < NUM; i++) {
        pthread_t tid;
        ThreadData* td = new ThreadData("Thread-" + to_string(i+1));
        // 给子线程传递数据,堆空间是共享的
        pthread_create(&tid, nullptr, ThreadRoutine, td);
        tids.push_back(tid);
    }
    sleep(2);
    printf("Main Thread get x: %d, &x: %p\n", *p, p);
    // ...
}

image-20240918203622785

2.5.3 全局变量

默认情况下,全局变量被所有线程共享

// 共享资源
int gVal = 100;

void* ThreadRoutine(void* args)
{
    ThreadData* td = static_cast<ThreadData*>(args);
    int cnt = 5;
    while(cnt--) {
        gVal++;
        printf("%s, tid: %p, pid: %d, gVal: %d, &gVal: %p\n", td->_threadName.c_str(), pthread_self(), getpid(), gVal, &gVal);
        sleep(1);
    }
    delete td;
    return nullptr;
}

image-20240918205249938

如果想要让每个进程独享该变量,可以在变量前加上__thread,让编译器把该变量放到tcb的线程局部存储单元中,见2.4。注意这里只能初始化内置类型,自定义类型是不可以的

__thread int gVal = 100;

image-20240918210206504

介绍一个__thread的用法:减少系统调用

__thread unsigned long int self = 0;
__thread int tPid = 0;
void* ThreadRoutine(void* args)
{
    ThreadData* td = static_cast<ThreadData*>(args);
    self = pthread_self();
    tPid = getpid();
    int cnt = 5;
    while(cnt--) {
        printf("%s, tid: %p, pid: %d\n", td->_threadName.c_str(), self, tPid);
        sleep(1);
    }
    delete td;
    return nullptr;
}

像上面这样,就减少了系统调用的次数。可以将这样的变量理解为线程级别的全局变量,不同线程之间变量互不干扰

2.6 分离线程

  • 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
  • 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。
int pthread_detach(pthread_t thread);

可以是线程组内其他线程对目标线程进行分离,也可以是线程自己分离:

pthread_detach(pthread_self());

joinable和分离是冲突的,一个线程不能既是joinable又是分离的

__thread unsigned long int self = 0;
__thread int tPid = 0;
void* ThreadRoutine(void* args)
{
    ThreadData* td = static_cast<ThreadData*>(args);
    self = pthread_self();
    tPid = getpid();
    // 自己分离
    pthread_detach(self);
    int cnt = 5;
    while(cnt--) {
        printf("%s, tid: %p, pid: %d\n", td->_threadName.c_str(), self, tPid);
        sleep(1);
    }
    delete td;
    return nullptr;
}

int main()
{
    // 创建多个线程
    vector<pthread_t> tids;
    for (size_t i = 0; i < NUM; i++) {
        pthread_t tid;
        ThreadData* td = new ThreadData("Thread-" + to_string(i+1));
        // 给子线程传递数据,堆空间是共享的
        pthread_create(&tid, nullptr, ThreadRoutine, td);
        tids.push_back(tid);
    }
    sleep(2);
    // 等待线程
    for(const auto& t : tids) {
        int ret = pthread_join(t, nullptr);
        printf("ret: %d, who: %p, why: %s\n", ret, t, strerror(ret));
    }
    
    return 0;
}

image-20240918213816179

3. Linux 线程互斥

3.1 相关概念

  • 临界资源:多线程执行流共享的资源就叫做临界资源
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
  • 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
  • 原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

3.2 互斥量mutex

  • 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
  • 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
  • 多个线程并发的操作共享变量,会带来一些问题。 看下面的代码
struct ThreadData
{   
    ThreadData(int i) 
    {
        _threadName = "Thread-" + to_string(i);
    }
    string _threadName;
};

// 共享资源,票
int ticket = 1000;

void* GetTicket(void* args)
{
    ThreadData* data = static_cast<ThreadData*>(args);
    const char* name = data->_threadName.c_str();
    while(true) {
        // 抢票
        printf("%s get a ticket, ticket: %d\n", name, ticket);
        ticket--;
        if(ticket <= 0) {
            // 没票了,该线程就break
            printf("%s can not get ticket, done\n", name);
            break;
        }
        usleep(1000);
    }
    return nullptr;
}

int main()
{
    vector<pthread_t> tids;
    vector<ThreadData*> datas;
    for(size_t i = 1; i <= NUM; ++i) {
        ThreadData* data = new ThreadData(i);
        datas.push_back(data);
        pthread_t tid;
        pthread_create(&tid, nullptr, GetTicket, datas[i-1]);
        tids.push_back(tid);
    }

    // 善后处理
    for(const auto& t : tids)    pthread_join(t, nullptr);
    for(const auto& d : datas)   delete d;
    return 0;
}

运行后可以看到,票出现了负数,所以出现了问题

image-20240919171807936

该现象叫共享资源被多线程访问后出现数据不一致问题,为什么会出现呢?

  • if 语句判断条件为真以后,代码可以并发的切换到其他线程
  • usleep这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
  • ticket-- 操作本身就不是一个原子操作

-- 操作并不是原子操作,而是对应三条汇编指令:

  • load :将共享变量ticket从内存加载到寄存器中。线程在执行的时候,将共享数据加载到CPU寄存器的本质是:把数据的内容,变成自己的上下文,即以拷贝的方式,给自己单独拿了一份。
  • update : 更新寄存器里面的值,执行-1操作
  • store :将新值,从寄存器写回共享变量ticket的内存地址

要解决以上问题,需要做到三点:

  • 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
  • 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
  • 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。

要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。

image-20240922150555936

3.3 互斥量的接口

初始化互斥量

方法1,静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
方法2,动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);
参数:
mutex:要初始化的互斥量
attr:不关心,暂时设置为NULL

销毁互斥量

需要注意:

  • 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
  • 不要销毁一个已经加锁的互斥量
  • 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex)

互斥量加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);		// 非阻塞版本
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号

调用 pthread_ lock 时,可能会遇到以下情况:

  • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
  • 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。

注意:

加锁的本质:用时间换安全
加锁的表现:线程对临界区代码串行执行
加锁的原则:尽量保证临界区的代码越少越好

3.4 改进3.2中的代码

// 方法1,静态分配
struct ThreadData
{   
    ThreadData(int i) 
    {
        _threadName = "Thread-" + to_string(i);
    }
    string _threadName;
};

// 共享资源,票
int ticket = 1000;

void* GetTicket(void* args)
{
    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;    // 也可以使用全局变量
    ThreadData* data = static_cast<ThreadData*>(args);
    const char* name = data->_threadName.c_str();
    while(true) {
        pthread_mutex_lock(&lock);    // 申请锁成功,才能往后执行,不成功,线程就阻塞等待
        if(ticket > 0) {
            usleep(1000);
            // 抢票
            printf("%s get a ticket, ticket: %d\n", name, ticket);
            ticket--;
            pthread_mutex_unlock(&lock);
        }
        else {
            // 没票了,该线程就break
            printf("%s can not get ticket, done\n", name);
            pthread_mutex_unlock(&lock);
            break;
        }
        /* 防止这个线程抢完票后又立即去申请锁,让它sleep()一会儿,给其它正在被阻塞的线程一个机会,否则该线程迟迟分配不到资源, 会导致饥饿问题 */
        usleep(15);     
    }
    return nullptr;
}

int main()
{
    vector<pthread_t> tids;
    vector<ThreadData*> datas;
    for(size_t i = 1; i <= NUM; ++i) {
        ThreadData* data = new ThreadData(i);
        datas.push_back(data);
        pthread_t tid;
        pthread_create(&tid, nullptr, GetTicket, datas[i-1]);
        tids.push_back(tid);
    }

    // 善后处理
    for(const auto& t : tids)    pthread_join(t, nullptr);
    for(const auto& d : datas)   delete d;
    return 0;
}
// 方法2,动态分配:
struct ThreadData
{   
    ThreadData(int i, pthread_mutex_t* lock) 
    {
        _threadName = "Thread-" + to_string(i);
        _lock = lock;
    }
    string _threadName;
    pthread_mutex_t* _lock;
};

// 共享资源,票
int ticket = 1000;

void* GetTicket(void* args)
{
    ThreadData* data = static_cast<ThreadData*>(args);
    const char* name = data->_threadName.c_str();
    while(true) {
        pthread_mutex_lock(data->_lock);    // 申请锁成功,才能往后执行,不成功,线程就阻塞等待
        if(ticket > 0) {
            usleep(1000);
            // 抢票
            printf("%s get a ticket, ticket: %d\n", name, ticket);
            ticket--;
            pthread_mutex_unlock(data->_lock);
        }
        else {
            // 没票了,该线程就break
            printf("%s can not get ticket, done\n", name);
            pthread_mutex_unlock(data->_lock);
            break;
        }
        /*防止这个线程抢完票后又立即去申请锁,让它sleep()一会儿,给其它正在被阻塞的线程一个机会,否则该线程迟迟分配不到资源, 会导致饥饿问题*/
        usleep(15);    
    }
    return nullptr;
}

int main()
{
    vector<pthread_t> tids;
    vector<ThreadData*> datas;
    pthread_mutex_t lock;
    pthread_mutex_init(&lock, nullptr);
    for(size_t i = 1; i <= NUM; ++i) {
        ThreadData* data = new ThreadData(i, &lock);
        datas.push_back(data);
        pthread_t tid;
        pthread_create(&tid, nullptr, GetTicket, datas[i-1]);
        tids.push_back(tid);
    }

    // 善后处理
    for(const auto& t : tids)    pthread_join(t, nullptr);
    for(const auto& d : datas)   delete d;
    return 0;
}

image-20240922154223283

看到了我们想要看到的,票数并没有出现负数


需要保证:

  • 外面的线程,需要排队等待
  • 已经执行完临界区的线程,不能立马申请锁,需要排到队列的尾部
  • 让所有的线程获取锁,按照一定的顺序。线程按照一定的顺序性获取资源,就叫做同步问题

每个线程访问临界区之前都需要访问锁,说明锁本身也是共享资源,所以申请锁和释放锁的过程要是原子的


纯互斥环境,如果锁分配不够合理,容易导致其他线程的饥饿问题
注意:不是说只要有互斥,必有饥饿。


在临界区中,该线程可以被切换吗
可以切换,因为在线程被切出去的时候,是持有锁被切走的,线程不在期间,其它被线程不能访问临界资源,会一直被阻塞,直到pthread_mutex_unlock()会唤醒这些线程。这样,就保证了该线程在执行临界区的时候,对其它线程是原子的

3.5 互斥量(锁)的原理

为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 下面是lock和unlock的伪代码

image-20240922190946042

lock:

最重要的是xchgb语句
交换的本质:线程把内存中的数据(共享),交换到CPU的寄存器中。即线程把一个共享数据交换到自己的硬件上下文中。而线程的上下文是线程独有的,这样该线程就持有了一个锁。

unlock:

将mutex制1

3.6 封装一下原生锁的接口,RAII风格的锁

// LockGuard.hpp
#pragma once
#include <pthread.h>
class Mutex
{
public:
    Mutex(pthread_mutex_t* lock) : _lock(lock) {}

    void Lock()
    {
        pthread_mutex_lock(_lock);
    }

    void Unlock()
    {
        pthread_mutex_unlock(_lock);
    }
private:
    pthread_mutex_t* _lock;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *lock) : _mutex(lock) 
    {
        _mutex.Lock();
    }

    ~LockGuard() 
    {
        _mutex.Unlock();
    }
private:
    Mutex _mutex;
};

改进一下3.4中方法1的GetTicket()

void* GetTicket(void* args)
{
    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;    // 也可以使用全局变量
    ThreadData* data = static_cast<ThreadData*>(args);
    const char* name = data->_threadName.c_str();
    while(true) {
        {
            LockGuard lockGuard(&lock);
            if(ticket > 0) {
                usleep(1000);
                // 抢票
                printf("%s get a ticket, ticket: %d\n", name, ticket);
                ticket--;
                pthread_mutex_unlock(&lock);
            }
            else {
                // 没票了,该线程就break
                printf("%s can not get ticket, done\n", name);
                pthread_mutex_unlock(&lock);
                break;
            }
        }
        /* 防止这个线程抢完票后又立即去申请锁,让它sleep()一会儿,给其它正在被阻塞的线程一个机会,否则该线程迟迟分配不到资源, 会导致饥饿问题 */
        usleep(15);     
    }
    return nullptr;
}

这样,我们的代码中就不需要关心烦人和易忘的加锁和解锁,只要定义一个临时的LockGuard对象,就可以自动完成。

3.7 可重入 和 线程安全

概念

  • 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题
  • 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数

可重入与线程安全联系

  • 函数是可重入的,那就是线程安全的
  • 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题

4 死锁

4.1 死锁概念

死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所占用不会释放的资源而处于的一种永久等待状态。

一个锁,也可以产生死锁问题,比如一个线程申请完锁后,又申请锁,此时他就会带着锁阻塞,其它线程也因为得不到锁而阻塞。

4.2 死锁的4个必要条件

  • 互斥条件:一个资源每次只能被一个执行流使用
  • 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
  • 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
  • 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

4.3 避免死锁

  • 破坏死锁的四个必要条件,任意一个即可
  • 加锁顺序一致,破坏循环等待条件
  • 避免锁未释放的场景
  • 资源一次性分配

4.4 避免死锁算法

  • 死锁检测算法
  • 银行家算法

5. Linux 线程同步

同步问题是保证数据安全的情况下,让线程访问资源具有一定的顺序性,从而有效避免饥饿问题。

5.1 条件变量

  • 当一个线程互斥地访问某个变量时,它可能发现在 正在访问临界资源的线程 改变状态之前,它什么也做不了。
  • 例如当一个线程尝试访问临界资源时,资源已被其它线程使用,它只能等待,假设这里有一个task_struct* wait_queue和一个铃铛🔔
    该未能访问资源的线程就会push()进这个队列中,期间再来新的线程,如果资源未就绪,也会push()到这个队列中。
    当线程访问完临界资源后,会敲一下🔔,可以选择通知某一个线程或者所有线程,让该线程访问临界资源,
    然后该线程选择退出或者继续去队尾等待。我们把这个🔔和队列合在一起,成为条件变量
  • 所以条件变量需要提供 通知线程的机制 和一个 等待队列
  • 注意每一个新来的线程都是会先尝试访问临界资源(申请锁),发现有线程正在使用,于是就去等待队列中去等待了。所以条件变量需要配合互斥锁使用,目的是为了保证数据安全。

5.2 条件变量函数

初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
参数:
cond:要初始化的条件变量
attr:NULL	// 暂时不关心,设置为NULL
// 也可以是静态函数
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

销毁,如果用静态函数则不需要

int pthread_cond_destroy(pthread_cond_t *cond)

加入等待队列

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量
  • pthread_cond_wait() 用于阻塞当前线程,等待别的线程使用pthread_cond_signal()pthread_cond_broadcast()来唤醒它。
  • pthread_cond_wait() 必须与pthread_mutex_t配套使用
  • pthread_cond_wait()函数一进入wait状态就会自动release mutex。当其他线程通过pthread_cond_signal()pthread_cond_broadcast(),把该线程唤醒,使pthread_cond_wait()通过(返回)时,该线程又自动获得该mutex

唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond);		// 全部唤醒
int pthread_cond_signal(pthread_cond_t *cond);			// 唤醒一个

5.3 一个案例

#define NUM 5
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// 想要让多线程依次访问该临界资源
int cnt = 0;

// 自增cnt
void* AddCount(void* args)
{
    pthread_detach(pthread_self());
    int num = (size_t)args;
    printf("thread %d creat sucess\n", num);
    while(true) {
        pthread_mutex_lock(&mutex);
        /*pthread_cond_wait()会自动释放该线程的锁*/
        pthread_cond_wait(&cond, &mutex);
        printf("Thread-%d, cnt: %d\n", num, cnt++);
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    for(size_t i = 0;i < NUM; ++i) {
        pthread_t tid;
        /*注意这里不能传(void*)&i,和主线程共享同一个i。因为不清楚主线程和子线程哪一个先运行。
        如果主线程跑的很快,直接把i递增到了4,那么此时所有的线程的num就是4了,这就出现了并发问题*/
        pthread_create(&tid, nullptr, AddCount, (void*)i);
        usleep(1000);       // 保证顺序性,让结果好观察
    }
    sleep(3);
    printf("main thread start control\n");
    while(true) {
        sleep(1);
        pthread_cond_signal(&cond);     // 唤醒在cond等待队列下等待的一个线程,默认第一个
    }   
    return 0;
}

image-20240926164347593

如果第35行改为

pthread_cond_broadcast(&cond);

这时,线程就不会一个一个被唤醒,而是Thread-0到Thread-4一下全部被唤醒,打印出来


虽然16行直接让该线程去等待队列等待了。但实际上大部分情况都是因为临界资源不就绪,进而线程去等待
由于需要判断临界资源是否就绪,也是访问临界资源,所以需要在加锁之后,这样才能保证安全

6. 生产消费者模式 Producer-Consumer模式

6.1 概念

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的

image-20240926174733001

由于多个线程都访问了同一个阻塞队列,所以会有并发问题

  • 生产者 vs 生产者:互斥
  • 消费者 vs 消费者:互斥
  • 生产者 vs 消费者:互斥,为了防止饥饿问题,也需要同步

所以这里有3种关系,2个角色(生产者和消费者),1个交易场所(特定结构的内存空间)


该模型的优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

注意,生产者生产的数据也是要花时间获取的,当有数据时,消费者做数据的加工处理,也是要花时间的

所以,不要只看到生产者生产数据到队列的过程,当阻塞队列队列满时,生产者在等待队列下等待过程中,是可以做获取数据的工作的。
同理,不要只看到消费者从队列中消费数据的过程,当阻塞队列为空时,消费者在等待队列下等待过程中,是可以做数据的加工处理动作的。
这样,这两个或者多个线程就并发高效的处理数据了,在多生产和多消费体现明显,少量的线程在等待,大量的线程在获取数据和加工数据。

6.2 基于 BlockingQueue 的生产者-消费者模式

这里是单生产者单消费者

// BlockQueue.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <queue>
using namespace std;

template<class T>
class BlockQueue
{
    static const int NUM = 10;
public:
    BlockQueue(int num = NUM) : _max(num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pCond, nullptr);
        pthread_cond_init(&_cCond, nullptr);
    }

    const T pop()
    {
        pthread_mutex_lock(&_mutex);
        // 判断临界资源是否满足也是在访问临界资源,所以要在加锁之后
        if(_q.size() == 0) {
            // 队列中没有值,不需要再删除了,让它去等待队列中去等
            pthread_cond_wait(&_cCond, &_mutex);	// 调用的时候会自动释放锁,因唤醒而返回时,又会重新持有锁
        }
        const T x = _q.front();
        _q.pop();
        pthread_cond_signal(&_pCond);   // 消费者已经消费数据了,此时就可以唤醒生产者了。
        pthread_mutex_unlock(&_mutex);
        return x;
    }

    void push(const T& x)
    {   
        pthread_mutex_lock(&_mutex);
        if(_q.size() == _max) {
            // 已经到了最大值,不需要再添加了,让它去等待队列中去等
            pthread_cond_wait(&_pCond, &_mutex);
        }
        _q.push(x);
        pthread_cond_signal(&_cCond);   // 生产者已经生产数据了,此时就可以唤醒消费者了。
        pthread_mutex_unlock(&_mutex);  

    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pCond);
        pthread_cond_destroy(&_cCond);
    }

private:
    queue<T> _q;
    pthread_mutex_t _mutex;
    pthread_cond_t _cCond;  // 消费者条件变量
    pthread_cond_t _pCond;  // 生产者条件变量
    size_t _max;    // 队列能放的最大值
};
// main.cc
#include "BlockQueue.hpp"

void* Consumer(void* args)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    while(true) {
        int data = bq->pop();
        printf("消费者消费了一个数据:%d\n", data);
        sleep(2);
    }
    
}

void* Producer(void* args)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    int data = 0;
    while(true) {
        data++;
        bq->push(data);
        printf("生产者生产了一个数据:%d\n", data);
    }
}

int main()
{
    BlockQueue<int>* bq = new BlockQueue<int>();
    pthread_t c, p;
    pthread_create(&c, nullptr, Consumer, bq);
    pthread_create(&p, nullptr, Producer, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete bq;
    return 0;
}

因为消费者消费的比较慢,所以消费者消费一个,生产者就生产一个

s

6.3 生产和消费Task

阻塞队列中存储的不仅仅可以是int,可以是自定义类型

// Task.hpp
#include <iostream>
#include <string>
using namespace std;

enum TaskStatus
{
    DIV_ZERO=1,
    MOD_ZERO,
    UNKOWN
};

char opStr[4] = {'+','-','*','/'};

class Task
{
public:
    Task(int a, int b, char op) : _a(a), _b(b), _op(op) {}

    void Run()
    {
        switch (_op)
        {
            case '+':
                _result = _a + _b;
                break;
            case '-':
                _result = _a - _b;
                break;
            case '*':
                _result = _a * _b;
                break;
            case '/':
                if (_b == 0)    _exitCode = DIV_ZERO;
                else    _result = _a / _b;
                break;
            case '%':
                if (_b == 0)    _exitCode = MOD_ZERO;
                else    _result = _a % _b;
                break;
            default:
                _exitCode = UNKOWN;
                break;
        }
    }

    string GetResult()
    {
        string resultStr;
        resultStr += to_string(_a);
        resultStr += _op;
        resultStr += to_string(_b);
        resultStr += '=';
        resultStr += to_string(_result);
        resultStr += "  ";
        if (_exitCode != 0)
        {
            resultStr += "error";
            resultStr += "  ";
            switch (_exitCode)
            {
                case DIV_ZERO:
                    resultStr += "div zero";
                    break;
                case MOD_ZERO:
                    resultStr += "mod zero";
                    break;
                case UNKOWN:
                    resultStr += "unkown";
                    break;
            }
        }
        else
            resultStr += "ok";
        return resultStr;
    }

    string GetTask()
    {
        string resultStr;
        resultStr += to_string(_a);
        resultStr += _op;
        resultStr += to_string(_b);
        resultStr += '=';
        resultStr += "???";
        return resultStr;
    }
    
private:
    int _a = 0;
    int _b = 0;
    char _op = ' ';
    int _exitCode = 0;
    int _result = 0;
};
// main.cc
#include "BlockQueue.hpp"
#include "Task.hpp"

void* Consumer(void* args)
{
    BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
    while(true) {
        Task t = bq->pop();
        t.Run();
        printf("消费者获得数据: %s, 运算结果是: %s\n", t.GetTask().c_str(), t.GetResult().c_str());
        // sleep(1);
    }
    
}

void* Producer(void* args)
{
    BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
    int x = 0, y = 0;
    char op = ' ';
    while(true) {
        x = rand() % 100;     // [0, 99]
        y = rand() % 100;     // [0, 99]
        op = opStr[rand() % 4];
        Task t(x, y, op);
        usleep(10);              // 模拟获取数据需要时间
        bq->push(t);
        printf("生产者生产了一个任务:%s\n", t.GetTask().c_str());
        // sleep(1);
    }
}

int main()
{
    srand((unsigned int)time(nullptr));
    BlockQueue<Task>* bq = new BlockQueue<Task>();
    pthread_t c, p;
    pthread_create(&p, nullptr, Producer, bq);
    pthread_create(&c, nullptr, Consumer, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete bq;
    return 0;
}

image-20241004114412458

6.4 误唤醒

假设现在有一个消费者,多个生产者在它的条件变量下正在进行等待,现在消费者刚刚消费一个数据,阻塞队列于是就恰好有一个空位置,于是该消费者就去唤醒生产者,但是使用的不是pthread_cond_signal(&_pCond);而是pthread_cond_broadcast(&_pCond);

void push(const T& x)
{   
    pthread_mutex_lock(&_mutex);
    if(_q.size() == _max) {
        // 已经到了最大值,不需要再添加了,让它去等待队列中去等
        pthread_cond_wait(&_pCond, &_mutex);
    }
    _q.push(x);
	pthread_cond_signal(&_cCond);   // 生产者已经生产数据了,此时就可以唤醒消费者了。
    pthread_mutex_unlock(&_mutex);  
}

于是,这几个被唤醒的生产者就不在条件变量下等待了,而是都跑过去去竞争锁,但只有一个能竞争成功,该生产者于是就继续向下执行,向阻塞队列中push()数据。当该生产者生产完数据,准备唤醒消费者并解锁的时候,消费者不一定竞争成功锁,因为还有那些同样在刚才竞争锁资源失败的生产者在锁资源下等着呢!如果这些生产者又拿到锁,像阻塞队列中push()数据,就会发生错误,因为阻塞队列已经到_max了!

一个线程在条件满足被唤醒的时候,但是历史上的条件满足已经其它线程处理掉了,于是该线程只能等待,当它再次被唤醒,进行数据访问,就可能出错,我们把这种现象称为该线程被误唤醒了。或者叫虚假唤醒。即本不应该被唤醒线程被唤醒了,导致程序执行结果错误。


为了防止线程被虚假唤醒,判断临界资源是否可以被消费或生产的时候要用while()循环,循环判断,当条件不满足的时候,让该线程重新去等待队列中等待,而不是一股脑的一直在竞争锁资源

const T pop() 
{
    // ...
    while(_q.size() == 0) {}
    // ...
}
void push(const T& x)
{
    // ...
	while(_q.size() == _max) {}
    // ...
}

6.5 多生产者多消费者

static const size_t C_NUM = 5;
static const size_t P_NUM = 5;
// ...
int main()
{
    srand((unsigned int)time(nullptr));
    BlockQueue<Task>* bq = new BlockQueue<Task>();
    pthread_t c[C_NUM], p[P_NUM];
    for(size_t i = 0; i < P_NUM;++i)
        pthread_create(c+i, nullptr, Producer, bq);
    for(size_t i = 0; i < C_NUM;++i)
    pthread_create(p+i, nullptr, Consumer, bq);

    for(size_t i = 0; i < P_NUM;++i)
        pthread_join(c[i], nullptr);
    for(size_t i = 0; i < C_NUM;++i)
        pthread_join(p[i], nullptr);
    delete bq;
    return 0;
}

image-20241004144049161

由于BlockQueue中用了一把锁,所以生产者和生产者的互斥关系,生产者和消费者的互斥关系,消费者和消费者的互斥问题,都可以解决
生产和消费者的同步问题,我们使用了两个条件变量来解决
即便是多生产者多消费者,任何时刻,只允许一个线程来访问临界资源,让它来进行生产或消费

7. POSIX信号量

7.1 相关概念

7.1.1 理解信号量

信号量/信号灯的本质是一把计数器,类似 int cnt = n
用来描述临界资源中资源数量的多少!

  1. 申请计数器成功,就表示我具有访问资源的权限了
  2. 申请了计数器资源,我当前并没有访问我想要的资源。申请了计数器资源是对资源的预订机制
  3. 计数器可以有效保证进入共享资源的执行流的数量
  4. 所以每一个执行流,想访问共享资源中的一部分的时候,不是直接访问,而是先申请计数器资源。
    类似看电影的先买票!

7.1.2 二元信号量

我们把临界资源位1,信号量值只能为1,0两态的计数器叫做 二元信号量,本质就是一个锁

将计数器设置为1,资源为1的本质:其实就是将临界资源不要分成很多块了,而是当做一个整体。整体申请,整体释放

7.1.3 PV操作

申请信号量,本质是对计数器进行–操作,也就是P操作
释放资源,释放信号量,本质是对计数器进行++操作,也就是V操作

为了保证–操作和++操作不会被其他进程打扰,我们让该操作变成原子操作
即:要么不做,要做就做完,两态的,没有“正在做”这样的概念!

7.1.4 信号量凭什么是进程间通信的一种

  1. 通信不仅仅是通信数据,互相协同也是
  2. 要协同,本质也是通信,信号量首先要被所有的通信进程看到

7.1.5 总结

  1. 信号量本质是一把计数器,其PV操作是原子的
  2. 执行流想访问资源,必须先申请信号量资源,得到信号量之后,才能访问临界资源
  3. 信号量值1, 0两态的,二元信号量,就是互斥功能
  4. 申请信号量的本质: 是对临界资源的预订机制

7.2 信号量接口

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
    pshared:0表示线程间共享,非零表示进程间共享
    value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量 P()

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

发布信号量 V()

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1int sem_post(sem_t *sem);//V()

7.3 基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性

image-20241004154411262

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态

生产者和消费者访问该环形队列需要满足一下以下三个原则

  1. 指向同一个位置的时候,只能一方来访问
    生产者访问
    为满:消费者访问
  2. 消费者不能超过生产者
  3. 生产者不能 套消费者一个圈

生产者关注的资源:还有多少剩余空间 SpaceSem = N
消费者关注的数据:还有多少剩余数据 DataSem = 0

/* 生产者 */
P(SpaceSem)
// 生产
V(DataSem)
    
/* 消费者 */
P(DataSem)
// 生产
V(SpaceSem)

7.4 单生产者单消费者

// RingQueue.hpp
template<class T>
class RingQueue
{
    static const size_t defaultCap = 5;
private:
    void P(sem_t& x)
    {
        sem_wait(&x);
    }
    
    void V(sem_t& x)
    {
        sem_post(&x);
    }
public:
    RingQueue(size_t capacity = defaultCap) 
    : _ringQueue(capacity)
    , _capacity(capacity)
    , _cPos(0)
    , _pPos(0)
    {
        sem_init(&_cDataSem, 0, 0);
        sem_init(&_pSpaceSem, 0, capacity);
    }

    ~RingQueue()
    {
        sem_destroy(&_cDataSem);
        sem_destroy(&_pSpaceSem);
    }

    void Push(const T& x) 
    {        
        P(_pSpaceSem);
        _ringQueue[_pPos] = x;
        V(_cDataSem);
        _pPos++;    
        _pPos %= _capacity;     // 维持队列的环形特征
    }

    void Pop(T* x)
    {
        P(_cDataSem);
        *x = _ringQueue[_cPos];
        V(_pSpaceSem);

        _cPos++;
        _cPos %= _capacity;     // 维持队列的环形特征
    }

private:
    vector<T> _ringQueue;    // 用数组模拟环形队列
    size_t _capacity;        // 队列大小
    size_t _cPos;            // 消费者下标
    size_t _pPos;            // 生产者下标
    sem_t _cDataSem;         // 消费者关心还有多少剩余数据
    sem_t _pSpaceSem;        // 生产者关心还有多少剩余空间
};
// main.cc
void* Producer(void* args)
{
    RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
    while(true) {
        // 获取数据
        int data = rand() % 10;
        // 生产数据
        rq->Push(data);
        printf("Producer put data: %d\n", data);
        sleep(1);
    }
    return nullptr;
}

void* Consumer(void* args)
{
    RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
    while(true) {
        // 消费数据
        int data = 0;
        rq->Pop(&data);
        printf("Consumer get data: %d\n", data);
        // 处理数据
        // sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr));
    RingQueue<int>* rq = new RingQueue<int>();
    pthread_t c, p;
    pthread_create(&c, nullptr, Consumer, rq);
    pthread_create(&p, nullptr, Producer, rq);
    pthread_join(c, nullptr);     
    pthread_join(p, nullptr);     
    cout << "main thread quit" << endl;
    delete rq;
    return 0;
}

image-20241007104110977

生产者生产一个数据,消费者才能拿到一个数据,生产者和消费者之间有很好的同步性和互斥性
更改sleep()的位置,让消费者消费的慢一点

image-20241007104542832

7.6 多生产者和多消费者

需要满足生产者和生产者之间的互斥和消费者和消费者之间的互斥

任何时刻,只能有一个生产者或一个消费者在环形队列里进行生产或消费(在队列不满不空的情况下,可以有两个线程在同时访问同一个队列)
这里的临界资源是两个下标,所以要有两把锁

// RingQueue.hpp
template<class T>
class RingQueue
{
    static const size_t defaultCap = 5;
private:
    void P(sem_t& x)
    {
        sem_wait(&x);
    }
    
    void V(sem_t& x)
    {
        sem_post(&x);
    }

    void Lock(pthread_mutex_t& mutex)
    {
        pthread_mutex_lock(&mutex);
    }

    void UnLock(pthread_mutex_t& mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    RingQueue(size_t capacity = defaultCap) 
    : _ringQueue(capacity)
    , _capacity(capacity)
    , _cPos(0)
    , _pPos(0)
    {
        // cout << "capacity:" << capacity << endl;
        sem_init(&_cDataSem, 0, 0);
        sem_init(&_pSpaceSem, 0, capacity);
    }

    ~RingQueue()
    {
        sem_destroy(&_cDataSem);
        sem_destroy(&_pSpaceSem);
    }

    void Push(const T& x) 
    {        
        P(_pSpaceSem);

        Lock(_pMutex);          // 加锁和解锁在PV操作之后,提高并发度。只有当信号量允许时,进程才会尝试获取锁。这样可以减少锁的争用,提高系统的整体效率。
        _ringQueue[_pPos] = x;
        _pPos++;    
        _pPos %= _capacity;     // 维持队列的环形特征
        UnLock(_pMutex);

        V(_cDataSem);

    }
        

    void Pop(T* x)
    {
        P(_cDataSem);
        Lock(_cMutex);          // 加锁和解锁在PV操作之后,提高并发度。只有当信号量允许时,进程才会尝试获取锁。这样可以减少锁的争用,提高系统的整体效率。
        *x = _ringQueue[_cPos];
        _cPos++;
        _cPos %= _capacity;     // 维持队列的环形特征
        UnLock(_cMutex);
        V(_pSpaceSem);
    }

private:
    vector<T> _ringQueue;    // 用数组模拟环形队列
    size_t _capacity;        // 队列大小
    size_t _cPos;            // 消费者下标
    size_t _pPos;            // 生产者下标
    sem_t _cDataSem;         // 消费者关心还有多少剩余数据
    sem_t _pSpaceSem;        // 生产者关心还有多少剩余空间
    pthread_mutex_t _cMutex; // 消费者的锁
    pthread_mutex_t _pMutex; // 生产者的锁
};
static const size_t C_NUM = 5;
static const size_t P_NUM = 5;

struct ThreadData
{
    RingQueue<int>* rq;
    string name;
};

void* Producer(void* args)
{
    ThreadData* td = static_cast<ThreadData*>(args);
    while(true) {
        // 获取数据
        int data = rand() % 10;
        const char* name = td->name.c_str();
        RingQueue<int>* rq = td->rq;
        // 生产数据
        rq->Push(data);
        printf("%s, put data: %d\n", name, data);
        // sleep(1);
    }
    delete td;
    return nullptr;
}

void* Consumer(void* args)
{
    ThreadData* td = static_cast<ThreadData*>(args);
    while(true) {
        // 消费数据
        int data = 0;
        const char* name = td->name.c_str();
        RingQueue<int>* rq = td->rq;
        rq->Pop(&data);
        printf("%s, put data: %d\n", name, data);
        // 处理数据
        sleep(1);
    }
    delete td;
    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr));
    RingQueue<int>* rq = new RingQueue<int>();
    pthread_t c[C_NUM], p[P_NUM];
    for(size_t i = 0; i < P_NUM;++i) {
        ThreadData* pTd = new ThreadData();
        pTd->rq = rq;
        pTd->name = "Producer Thread-" + to_string(i+1);
        pthread_create(c+i, nullptr, Producer, pTd);
    }
    for(size_t i = 0; i < C_NUM;++i) {
        ThreadData* cTd = new ThreadData();
        cTd->rq = rq;
        cTd->name = "Consumer Thread-" + to_string(i+1);
        pthread_create(p+i, nullptr, Consumer, cTd);
    }
    for(size_t i = 0; i < P_NUM;++i)
        pthread_join(c[i], nullptr);
    for(size_t i = 0; i < C_NUM;++i)
        pthread_join(p[i], nullptr);
    delete rq;
    return 0;
}

image-20241007135107313

8. 线程池

概念:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的应用场景:

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误

线程池的种类:

  1. 创建固定数量线程池,循环从任务队列中获取任务对象
  2. 获取到任务对象后,执行任务对象中的任务接口
// ThreadPool.cc
struct ThreadData
{
    pthread_t tid;
    string name;
};

// T表示任务的类型
template<class T>
class ThreadPool
{
    static const size_t defaultNum = 5;
private:
    void Lock()
    {
        pthread_mutex_lock(&_mutex);
    }

    void UnLock()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void ThreadWake()
    {
        pthread_cond_signal(&_cond);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

    bool IsQueueEmpty()
    {
        return _tasks.empty();
    }

    string GetName(const pthread_t& t)
    {
        for(const auto& e : _threads) {
            if(e.tid == t) 
                return e.name;
        }
        return "";
    }
public:
    ThreadPool(size_t num = defaultNum) : _threads(num) 
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    // 线程执行任务,不需要this指针,所以设置成static
    static void* StartRoutine(void* args)
    {   
        ThreadPool* tp = static_cast<ThreadPool*>(args);
        const char* name = tp->GetName(pthread_self()).c_str();
        while(true) {
            tp->Lock();
            // while循环判断防止误唤醒
            while(tp->IsQueueEmpty()) {
                tp->ThreadSleep();
            }
            T t = tp->Pop();
            tp->UnLock();
            t.Run();
            printf("%s, res: %s\n", name, t.GetResult().c_str());
        }
        return nullptr;
    }

    // 创建进程
    void Start()
    {
        int num = _threads.size();
        for (size_t i = 0; i < num; i++) {
            _threads[i].name = "Thread-" + to_string(i+1);
            pthread_create(&(_threads[i].tid), nullptr, StartRoutine, this);
        }
    }

    T Pop()
    {
        T t = _tasks.front();
        _tasks.pop();
        return t;
    }

    // 添加任务
    void Push(const T& t)
    {
        Lock();
        _tasks.push(t);
        ThreadWake();
        UnLock();
    }
private:
    vector<ThreadData> _threads;
    queue<T> _tasks;    // 任务,这是临界资源
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};
// main.cc
#include "Task.hpp"
#include "ThreadPool.cc"
#include <ctime>

int main()
{
    srand((unsigned int)time(nullptr));
    ThreadPool<Task>* tp = new ThreadPool<Task>();
    tp->Start();
    while(true) {
        int x = rand() % 50;
        int y = rand() % 10;
        char op = opStr[rand() % 4];
        Task t(x, y, op);
        tp->Push(t);
        printf("Main thread make task, %s\n", t.GetTask().c_str());
        sleep(1);
    }
    return 0;
}

image-20241007155652880

9. 简单封装C++的线程

// 返回值是void, 参数是T的包装器
template<class T>
using fun = function<void(T)>;

static int NUM = 1;
template<class T>
class Thread
{
public:
    Thread(fun<T> f, T data = 0) : _f(f), _data(data)
    {}

    // 让线程执行StartRoutine函数
    void Run() 
    {
        _name = "Thread-" + to_string(NUM++);
        _isRunning = true;
        pthread_create(&_tid, nullptr, StartRoutine, this);
    }

    static void* StartRoutine(void* args) 
    {
        Thread* td = static_cast<Thread*>(args);
        td->Entry();
        return nullptr;
    }

    void Join()
    {
        pthread_join(_tid, nullptr);
        _isRunning = false;
    }

    string GetName()
    {
        return _name;
    }

    bool IsRunning() 
    {
        return _isRunning;
    }

    // 回调fun
    void Entry()
    {
        _f(_data);
    }
private:
    pthread_t _tid = 0;
    string _name = "";
    bool _isRunning = false;
    T _data;     // 函数的参数
    fun<T> _f;      // 让线程执行某一个任务
};
// main.cc
#include "Thread.cc"
#include <vector>

void Run(void* args)
{
    while(true) {
        printf("New thread\n");
        // cout << a << endl;
        sleep(1);
    }
}

int main()
{
    // Thread<int> t(Run, 123);
    // t.Run();
    // t.Join();
    vector<Thread<void*>> v;
    for (size_t i = 0; i < 10; i++) {
        v.push_back(Thread<void*>(Run));
    }
    
    for(auto& e : v) {
        e.Run();
    }

    for(auto& e : v) {
        e.Join();
    }
    return 0;
}

image-20241007171143171


让线程池使用咱自己写的C++线程类

struct ThreadData
{
    pthread_t tid;
    string name;
};

// T表示任务的类型
template<class T>
class ThreadPool
{
    static const size_t defaultNum = 5;
private:
    void Lock()
    {
        pthread_mutex_lock(&_mutex);
    }

    void UnLock()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void ThreadWake()
    {
        pthread_cond_signal(&_cond);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

    bool IsQueueEmpty()
    {
        return _tasks.empty();
    }

    string GetName(pthread_t& t)
    {
        for(auto& e : _threads) {
            if(e.GetTid() == t) 
                return e.GetName();
        }
        return "";
    }
public:
    ThreadPool(size_t num = defaultNum)
    {
        for(size_t i = 0; i < num;++i) {    
            _threads.push_back(Thread<void*>(StartRoutine, this));
        }
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    // 线程执行任务,不需要this指针,所以设置成static
    static void StartRoutine(void* args)
    {   
        ThreadPool* tp = static_cast<ThreadPool*>(args);
        pthread_t self = pthread_self();
        string name = tp->GetName(self);
        while(true) {
            tp->Lock();
            // while循环判断防止误唤醒
            while(tp->IsQueueEmpty()) {
                tp->ThreadSleep();
            }
            T t = tp->Pop();
            tp->UnLock();
            t.Run();
            printf("%s, res: %s\n", name.c_str(), t.GetResult().c_str());
        }
    }

    // 创建进程
    void Start()
    {
        int num = _threads.size();
        for (size_t i = 0; i < num; i++) {
            _threads[i].Run();
        }
    }

    T Pop()
    {
        T t = _tasks.front();
        _tasks.pop();
        return t;
    }

    // 添加任务
    void Push(const T& t)
    {
        Lock();
        _tasks.push(t);
        ThreadWake();
        UnLock();
    }
private:
    vector<Thread<void*>> _threads;
    queue<T> _tasks;    // 任务,这是临界资源
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

image-20241007175644413

10. 线程安全的单例模式

10.1 什么是设计模式

设计模式(Design Pattern)是软件工程中的一种最佳实践,它是在特定场景下解决特定问题的成熟模板或方案。设计模式是面向对象软件开发过程中经过验证的经验和智慧的结晶,它们提供了一种通用的、可复用的解决方案来解决在软件设计中遇到的常见问题。

10.2 什么是单例模式

单例模式(Singleton Pattern)是一种常用的软件设计模式,其核心目的是确保一个类只有一个实例,并提供一个全局访问点来获取这个实例。这种模式在需要控制资源访问、节省系统资源、协调系统中的共享资源时非常有用。

10.3 单例模式的特点

单例模式的主要特点包括:

  1. 唯一性:确保一个类只有一个实例。
  2. 全局访问:提供一个全局访问点来获取这个唯一的实例。

10.4 饿汉方式和懒汉方式

饿汉方式(Eager Initialization)

饿汉方式是指在程序启动时就立即创建单例对象。这种方式的优点是简单、线程安全,因为对象的创建是在程序启动时完成的,不存在多线程同时访问的问题。缺点是如果单例对象的创建比较耗时或者占用资源较多,可能会影响程序的启动速度

懒汉方式的单例模式实现如下:

class Singleton 
{
public:
    static Singleton& getInstance() 
    {
        return instance;
    }
private:
    static Singleton instance; // 静态成员变量,饿汉式,直接在类中创建实例
    Singleton() {} // 私有构造函数
    Singleton(const Singleton&) = delete; // 禁止拷贝构造
    Singleton& operator=(const Singleton&) = delete; // 禁止赋值操作
};

// 在类外初始化静态成员变量
Singleton Singleton::instance;

懒汉方式(Lazy Initialization)

懒汉方式是指在第一次使用单例对象时才创建它。这种方式的优点是可以延迟对象的创建,从而加快程序的启动速度,并且只有在真正需要时才创建对象。缺点是如果多个线程同时访问单例对象,可能会存在线程安全问题,所以要加锁。

线程不安全的懒汉方式实现的单例模式

class Singleton 
{
public:
    static Singleton* getInstance() 
    {
        if (instance == nullptr) {
            instance = new Singleton();
        }
        return instance;
    }
private:
    static Singleton* instance; // 静态成员变量指针,懒汉式,延迟创建实例
    Singleton() {} // 私有构造函数
    Singleton(const Singleton&) = delete; // 禁止拷贝构造
    Singleton& operator=(const Singleton&) = delete; // 禁止赋值操作
};

// 在类外初始化静态成员变量指针
Singleton* Singleton::instance = nullptr;

使用局部静态变量来实现线程安全的懒汉式单例,因为局部静态变量的初始化在C++ 11中是线程安全的。

class Singleton 
{
public:
    static Singleton& getInstance() {
        static Singleton instance; // 局部静态变量,线程安全的懒汉式
        return instance;
    }
private:
    Singleton() {} // 私有构造函数
    Singleton(const Singleton&) = delete; // 禁止拷贝构造
    Singleton& operator=(const Singleton&) = delete; // 禁止赋值操作
};

getInstance方法中的局部静态变量instance只会在第一次调用getInstance时被创建,之后的调用都会返回同一个实例,这种方式既实现了懒汉式的延迟加载,又保证了线程安全。

使用加锁的方式

class Singleton 
{
public:
    static Singleton* getInstance() 
    {
        if (instance == nullptr) {		// 双重判定空指针, 降低锁冲突的概率, 提高性能.
            pthread_mutex_lock(&mutex);	// 使用互斥锁, 保证多线程情况下也只调用一次 new.
            if (instance == nullptr) 
            	instance = new Singleton();
           	pthread_mutex_unlock(&mutex);
        }
        return instance;
    }
private:
    static Singleton* instance; // 静态成员变量指针,懒汉式,延迟创建实例
    static pthread_mutex_t mutex;		// 锁
    Singleton() {} // 私有构造函数
    Singleton(const Singleton&) = delete; // 禁止拷贝构造
    Singleton& operator=(const Singleton&) = delete; // 禁止赋值操作
};

// 在类外初始化静态成员变量指针
Singleton* Singleton::instance = nullptr;
pthread_mutex_t Singleton::mutex = PTHREAD_MUTEX_INITIALIZER;

10.5 单例模式的线程池

#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <semaphore.h>
#include <pthread.h>
#include <vector>
#include <queue>
using namespace std;
struct ThreadData
{
    pthread_t tid;
    string name;
};

// T表示任务的类型
template<class T>
class ThreadPool
{
public:
	// ...
    static ThreadPool* GetInstance()
    {   
        if(tp == nullptr) {
            pthread_mutex_lock(&lock);
            if(tp == nullptr) 
                tp = new ThreadPool<T>();
            pthread_mutex_unlock(&lock);
        }
        return tp;
    }
private:
    ThreadPool(size_t num = defaultNum) : _threads(num) 
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    ThreadPool(const ThreadPool& tp) = delete;
    const ThreadPool operator=(const ThreadPool& tp) = delete;

    vector<ThreadData> _threads;
    queue<T> _tasks;    // 任务,这是临界资源
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    static ThreadPool* tp;
    static pthread_mutex_t lock;
};
template<class T>
ThreadPool<T>* ThreadPool<T>::tp = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::lock = PTHREAD_MUTEX_INITIALIZER;

11. STL和智能指针的线程安全 问题

11.1 STL中的容器是否是线程安全的?

原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.

11.2 智能指针是否是线程安全的?

对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.

对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数.

12. 其他常见的各种锁

  • 悲观锁(Pessimistic Locking):在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
  • 乐观锁(Optimistic Locking):每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
    • CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
  • 自旋锁(Spinlock):当一个线程尝试获取一个已经被其他线程持有的锁时,该线程不会立即进入等待状态(即不会释放CPU),而是在原地“自旋”,也就是不停地进行忙等待(busy-waiting),直到获取到锁。
    • 当一个线程尝试获取一个已经被占用的自旋锁时,它会在原地循环检查锁的状态,直到锁变为可用。
    • 自旋锁不会使线程进入睡眠状态,因此它是一种非阻塞的同步机制。
    • 由于线程不会进入睡眠状态,自旋锁避免了线程上下文切换的开销。
    • 由于自旋锁会导致CPU资源的占用,因此它更适合于那些预计会很快释放的锁。如果锁的持有时间较长,自旋锁可能会导致CPU资源的浪费。
    • 如果持有自旋锁的线程发生阻塞,那么等待该锁的线程可能会无限期地自旋下去,导致死锁。
    • 之前使用的都属于悲观锁,是否采用自旋锁取决于线程在临界资源会待多长时间。
  • 公平锁(Fair Lock)是一种锁机制,它确保了线程获取锁的顺序与它们请求锁的顺序相同。换句话说,公平锁保证了“先来先服务”(FIFO,First-In-First-Out)的原则,即最先请求锁的线程将最先获得该锁。
  • 非公平锁(Non-Fair Lock)是一种锁机制,它不保证线程获取锁的顺序与它们请求锁的顺序相同。这意味着当一个线程尝试获取一个非公平锁时,它可能会与已经等待该锁的其他线程竞争,而不管这些线程等待了多久。

13. 读者写者问题

13.1 概念

在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。

image-20241011150938421

  • 3种关系:
    • 写者 vs 写者 (互斥)
    • 读者 vs 写者 (互斥,同步)
    • 读者 vs 读者 (共享关系)这是和生产消费者模型的区别
  • 2个角色:读者和写者
  • 1个交易场所:数据交换的地点

为什么读者写者问题中读者和读者关系是共享 而生产消费者模型中 消费者和消费者的关系是互斥呢?

因为读者并不会对数据做处理,只是对数据进行读操作。而消费者会对数据进行数据处理。


一般来说,读者多,写者少。所以概率上讲读者更容易竞争到锁,写者可能会出现饥饿问题。
这是读者写者问题的特点。也可以更改这个现象,设置同步策略,让写者优先

  • 读者优先:在这种策略下,如果读者和写者同时等待访问临界区,读者会被优先允许进入。这种策略可以减少写者的等待时间,因为读者通常持有锁的时间较短。然而,如果读者持续不断地访问数据,写者可能会遭遇饥饿,即长时间无法获得对数据的访问权。
  • 写者优先::在这种策略下,如果读者和写者同时等待访问临界区,写者会被优先允许进入。这种策略可以防止读者饥饿,因为写者一旦获得访问权,会阻止新的读者进入,直到写者完成写操作。但是,如果写者频繁地访问数据,读者可能会遭遇饥饿。

13.2 读写锁接口

// 初始化
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t* restrict attr);
// 销毁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
// 加锁和解锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

13.3 读者优先的伪代码

int reader_count = 0;
mutex_t rlock, wlock;

// 读者加锁 && 解锁
lock(&rlock);
read_count++;
if(reader_count==1)	lock(&wlock);
unlock(&rlock);
// 读者进行读取
lock(&rlock);
reader_count--;
if(reader_count==0)	unlock(&wlock);
unlock(rlock);

// 写者加锁 && 解锁
lock(&wlock);
// 写者进行写入操作
unlock(&wlock)
  1. 读者加锁
    • 首先,读者尝试获取 rlock 锁,以安全地修改 reader_count 变量。
    • 获取 rlock 后,读者增加 reader_count 的值。
    • 如果这是第一个进入的读者(即 reader_count 从0变为1),则需要获取 wlock 锁,以阻止写者写入数据。这是因为一旦有读者在读取数据,写者就不应该修改数据,否则会影响读者读取的一致性。(读者优先!)
    • 完成 reader_count 的增加和可能的 wlock 获取后,读者释放 rlock 锁。
  2. 读者解锁
    • 读者完成读取操作后,再次获取 rlock 锁,以便安全地减少 reader_count 的值。
    • 如果这是最后一个离开的读者(即 reader_count 从1变为0),则需要释放 wlock 锁,允许写者进行写入操作。
    • 完成 reader_count 的减少和可能的 wlock 释放后,读者释放 rlock 锁。
  3. 写者加锁
    • 写者尝试获取 wlock 锁,以独占访问权进行写入操作。
    • 一旦获取 wlock 锁,写者可以安全地进行写入操作,因为此时没有读者在读取数据。
  4. 写者解锁
    • 写者完成写入操作后,释放 wlock 锁,允许其他读者或写者访问数据。