C++中的线程同步方式

发布于:2025-02-19 ⋅ 阅读:(17) ⋅ 点赞:(0)

线程同步方式

互斥锁

概述: 用于保护临界区,确保同一时间只有一个线程可以访问共享资源。常见的互斥锁有std::mutex,std::lock_guard和std::unique_lock

mutex

概述: 用于管理多个线程对共享资源的互斥访问,防止数据竞争和并发问题

基础用法示例:

#include <iostream>
#include <thread>
#include <mutex>

int cnt = 0; // 共享变量资源
std::mutex mtx; // 共享变量的互斥锁

void increment() {
    for (int i = 0; i < 1000; i++) {
        mtx.lock(); // 加锁
        ++cnt; // 临界区
        mtx.unlock(); // 解锁
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);

    t1.join();
    t2.join();

    std::cout << "Final cnt: " << cnt << std::endl;
    return 0;
}

如果不想阻塞线程,可以使用try_lock()

std::recursive_lock: 允许同一线程多次加锁,但是必须匹配相同次数的unlock()

#include <iostream>
#include <mutex>
#include <thread>

std::recursive_mutex rmtx;

void recursiveFunction(int n) {
    if (n <= 0) return;

    rmtx.lock();
    std::cout << "Lokcing " << n << std::endl;
    recursiveFunction(n-1);
    rmtx.unlock(); 
}

int main() {
    std::thread t1(recursiveFunction, 3);
    t1.join();
    return 0;
}

lock_guard

概述: 一个轻量级的互斥锁管理器,确保在作用域结束时自动释放

主要特点:

  • 自动管理锁的获取和释放
  • 不支持手动解锁
  • 适用于短时间加锁的场景

基本用法示例:

#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx; // 共享互斥锁

void printMessaga(const std::string& message) {
    std::lock_guard<std::mutex> lock(mtx); // 加锁,作用域结束时自动释放
    std::cout << message << std::endl;
}

int main() {
    std::thread t1(printMessaga, "Hello from thread 1");
    std::thread t2(printMessaga, "Hello from thread 2");

    t1.join();
    t2.join();

    return 0;
}

构造函数: 会在构造函数时自动加锁,如果不想立刻加锁,可以使用std::adopt_lock

std::mutex mtx;
std::lock_guard<std::mutex> lock(mtx, std::adopt_lock); // 采用已有的锁

此时mtx必须在lock_guard之前已经被手动lock(),否则会导致未定义行为

unique_lock

概述: 一种互斥锁管理器,比std::lock_guard更加灵活,主要用于管理std::mutex或其他BasicLockable类型的互斥锁

主要特点:

  • 支持延迟锁定:std::unique_lock可以在构造时不立即获取锁,而是在稍后在需要时显式地lock()
  • 支持显式解锁:可以在持有锁的情况下调用unlock()释放锁,而不像std::lock_guard那样必须等到作用域结束时释放锁
  • 支持所有权的转移:可以同故宫std::move进行所有权的转移

直接锁定互斥锁:

#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;

void threadFunc() {
    std::unique_lock<std::mutex> lock(mtx); // 自动加锁
    std::cout << "Thread " << std::this_thread::get_id() << " is working\n";
}

int main() {
    std::thread t1(threadFunc);
    std::thread t2(threadFunc);

    t1.join();
    t2.join();
}

延迟锁定:

#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;

void threadFunc() {
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 不自动加锁
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟其他操作
    lock.lock(); // 需要时手动加锁
    std::cout << "Thread" << std::this_thread::get_id() << " is working\n";
} // 离开作用域时自动解锁

int main() {
    std::thread t1(threadFunc);
    std::thread t2(threadFunc);

    t1.join();
    t2.join();
}

显式解锁:

#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;

void threadFunc() {
    std::unique_lock<std::mutex> lock(mtx);
    std::cout << "Thread " << std::this_thread::get_id() << " acquired lock\n";

    lock.unlock(); // 显式解锁
    std::cout << "Thread " << std::this_thread::get_id() << " released lock\n";

    // 其他操作,不受互斥锁保护
} // 离开作用域时不会再次解锁

int main() {
    std::thread t1(threadFunc);
    std::thread t2(threadFunc);

    t1.join();
    t2.join();
}

条件变量

概述: 允许线程在不满足条件时挂起等待,在条件满足时被唤醒继续执行。通常与互斥锁一起使用,确保在修改共享数据不会出现竞态条件,常见的条件变量有std::condition_variable和std::condition_variable_any

condition_variable

概述: 在多线程环境下,通常会遇到生产者-消费者问题:线程A需要等数据准备后再执行,但它不能一直忙等,线程B生成数据后需要通知线程A继续执行。而std::condition_variable允许一个线程等待某个条件变为真,而另一个线程可以通知它条件已满足

主要方法:

  • wait(std::unique_lock<std::mutex>& lock):让当前线程等待,直到被notify_one()或notify_all()唤醒
  • wait(std::unique_lock<std::mutex>& lock, Predicate pred):和上面的方法类似,但会在等待前和被唤醒后检查pred是否为true
  • notify_one():唤醒一个等待的线程
  • notify_all():唤醒所有等待的线程

