『 C++ 』多线程同步:条件变量及其接口的应用实践

发布于:2025-03-23 ⋅ 阅读:(31) ⋅ 点赞:(0)

条件变量概述

在 C++ 多线程编程里,同步机制是极为关键的部分,它能保证多个线程安全且高效地访问共享资源。其中,条件变量(std::condition_variable)和 std::unique_lock::try_lock_until 是很实用的工具。接下来,我们会深入探讨它们的应用。

条件变量简介

条件变量是 C++ 标准库中的一个同步原语,它可让线程在特定条件达成时被唤醒。其主要用途是线程间的等待 - 通知机制,一个线程等待某个条件成立,而另一个线程在条件成立时通知等待的线程。

条件变量的基本用法

条件变量的基本操作包含 waitwait_forwait_untilnotify_one/notify_all

  • wait:使线程进入等待状态,同时释放互斥量,直到被其他线程唤醒。
  • wait_for:线程会等待一段时间,若在这段时间内被通知则继续执行,若超时则继续执行。
  • wait_until:线程会等待到指定的时间点,若在该时间点前被通知则继续执行,若到达时间点还未被通知则继续执行。
  • notify_one:唤醒一个正在等待的线程。
  • notify_all:唤醒所有正在等待的线程。

案例:两个线程交替打印奇偶数

以下是一个使用条件变量实现两个线程交替打印奇偶数的案例:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

using namespace std;
void two_thread_print()
{
    std::mutex mtx;
    condition_variable c;
    int n = 100;
    bool flag = true;

    thread t1([&]() {
        int i = 0;
        while (i < n)
        {
            unique_lock<mutex> lock(mtx);
            c.wait(lock, [&]()->bool {return flag; });
            cout << i << endl;
            flag = false;
            i += 2; // 偶数
            c.notify_one();
        }
    });

    thread t2([&]() {
        int j = 1;
        while (j < n)
        {
            unique_lock<mutex> lock(mtx);
            c.wait(lock, [&]()->bool {return !flag; });
            cout << j << endl;
            flag = true;
            j += 2; // 奇数
            c.notify_one();
        }
    });

    t1.join();
    t2.join();
}

int main()
{
    two_thread_print();

    return 0;
}    

代码解释

  1. 线程函数
    • t1 线程负责打印偶数,它会等待 flagtrue 时开始打印,打印完成后将 flag 置为 false,并通知另一个线程。
    • t2 线程负责打印奇数,它会等待 flagfalse 时开始打印,打印完成后将 flag 置为 true,并通知另一个线程。
  2. 条件变量的使用
    • c.wait(lock, [&]()->bool {return flag; });c.wait(lock, [&]()->bool {return !flag; }); 用于线程的等待,当条件不满足时线程会进入等待状态,同时释放互斥量。
    • c.notify_one(); 用于唤醒另一个等待的线程。
  3. 线程同步
    • 通过 t1.join()t2.join() 确保主线程等待两个子线程执行完毕。

std::unique_lock::try_lock_until 介绍

std::unique_lock::try_lock_until 是 C++ 标准库 <mutex> 头文件中的一部分,用于尝试在指定的时间点之前锁定关联的互斥量。如果在指定时间之前成功锁定,它会返回 true;若超时仍未锁定,则返回 false

代码示例

下面的代码模拟了一个多线程场景,主线程会尝试在指定时间内锁定互斥量,而工作线程会先占用一段时间互斥量。

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

std::mutex mtx;

void worker() {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::lock_guard<std::mutex> lock(mtx);
    std::cout << "Worker thread locked the mutex." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Worker thread unlocked the mutex." << std::endl;
}

int main() {
    std::thread t(worker);

    std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
    auto timeout = std::chrono::steady_clock::now() + std::chrono::seconds(1);

    if (lock.try_lock_until(timeout)) {
        std::cout << "Main thread locked the mutex." << std::endl;
        lock.unlock();
    } else {
        std::cout << "Main thread failed to lock the mutex within the timeout." << std::endl;
    }

    t.join();
    return 0;
}    

代码解释

  1. 线程函数 worker:工作线程会先休眠 2 秒,然后锁定互斥量 mtx,打印信息,再休眠 2 秒,最后解锁互斥量。
  2. 主线程逻辑
    • 创建工作线程 t
    • 创建 std::unique_lock 对象 lock,初始时不锁定互斥量。
    • 设定超时时间为当前时间加上 1 秒。
    • 调用 try_lock_until 尝试在超时时间之前锁定互斥量。
    • 根据返回结果输出相应信息。
  3. 线程同步:主线程通过 t.join() 等待工作线程结束。

注意事项

  • 此函数适用于需要在一定时间内尝试锁定互斥量的场景,避免无限期等待。
  • 超时时间点可以使用不同的时钟类型和时间单位,如 std::chrono::steady_clockstd::chrono::system_clock

std::condition_variable::wait 详细解析与示例

std::condition_variable::wait 接口介绍

