目录
📌这篇博客能带给你什么?
告别原生API的繁琐:用简洁优雅的C++类封装POSIX线程操作,让多线程开发效率提升50%
防泄漏利器:基于RAII机制的智能锁设计,即使发生异常也绝不忘记解锁。
开箱即用源码:附带经过压力测试的完整代码库,可直接集成到你的项目。
🔥为什么需要封装这些组件?
当你在开发这些场景时定会深有共鸣:
在高并发服务器中管理上百个连接线程时手忙脚乱。
因忘记解锁导致死锁,花费数小时定位BUG。
不同模块的条件变量使用不一致,引发诡异的内存错误。
想快速实现生产者消费者模型,却被信号量接口搞得头晕。
接下来我们直接进入正题:
一、线程封装
框架设计
- 为了更规范和防止命名污染,把线程的封装放在一个命名空间内进行。
- 为区分不同的线程和记录创建线程的个数,我们创建一个静态的int类型来计数。
- 因为线程的本质工作就是执行我们传入的函数,所以需要建立一个function类型方便外部传参。
所以我们做这样的设计:
这里设了模板参数是因为后期如果需要可以把func_t类型该为模板参数T让用户自己设定,为方便讲解后面的内容这里就暂时这么设定。
- fun_t _fun:储存用户传入的函数。
- pthread_t _tid:记录进程号。
- string _name:根据count计数给每个进程起一个名字作为标记。
- bool _isdetch:标记线程是否被分离,因为线程是否被分离决定了某些操作是否能执行,所以需要记录。
- bool _isrun:标记线程是否被创建(运行),同样因为线程是否被创建决定了某些操作是否能执行,所以需要记录。
构造与析构
构造函数主要是对成员变量初始化,要注意这里需要对count做++。
thread(func_t fun)
: _fun(fun), _tid(0), _isdetach(false), _isrun(false)
{
_name = "thread-" + to_string(count);
count++;
}
~thread(){}
1.线程创建
pthread_create的使用
pthread_create声明如下:
#include <pthread.h> int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
- 参数thead:输出型参数,需要传入一个pthread_t类型的变量的地址,可以得到线程的tid。
- 参数attr:这个参数跟属性相关的设定,几乎用不到,这里也就不再进,设为nullptr就行。
- 参数start_routine:传入一个返回类型为void*,参数为void*的自定义函数,线程创建后会执行这个函数。
- 参数arg:传入一个void*类型的参数,最后会被作为start_routine的参数。
- 返回值:
- 非0:创建失败。
- 0:创建成功
pthread_create函数要求传入一个void *(*start_routine) (void *)类型的函数指针,但为了可以让用户更灵活的使用,我们可以在这里进行一下封装。也就是说希望用户想传什么类型的函数指针都行,那么可以单独设计这样一个函数,如下:
static void *Func(void *p)
{
thread<T> *self = (thread<T> *)p;
self->_isrun = true;
self->_fun();
return nullptr;
}
这样在这个函数内再调用用户传入的函数_fun就行了。一个很巧妙的设计。
需要注意的几点:
- 普通的成员函数的参数是带有this指针的,不满足start_routine( pthread_create函数的参数)的要求,所以我们把它设为了静态成员函数(即加static关键字)。这样就不会有this指针。
- 没有了this指针就找不到成员变量,所以在给Func传参时需要传this指针。
- 最后不要把Func函数暴露给用户,把它设为私有(private)更为合理。
Start实现如下:
bool Start()
{
if (_isrun)
return false;
int n = pthread_create(&_tid, nullptr, &Func, (void *)this);
if (n != 0)
{
perror("pthread_create");
return false;
}
else
return true;
}
2.线程分离
如果主线程不想再关心新线程的任务执行情况等,那么就可以设置新线程为分离状态,分离的新线程依旧在进程地址空间中,依旧与主线程共享资源,线程被分离后不用进join等待,join会失败。
pthread_detach的使用
pthread_detach声明如下:
int pthread_detach(pthread_t thread);
- 参数pthread_t thread:参入需要分离的线程tid。
Detach的实现:
bool Detach()
{
if (_isdetach) return false;
if (_isrun) pthread_detach(_tid);
_isdetach = true;
}
3.线程取消
线程被取消也就是线程退出,退出结果是-1。
pthread_cancel的使用
pthread_cancel声明:
int pthread_cancel(pthread_t thread);
- 参数thread:传入需要取消的线程tid。
Cancel的实现:
bool Cancel()
{
if (!_isrun) return false;
pthread_cancel(_tid);
return true;
}
4.线程等待
线程等待和进程等待类似,为了获取到任务的完成情况。如果线程不进行等待就会造成内存泄漏。
pthread_join声明:
int pthread_join(pthread_t thread, void **retval);
- 参数thread:传入需要等待的线程tid。
- 参数retval:输出型参数,传入void*类型的地址,得到线程的任务完成情况,这个retval得到的也就是在我们传入自定义函数中返回的void*类型。
- 返回值
- 非0:失败。
- 0:成功。
Join实现:
void *Join()
{
void *ret = nullptr;
pthread_join(_tid, &ret);
return ret;
}
二、锁封装
在 Linux 中,锁是一种互斥机制,用于控制多线程或多进程对共享资源的访问,避免竞态条件(Race Condition)和数据不一致问题。锁功能的介绍:
功能:确保同一时间只有一个线程能访问共享资源(临界资源)。
特点:
线程独占,其他线程会被阻塞直到锁释放。
适用于线程间的互斥。
锁的核心作用
原子性:确保操作不可分割,即在CPU中一次性完成,在此期间可以被切换,但不会被其它执行流介入。
可见性:保证锁释放后修改对其他线程可见。
有序性:防止指令重排导致的逻辑错误。
框架设计
- Mutex:完成锁的初始化
- void Lock:加锁。
- void Unlock:解锁。
- pthread_mutex_t* GetLock():得到锁的信息,方便与后面的条件变量、信号量进行交互。
- ~Mutex:锁的释放。
- pthread_mutex_t _mutex:储存锁的信息。
构造与析构
构造需要完成锁的初始化,使用pthread_mutex_init函数。
pthread_mutex_init的声明:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
- 参数restrict mutex:传入一个pthread_mutex_t类型的变量地址,函数会初始化这个互斥锁。
- 参数restrict attr:用来设置锁的属性,这里不做探讨,设为nullptr即可。
Mutex()实现:
Mutex()
{
pthread_mutex_init(&_mutex, nullptr);
}
析构函数完成锁的释放
锁本身不会直接导致内存泄漏,但如果使用不当,可能会间接引发资源泄漏或死锁问题,所以我们需对锁进行destroy。
pthread_destroy的声明:
int pthread_mutex_destroy(pthread_mutex_t *mutex);
- 参数mutex:传入一个pthread_mutex_t类型的变量地址,函数会回收这个互斥锁。
~Mutex()实现
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
1.加锁
pthread_mutex_lock的使用
pthread_mutex_lock的声明
int pthread_mutex_lock(pthread_mutex_t *mutex);
- pthread_mutex_t:传入锁的地址,类型为pthread_mutex_t*。
void Lock()实现:
void Lock()
{
pthread_mutex_lock(&_mutex);
}
2.解锁
pthread_mutex_unlock的使用
pthread_mutex_lock的声明
int pthread_mutex_unlock(pthread_mutex_t *mutex);
- pthread_mutex_t:传入锁的地址,类型为pthread_mutex_t*。
void Unlock()实现:
void Unlock()
{
pthread_mutex_unlock(&_mutex);
}
3.RAII模式
class LockGuard
{
public:
LockGuard(Mutex& lock)
: _lock(lock)
{
_lock.Lock();
}
~LockGuard()
{
_lock.Unlock();
}
private:
Mutex& _lock;
};
RAII模式的锁好处在于在创建对象时自动加锁,在销毁时自动解锁,是全自动的,当临界区代码抛出异常时,RAII 对象的析构函数仍会执行,确保锁被释放,避免死锁。
但是在使用RAII模式的锁时要注意作用域的问题,确保锁在正确的作用域添加和释放。
三、条件变量封装
条件变量是用来使线程或进程之间同步的,避免了一个锁被同一执行流循环式的重复占用。
- 锁:解决互斥问题(对错问题)
- 条件变量:解决同步问题(是否公平问题)
它是怎么实现同步呢?其实是执行流之间互相“通知”,比如当某个条件不满足的时候,这个执行流就不要申请锁了,就在这里等着。等条件满足其它执行流会通知你。
框架设计
- Cond:完成条件变量的初始化。
- void Wait:执行流进行等待。
- void Signal:唤醒一个执行流。
- void Broadcast:唤醒所有执行流。
- ~Cond: 销毁条件变量。
构造与析构
条件变量的接口使用和锁的接口完全类似。
pthread_cond_init声明
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
- 参数restrict cond:传入一个pthread_cond_t类型的变量地址,函数会初始化这个条件变量。
- 参数restrict attr:用来设置条件变量的属性,这里不做探讨,设为nullptr即可。
pthread_cond_destroy声明
int pthread_cond_destroy(pthread_cond_t *cond);
- 参数mutex:传入一个pthread_cond_t类型的变量地址,函数会回收这个条件变量。
Cond与~Cond实现:
Cond()
{
pthread_cond_init(&_cond,nullptr);
}
~Cond()
{
pthread_cond_signal(&_cond);
}
1.线程等待
pthread_cond_wait的声明
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
- 参数restrict cond:传入条件变量地址。
- 参数restrict mutex:传入一个互斥锁。
pthread_cond_wait为什么要传入锁呢?其实是这样的,首先执行流申请了锁进入临界区后却因为某些条件不满足,无法往下执行,所以需要等待,但又不能一直霸占着锁,要不然就变成死锁了,所以需要把锁释放。
pthread_cond_wait函数的执行逻辑就是释放锁然后等待,当被唤醒时再次申请锁。
void Wait(Mutex mutex) 实现
void Wait(Mutex mutex)
{
pthread_cond_wait(&_cond,mutex.GetLock());
}
2.唤醒一个线程
pthread_cond_signal作用是唤醒一个正在等待的线程,声明如下:
int pthread_cond_signal(pthread_cond_t *cond);
void Signal()实现
void Signal()
{
pthread_cond_signal(&_cond);
}
3.唤醒所有线程
pthread_cond_broadcast作用是唤醒所有正在等待的线程,声明如下:
void Broadcast()实现
void Broadcast()
{
pthread_cond_broadcast(&_cond);
}
四、信号量的封装
信号量的理解
当一个共享资源可以分为小块去访问,那么就可以使用一个计数器去记录这些小块资源,在被使用中的有多少未被使用的有多少,它相当于一种资源的预定机制。比如在生产者消费者模型中用环形队列做交易场所,那么信号量就可以来完成生产者和消费者之间需要的互斥和同步功能。
- P操作:资源预定成功,信号量减1。
- V操作:资源使用结束,信号量加1。
Sem(信号量)的封装几乎和Mutex和Cond一样,这里就不做详细讲解。
#include <semaphore.h>
#define DEF 1
class Sem
{
public:
Sem(int val = DEF)
{
sem_init(&_sem,0,val);
}
void P()
{
sem_wait(&_sem);
}
void V()
{
sem_post(&_sem);
}
~Sem()
{
sem_destroy(&_sem);
}
private:
sem_t _sem;
};
五、源码
1.Thread.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <functional>
#include <string>
#include <errno.h>
using namespace std;
namespace my_thread
{
static int count = 1;
template <typename T>
class thread
{
using func_t = std::function<void()>;
private:
static void *Func(void *p)
{
thread<T> *self = (thread<T> *)p;
self->_isrun = true;
self->_fun();
return nullptr;
}
public:
thread(func_t fun)
: _fun(fun), _tid(0), _isdetach(false), _isrun(false)
{
_name = "thread-" + to_string(count);
count++;
}
bool Start()
{
if (_isrun)
return false;
int n = pthread_create(&_tid, nullptr, &Func, (void *)this);
if (n != 0)
{
perror("pthread_create");
return false;
}
else
return true;
}
bool Cancel()
{
if(_isrun)
pthread_cancel(_tid);
else return false;
return true;
}
bool Detach()
{
if (_isdetach)
return false;
if(_isrun)
pthread_detach(_tid);
_isdetach = true;
}
void *Join()
{
void *ret = nullptr;
pthread_join(_tid, &ret);
return ret;
}
~thread()
{}
private:
func_t _fun;
pthread_t _tid;
string _name;
bool _isdetach;
bool _isrun;
};
}
2.Mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
using namespace std;
namespace my_mutex
{
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_mutex, nullptr);
}
void Lock()
{
pthread_mutex_lock(&_mutex);
}
void Unlock()
{
pthread_mutex_unlock(&_mutex);
}
pthread_mutex_t* GetLock()
{
return &_mutex;
}
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
private:
pthread_mutex_t _mutex;
};
class LockGuard
{
public:
LockGuard(Mutex& lock)
: _lock(lock)
{
_lock.Lock();
}
~LockGuard()
{
_lock.Unlock();
}
private:
Mutex& _lock;
};
}
3.Cond.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include "mutex.hpp"
using namespace my_mutex;
namespace my_cond
{
class Cond
{
public:
Cond()
{
pthread_cond_init(&_cond,nullptr);
}
void Wait(Mutex mutex)
{
pthread_cond_wait(&_cond,mutex.GetLock());
}
void Signal()
{
pthread_cond_signal(&_cond);
}
void Broadcast()
{
pthread_cond_broadcast(&_cond);
}
~Cond()
{
pthread_cond_signal(&_cond);
}
private:
pthread_cond_t _cond;
};
}
class Cond
{
public:
Cond();
void Wait(Mutex mutex);
void Signal();
void Broadcast();
~Cond();
private:
pthread_cond_t _cond;
};
4. Sem.hpp
#pragma once
#include <semaphore.h>
#define DEF 1
class Sem
{
public:
Sem(int val = DEF)
{
sem_init(&_sem,0,val);
}
void P()
{
sem_wait(&_sem);
}
void V()
{
sem_post(&_sem);
}
~Sem()
{
sem_destroy(&_sem);
}
private:
sem_t _sem;
};