基本用法示例:

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>

std::queue<int> dataQueue; // 共享数据队列
std::mutex mtx; // dataQueue的互斥锁
std::condition_variable cv;
bool ready = false; // 生产者是否已经生产数据

void producer() {
    for (int i = 1; i <= 5; ++i) {
        std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟数据生成
        std::lock_guard<std::mutex> lock(mtx);
        dataQueue.push(i);
        ready = true;
        std::cout << "Produced " << i << std::endl;
        cv.notify_one();
    }
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return ready || !dataQueue.empty(); }); // 等待通知

        while (!dataQueue.empty()) {
            int value = dataQueue.front();
            dataQueue.pop();
            std::cout << "Consumed: " << value << std::endl;
        }

        ready = false;
    }
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);

    t1.join();
    t2.join();

    return 0;
}

实战练习: 多线程顺序打印数字:创建3个线程,按顺序依次打印1-100,即线程1打印1,线程2打印2,线程3打印3,然后线程1打印4,线程2打印5,线程3打印6…

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

int current = 1; // 当前打印的数字
std::mutex mtx; // current变量的互斥锁
std::condition_variable cv; // 保证线程的同步(也就是保证线程任务执行的顺序)

// 线程任务
void printNumber(int threadId) {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [threadId] { return current > 100 || (current - 1) % 3 == threadId; });

        if (current > 100) {
            break; // 线程终止条件
        }

        std::cout << "Thread" << threadId << " prints " << current << std::endl;
        current++;

        cv.notify_all(); // 唤醒所有等待的线程
    }
}

int main() {
    std::thread t1(printNumber, 0);
    std::thread t2(printNumber, 1);
    std::thread t3(printNumber, 2);

    t1.join();
    t2.join();
    t3.join();

    return 0;
}

condition_variable_any

概述: 跟condition_variable类似,但是比其更灵活。condition_variable只能和std::unique_lock<std::mutex>搭配,而condition_variable_any可以与任何满足Lockable概念的锁搭配使用,适用于自定义锁、std::shared_mutex等,但性能就差了一些

基础用法示例:

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <shared_mutex>

std::condition_variable_any cv_any;
std::shared_mutex shared_mtx;
int data = 0; // 共享变量
bool ready = false; // 条件变量判断

void producer() {
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟生产数据
    std::unique_lock<std::shared_mutex> lock(shared_mtx);
    data = 42;
    ready = true;
    std::cout << "Producer: Data ready\n";
    cv_any.notify_one();
}

void consumer() {
    std::unique_lock<std::shared_mutex> lock(shared_mtx);
    cv_any.wait(lock, [] { return ready; });
    std::cout << "Consumer Data: " << data << std::endl;
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);

    t1.join();
    t2.join();

    return 0;
}

信号量

概述: 是一个计数器,用于控制多个线程对于共享资源的访问。信号量包括二进制信号量和计数信号量。常见的信号量有std::binary_semaphore(C++20引入)和std::couting_semaphore<N>

binary_semaphore

概述: 类似于std::mutex,也具有非阻塞的try_acquire()(mutex是try_lock),但是可以手动释放(不像mutex只能由持有锁的线程释放)

主要特点:

  • 值域仅为0或1:
    • 0:表示资源不可用,线程阻塞等待
    • 1:表示资源可用,允许线程访问
  • 支持手动release():不像mutex由持有者释放,binary_semaphore可以由任何线程释放
  • 支持try_acquire():非阻塞获取信号量
  • 适用于控制访问:适用于单个资源的独占访问,比mutex更灵活

基础用法示例:

#include <iostream>
#include <thread>
#include <semaphore>

std::binary_semaphore sem(0); // 初始状态为0,表示资源不可用

void worker() {
    std::cout << "Worker: Waiting for signal...\n";
    sem.acquire(); // 等待信号量变为1
    std::cout << "Worker: Acquired semaphore! Doing work...\n";
}

int main() {
    std::thread t(worker);

    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Main: Releasing semaphore\n";
    sem.release();

    t.join();
    return 0;
}

counting_semaphore<N>

概述: 允许多个线程同时获取资源

主要特点:

  • 允许最多N个线程同时访问
  • acquire()减少信号量(如果为0,阻塞)
  • release()增加信号量(最多到N)
  • try_acquire()非阻塞获取

基础用法示例:

#include <iostream>
#include <thread>
#include <semaphore>

std::counting_semaphore<3> sem(3); // 最多允许3个线程同时执行

void worker(int id) {
    sem.acquire();
    std::cout << "Thread " << id << " is working...\n";
    std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟工作
    std::cout << "Thread " << id << " done.\n";
    sem.release();
}

int main() {
    std::thread threads[5];

    for (int i = 0; i < 5; ++i) {
        threads[i] = std::thread(worker, i);
    }

    for (auto& t : threads) {
        t.join();
    }

    return 0;
}

屏障

概述: 用于在多个线程中所有线程都达到某个点时进行同步,然后继续执行。常见的屏障有std::barrier(C++20引入)

