线程同步方式
互斥锁
概述: 用于保护临界区,确保同一时间只有一个线程可以访问共享资源。常见的互斥锁有std::mutex,std::lock_guard和std::unique_lock
mutex
概述: 用于管理多个线程对共享资源的互斥访问,防止数据竞争和并发问题
基础用法示例:
#include <iostream>
#include <thread>
#include <mutex>
int cnt = 0; // 共享变量资源
std::mutex mtx; // 共享变量的互斥锁
void increment() {
for (int i = 0; i < 1000; i++) {
mtx.lock(); // 加锁
++cnt; // 临界区
mtx.unlock(); // 解锁
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Final cnt: " << cnt << std::endl;
return 0;
}
如果不想阻塞线程,可以使用try_lock()
std::recursive_lock: 允许同一线程多次加锁,但是必须匹配相同次数的unlock()
#include <iostream>
#include <mutex>
#include <thread>
std::recursive_mutex rmtx;
void recursiveFunction(int n) {
if (n <= 0) return;
rmtx.lock();
std::cout << "Lokcing " << n << std::endl;
recursiveFunction(n-1);
rmtx.unlock();
}
int main() {
std::thread t1(recursiveFunction, 3);
t1.join();
return 0;
}
lock_guard
概述: 一个轻量级的互斥锁管理器,确保在作用域结束时自动释放
主要特点:
- 自动管理锁的获取和释放
- 不支持手动解锁
- 适用于短时间加锁的场景
基本用法示例:
#include <iostream>
#include <mutex>
#include <thread>
std::mutex mtx; // 共享互斥锁
void printMessaga(const std::string& message) {
std::lock_guard<std::mutex> lock(mtx); // 加锁,作用域结束时自动释放
std::cout << message << std::endl;
}
int main() {
std::thread t1(printMessaga, "Hello from thread 1");
std::thread t2(printMessaga, "Hello from thread 2");
t1.join();
t2.join();
return 0;
}
构造函数: 会在构造函数时自动加锁,如果不想立刻加锁,可以使用std::adopt_lock
std::mutex mtx;
std::lock_guard<std::mutex> lock(mtx, std::adopt_lock); // 采用已有的锁
此时mtx必须在lock_guard之前已经被手动lock(),否则会导致未定义行为
unique_lock
概述: 一种互斥锁管理器,比std::lock_guard更加灵活,主要用于管理std::mutex或其他BasicLockable类型的互斥锁
主要特点:
- 支持延迟锁定:std::unique_lock可以在构造时不立即获取锁,而是在稍后在需要时显式地lock()
- 支持显式解锁:可以在持有锁的情况下调用unlock()释放锁,而不像std::lock_guard那样必须等到作用域结束时释放锁
- 支持所有权的转移:可以同故宫std::move进行所有权的转移
直接锁定互斥锁:
#include <iostream>
#include <mutex>
#include <thread>
std::mutex mtx;
void threadFunc() {
std::unique_lock<std::mutex> lock(mtx); // 自动加锁
std::cout << "Thread " << std::this_thread::get_id() << " is working\n";
}
int main() {
std::thread t1(threadFunc);
std::thread t2(threadFunc);
t1.join();
t2.join();
}
延迟锁定:
#include <iostream>
#include <mutex>
#include <thread>
std::mutex mtx;
void threadFunc() {
std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 不自动加锁
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟其他操作
lock.lock(); // 需要时手动加锁
std::cout << "Thread" << std::this_thread::get_id() << " is working\n";
} // 离开作用域时自动解锁
int main() {
std::thread t1(threadFunc);
std::thread t2(threadFunc);
t1.join();
t2.join();
}
显式解锁:
#include <iostream>
#include <mutex>
#include <thread>
std::mutex mtx;
void threadFunc() {
std::unique_lock<std::mutex> lock(mtx);
std::cout << "Thread " << std::this_thread::get_id() << " acquired lock\n";
lock.unlock(); // 显式解锁
std::cout << "Thread " << std::this_thread::get_id() << " released lock\n";
// 其他操作,不受互斥锁保护
} // 离开作用域时不会再次解锁
int main() {
std::thread t1(threadFunc);
std::thread t2(threadFunc);
t1.join();
t2.join();
}
条件变量
概述: 允许线程在不满足条件时挂起等待,在条件满足时被唤醒继续执行。通常与互斥锁一起使用,确保在修改共享数据不会出现竞态条件,常见的条件变量有std::condition_variable和std::condition_variable_any
condition_variable
概述: 在多线程环境下,通常会遇到生产者-消费者问题:线程A需要等数据准备后再执行,但它不能一直忙等,线程B生成数据后需要通知线程A继续执行。而std::condition_variable允许一个线程等待某个条件变为真,而另一个线程可以通知它条件已满足
主要方法:
- wait(std::unique_lock<std::mutex>& lock):让当前线程等待,直到被notify_one()或notify_all()唤醒
- wait(std::unique_lock<std::mutex>& lock, Predicate pred):和上面的方法类似,但会在等待前和被唤醒后检查pred是否为true
- notify_one():唤醒一个等待的线程
- notify_all():唤醒所有等待的线程
基本用法示例:
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>
std::queue<int> dataQueue; // 共享数据队列
std::mutex mtx; // dataQueue的互斥锁
std::condition_variable cv;
bool ready = false; // 生产者是否已经生产数据
void producer() {
for (int i = 1; i <= 5; ++i) {
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟数据生成
std::lock_guard<std::mutex> lock(mtx);
dataQueue.push(i);
ready = true;
std::cout << "Produced " << i << std::endl;
cv.notify_one();
}
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return ready || !dataQueue.empty(); }); // 等待通知
while (!dataQueue.empty()) {
int value = dataQueue.front();
dataQueue.pop();
std::cout << "Consumed: " << value << std::endl;
}
ready = false;
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
实战练习: 多线程顺序打印数字:创建3个线程,按顺序依次打印1-100,即线程1打印1,线程2打印2,线程3打印3,然后线程1打印4,线程2打印5,线程3打印6…
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
int current = 1; // 当前打印的数字
std::mutex mtx; // current变量的互斥锁
std::condition_variable cv; // 保证线程的同步(也就是保证线程任务执行的顺序)
// 线程任务
void printNumber(int threadId) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [threadId] { return current > 100 || (current - 1) % 3 == threadId; });
if (current > 100) {
break; // 线程终止条件
}
std::cout << "Thread" << threadId << " prints " << current << std::endl;
current++;
cv.notify_all(); // 唤醒所有等待的线程
}
}
int main() {
std::thread t1(printNumber, 0);
std::thread t2(printNumber, 1);
std::thread t3(printNumber, 2);
t1.join();
t2.join();
t3.join();
return 0;
}
condition_variable_any
概述: 跟condition_variable类似,但是比其更灵活。condition_variable只能和std::unique_lock<std::mutex>搭配,而condition_variable_any可以与任何满足Lockable概念的锁搭配使用,适用于自定义锁、std::shared_mutex等,但性能就差了一些
基础用法示例:
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <shared_mutex>
std::condition_variable_any cv_any;
std::shared_mutex shared_mtx;
int data = 0; // 共享变量
bool ready = false; // 条件变量判断
void producer() {
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟生产数据
std::unique_lock<std::shared_mutex> lock(shared_mtx);
data = 42;
ready = true;
std::cout << "Producer: Data ready\n";
cv_any.notify_one();
}
void consumer() {
std::unique_lock<std::shared_mutex> lock(shared_mtx);
cv_any.wait(lock, [] { return ready; });
std::cout << "Consumer Data: " << data << std::endl;
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
信号量
概述: 是一个计数器,用于控制多个线程对于共享资源的访问。信号量包括二进制信号量和计数信号量。常见的信号量有std::binary_semaphore(C++20引入)和std::couting_semaphore<N>
binary_semaphore
概述: 类似于std::mutex,也具有非阻塞的try_acquire()(mutex是try_lock),但是可以手动释放(不像mutex只能由持有锁的线程释放)
主要特点:
- 值域仅为0或1:
- 0:表示资源不可用,线程阻塞等待
- 1:表示资源可用,允许线程访问
- 支持手动release():不像mutex由持有者释放,binary_semaphore可以由任何线程释放
- 支持try_acquire():非阻塞获取信号量
- 适用于控制访问:适用于单个资源的独占访问,比mutex更灵活
基础用法示例:
#include <iostream>
#include <thread>
#include <semaphore>
std::binary_semaphore sem(0); // 初始状态为0,表示资源不可用
void worker() {
std::cout << "Worker: Waiting for signal...\n";
sem.acquire(); // 等待信号量变为1
std::cout << "Worker: Acquired semaphore! Doing work...\n";
}
int main() {
std::thread t(worker);
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Main: Releasing semaphore\n";
sem.release();
t.join();
return 0;
}
counting_semaphore<N>
概述: 允许多个线程同时获取资源
主要特点:
- 允许最多N个线程同时访问
- acquire()减少信号量(如果为0,阻塞)
- release()增加信号量(最多到N)
- try_acquire()非阻塞获取
基础用法示例:
#include <iostream>
#include <thread>
#include <semaphore>
std::counting_semaphore<3> sem(3); // 最多允许3个线程同时执行
void worker(int id) {
sem.acquire();
std::cout << "Thread " << id << " is working...\n";
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟工作
std::cout << "Thread " << id << " done.\n";
sem.release();
}
int main() {
std::thread threads[5];
for (int i = 0; i < 5; ++i) {
threads[i] = std::thread(worker, i);
}
for (auto& t : threads) {
t.join();
}
return 0;
}
屏障
概述: 用于在多个线程中所有线程都达到某个点时进行同步,然后继续执行。常见的屏障有std::barrier(C++20引入)
主要特点:
- 需要设置固定数量的线程(称为count)
- 线程到达barrier后必须等待,直到所有线程都到达
- 线程全部到达,自动解除barrier,所有线程继续执行
- 支持阶段同步,可以在每轮barrier完成后执行回调函数
主要方法:
- barrier(std::ptrdiff_t count, CompletionFunction f = {}):创建barrier,count是线程数,f是可选的阶段回调
- arrive_and_wait():线程到达barrier并等待,直到所有线程到达
- arrive_and_drop():线程达到后退出屏障,减少屏障计数
- arrive(n):线程到达n次(不等待)
基础用法示例:
#include <iostream>
#include <thread>
#include <barrier>
std::barrier syncPoint(3); // 3个线程必须到达屏障
void worker(int id) {
std::cout << "Thread " << id << " waiting at barrier...\n";
syncPoint.arrive_and_wait(); // 等待所有线程到达
std::cout << "Thread " << id << " passed the barrier!\n";
}
int main() {
std::thread t1(worker, 1);
std::thread t2(worker, 2);
std::thread t3(worker, 3);
t1.join();
t2.join();
t3.join();
return 0;
}
带回调函数的用法:
#include <iostream>
#include <thread>
#include <barrier>
void onPhaseComplete() {
std::cout << "All threads reached the barrier. Proceeding to next phase...\n";
}
std::barrier syncPoint(3, onPhaseComplete); // 3个线程必须到达屏障
void worker(int id) {
std::cout << "Thread " << id << " waiting at barrier...\n";
syncPoint.arrive_and_wait(); // 等待所有线程到达
std::cout << "Thread " << id << " passed the barrier!\n";
}
int main() {
std::thread t1(worker, 1);
std::thread t2(worker, 2);
std::thread t3(worker, 3);
t1.join();
t2.join();
t3.join();
return 0;
}
arrive_and_drop()让一个线程退出barrier:
#include <iostream>
#include <thread>
#include <barrier>
std::barrier syncPoint(3); // 3个线程必须到达屏障
void worker(int id) {
if (id == 1) {
std::cout << "Thread " << id << " leaving the barrier parmanently...\n";
syncPoint.arrive_and_drop(); // 线程1退出
return;
}
std::cout << "Thread " << id << " waiting at barrier...\n";
syncPoint.arrive_and_wait(); // 线程2和3继续同步
std::cout << "Thread " << id << " passed the barrier!\n";
}
int main() {
std::thread t1(worker, 1);
std::thread t2(worker, 2);
std::thread t3(worker, 3);
t1.join();
t2.join();
t3.join();
return 0;
}
原子操作
概述: 是一种保证不会被中断的操作。常见的原子操作有std::atomic和C++11中引入的原子操作函数
基础用法示例:
#include <iostream>
#include <atomic>
#include <thread>
std::atomic<int> counter(0);
void increment() {
for (int i = 0; i < 1000; ++i) {
counter.fetch_add(1, std::memory_order_relaxed);
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Final counter: " << counter.load() << std::endl;
return 0;
}
支持的类型:
- 整数:int\long\uint32_t等
- 指针:T*
- 布尔值
- 浮点数
主要方法:
- load():获取原子变量的值
- store(x):设置原子变量的值
- fetch_add(x):原子加法
- fetch_sub(x):原子减法
- fetch_or(x):原子或
- fetch_and(x):原子与
- exchange(x):原子赋值并返回旧值
- compare_exchange_weak(x, y):CAS操作,比较并交换值
std::memory_order内存模型:
- memory_order_relaxed:只保证原子性,不保证顺序
- memory_order_consume:依赖load()的指令不会被重排序(很少使用)
- memory_order_acquire:读取前的所有读写不会被重排序
- memory_order_release:释放前的所有读写不会被重排序
- memory_order_acq_rel:组合acquire和release
- memory_order_seq_cst:默认,保证全局顺序一致
读写锁
概述: 允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。常见的读写锁有std::shared_mutex(C++17引入)
基础用法示例:
x#include <iostream>
#include <thread>
#include <shared_mutex>
#include <vector>
#include <mutex>
std::shared_mutex rwMtx;
int sharedData = 0; // 共享数据
void reader(int id) {
std::shared_lock lock(rwMtx); // 共享锁
std::cout << "Reader " << id << " read value: " << sharedData << std::endl;
}
void writer(int id, int value) {
std::unique_lock lock(rwMtx); // 独占锁
sharedData = value;
std::cout << "Writer " << id << " updated value to: " << sharedData << std::endl;
}
int main() {
std::vector<std::thread> threads;
// 启动多个读线程
for (int i = 0; i < 5; ++i) {
threads.emplace_back(reader, i);
}
// 启动一个写线程
threads.emplace_back(writer, 1, 100);
// 启动更多的读线程
for (int i = 5; i < 10; ++i) {
threads.emplace_back(reader, i);
}
for (auto& t : threads) {
t.join();
}
return 0;
}