std::condition_variable::wait 有两种重载形式:

// 无条件形式
void wait (unique_lock<mutex>& lck);
// 带谓词形式
template <class Predicate>  
void wait (unique_lock<mutex>& lck, Predicate pred);

当前线程(应已锁定互斥锁)的执行将被阻止,直到收到通知。在阻塞线程的那一刻,该函数会自动调用 lck.unlock(),允许其他锁定的线程继续。一旦收到通知(显式地,由其他线程通知),该函数就会解除阻塞并调用 lck.lock(),离开时的状态与调用函数时的状态相同。然后函数返回(请注意,最后一个互斥锁可能会在返回之前再次阻塞线程)。

通常,通过在另一个线程中调用 notify_onenotify_all 来通知函数唤醒。但某些实现可能会在不调用这些函数的情况下产生虚假的唤醒调用。因此,此功能的用户应确保满足其恢复条件。

如果指定了带谓词的版本(2),则该函数仅在 pred() 返回 false 时阻塞,并且通知只能在线程变为 pred() 返回 true 时取消阻塞线程(这对于检查虚假唤醒调用特别有用)。

代码示例

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void worker() {
    std::unique_lock<std::mutex> lock(mtx);
    // 使用带谓词的 wait 版本
    cv.wait(lock, []{ return ready; });
    std::cout << "Worker thread is working." << std::endl;
}

int main() {
    std::thread t(worker);

    {
        std::lock_guard<std::mutex> lock(mtx);
        ready = true;
    }
    cv.notify_one();

    t.join();
    return 0;
}

代码解释

  1. worker 线程worker 线程获取 unique_lock 并调用 cv.wait(lock, []{ return ready; });,如果 readyfalse,线程会进入等待状态,同时释放互斥锁。当收到通知并且 ready 变为 true 时,线程会重新获取互斥锁并继续执行。
  2. 主线程:主线程设置 readytrue 并调用 cv.notify_one() 通知等待的线程。

关于最后一个互斥锁可能再次阻塞线程

cv.wait(lock, []{ return ready; }); 中,当收到通知且谓词 []{ return ready; } 返回 true 时,wait 函数会尝试重新锁定互斥锁(调用 lck.lock())。如果此时互斥锁被其他线程持有,当前线程会再次被阻塞,直到成功获取互斥锁。

条件变量其他接口的应用示例

wait_for 接口应用

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void worker() {
    std::unique_lock<std::mutex> lock(mtx);
    if (cv.wait_for(lock, std::chrono::seconds(2), []{ return ready; })) {
        std::cout << "Worker thread got notified in time." << std::endl;
    } else {
        std::cout << "Worker thread timed out." << std::endl;
    }
}

int main() {
    std::thread t(worker);
    std::this_thread::sleep_for(std::chrono::seconds(3));
    {
        std::lock_guard<std::mutex> lock(mtx);
        ready = true;
    }
    cv.notify_one();
    t.join();
    return 0;
}

这里 worker 线程会等待 2 秒,如果在这 2 秒内没有收到通知,就会超时继续执行。

wait_until 接口应用

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

std::timed_mutex mtx;
bool ready = false;

void worker() {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::lock_guard<std::timed_mutex> lock(mtx); // 使用 std::timed_mutex
    std::cout << "Worker thread locked the mutex." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));
    ready = true;
    std::cout << "Worker thread unlocked the mutex." << std::endl;
}

int main() {
    std::thread t(worker);

    std::unique_lock<std::timed_mutex> lock(mtx, std::defer_lock); // 使用 std::timed_mutex
    auto timeout = std::chrono::system_clock::now() + std::chrono::seconds(1);

    if (lock.try_lock_until(timeout)) { // 正确:std::timed_mutex 支持 try_lock_until
        std::cout << "Main thread locked the mutex." << std::endl;
        lock.unlock();
    }
    else {
        std::cout << "Main thread failed to lock the mutex within the timeout." << std::endl;
    }

    t.join();
    return 0;
}

worker 线程会等待到指定的时间点,如果在该时间点前收到通知则继续执行,否则超时继续执行。

notify_one 接口应用

参考前面两个线程交替打印奇偶数的例子,c.notify_one(); 用于唤醒另一个等待的线程。

notify_all 接口应用

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void worker(int id) {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, []{ return ready; });
    std::cout << "Worker thread " << id << " is working." << std::endl;
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 3; ++i) {
        threads.emplace_back(worker, i);
    }
    {
        std::lock_guard<std::mutex> lock(mtx);
        ready = true;
    }
    cv.notify_all();
    for (auto& t : threads) {
        t.join();
    }
    return 0;
}

在这个例子中,notify_all 会唤醒所有等待的线程。

通过条件变量和 std::unique_lock::try_lock_until,我们能够更灵活地实现多线程同步,避免死锁和资源竞争问题。在实际开发中,根据具体需求合理运用这些工具,可以提高程序的性能和稳定性。