主要特点:

  • 需要设置固定数量的线程(称为count)
  • 线程到达barrier后必须等待,直到所有线程都到达
  • 线程全部到达,自动解除barrier,所有线程继续执行
  • 支持阶段同步,可以在每轮barrier完成后执行回调函数

主要方法:

  • barrier(std::ptrdiff_t count, CompletionFunction f = {}):创建barrier,count是线程数,f是可选的阶段回调
  • arrive_and_wait():线程到达barrier并等待,直到所有线程到达
  • arrive_and_drop():线程达到后退出屏障,减少屏障计数
  • arrive(n):线程到达n次(不等待)

基础用法示例:

#include <iostream>
#include <thread>
#include <barrier>

std::barrier syncPoint(3); // 3个线程必须到达屏障

void worker(int id) {
    std::cout << "Thread " << id << " waiting at barrier...\n";
    syncPoint.arrive_and_wait(); // 等待所有线程到达
    std::cout << "Thread " << id << " passed the barrier!\n";
}

int main() {
    std::thread t1(worker, 1);
    std::thread t2(worker, 2);
    std::thread t3(worker, 3);

    t1.join();
    t2.join();
    t3.join();

    return 0;
}

带回调函数的用法:

#include <iostream>
#include <thread>
#include <barrier>

void onPhaseComplete() {
    std::cout << "All threads reached the barrier. Proceeding to next phase...\n";
}

std::barrier syncPoint(3, onPhaseComplete); // 3个线程必须到达屏障

void worker(int id) {
    std::cout << "Thread " << id << " waiting at barrier...\n";
    syncPoint.arrive_and_wait(); // 等待所有线程到达
    std::cout << "Thread " << id << " passed the barrier!\n";
}

int main() {
    std::thread t1(worker, 1);
    std::thread t2(worker, 2);
    std::thread t3(worker, 3);

    t1.join();
    t2.join();
    t3.join();

    return 0;
}

arrive_and_drop()让一个线程退出barrier:

#include <iostream>
#include <thread>
#include <barrier>

std::barrier syncPoint(3); // 3个线程必须到达屏障

void worker(int id) {
    if (id == 1) {
        std::cout << "Thread " << id << " leaving the barrier parmanently...\n";
        syncPoint.arrive_and_drop(); // 线程1退出
        return;
    }


    std::cout << "Thread " << id << " waiting at barrier...\n";
    syncPoint.arrive_and_wait(); // 线程2和3继续同步
    std::cout << "Thread " << id << " passed the barrier!\n";
}

int main() {
    std::thread t1(worker, 1);
    std::thread t2(worker, 2);
    std::thread t3(worker, 3);

    t1.join();
    t2.join();
    t3.join();

    return 0;
}

原子操作

概述: 是一种保证不会被中断的操作。常见的原子操作有std::atomic和C++11中引入的原子操作函数

基础用法示例:

#include <iostream>
#include <atomic>
#include <thread>

std::atomic<int> counter(0);

void increment() {
    for (int i = 0; i < 1000; ++i) {
        counter.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);

    t1.join();
    t2.join();

    std::cout << "Final counter: " << counter.load() << std::endl;
    return 0;
}

支持的类型:

  • 整数:int\long\uint32_t等
  • 指针:T*
  • 布尔值
  • 浮点数

主要方法:

  • load():获取原子变量的值
  • store(x):设置原子变量的值
  • fetch_add(x):原子加法
  • fetch_sub(x):原子减法
  • fetch_or(x):原子或
  • fetch_and(x):原子与
  • exchange(x):原子赋值并返回旧值
  • compare_exchange_weak(x, y):CAS操作,比较并交换值

std::memory_order内存模型:

  • memory_order_relaxed:只保证原子性,不保证顺序
  • memory_order_consume:依赖load()的指令不会被重排序(很少使用)
  • memory_order_acquire:读取前的所有读写不会被重排序
  • memory_order_release:释放前的所有读写不会被重排序
  • memory_order_acq_rel:组合acquire和release
  • memory_order_seq_cst:默认,保证全局顺序一致

读写锁

概述: 允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。常见的读写锁有std::shared_mutex(C++17引入)

基础用法示例:

x#include <iostream>
#include <thread>
#include <shared_mutex>
#include <vector>
#include <mutex>

std::shared_mutex rwMtx;
int sharedData = 0; // 共享数据

void reader(int id) {
    std::shared_lock lock(rwMtx); // 共享锁
    std::cout << "Reader " << id << " read value: " << sharedData << std::endl;
}

void writer(int id, int value) {
    std::unique_lock lock(rwMtx); // 独占锁
    sharedData = value;
    std::cout << "Writer " << id << " updated value to: " << sharedData << std::endl;
}

int main() {
    std::vector<std::thread> threads;

    // 启动多个读线程
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back(reader, i);
    }

    // 启动一个写线程
    threads.emplace_back(writer, 1, 100);

    // 启动更多的读线程
    for (int i = 5; i < 10; ++i) {
        threads.emplace_back(reader, i);
    }

    for (auto& t : threads) {
        t.join();
    }

    return 0;
}


网站公告

今日签到

点亮在社区的每一天
去签到