C++多线程编程:std::thread, std::async, std::future

发布于:2025-09-03 ⋅ 阅读:(10) ⋅ 点赞:(0)

1. C++多线程基础

传统的多线程问题

// 传统C++需要平台特定的API
#ifdef _WIN32
    #include <windows.h>
    HANDLE thread = CreateThread(...);
#else
    #include <pthread.h>
    pthread_t thread;
    pthread_create(&thread, ...);
#endif
// 代码不可移植,难以维护

C++11标准线程库

#include <thread>
#include <iostream>

// 跨平台的多线程编程!
void thread_function() {
    std::cout << "Hello from thread! Thread ID: " 
              << std::this_thread::get_id() << std::endl;
}

int main() {
    std::thread t(thread_function); // 创建线程
    std::cout << "Main thread ID: " 
              << std::this_thread::get_id() << std::endl;
    
    t.join(); // 等待线程结束
    return 0;
}

2. std::thread 详解

创建线程的多种方式

#include <thread>
#include <iostream>

// 1. 函数指针
void simple_function() {
    std::cout << "Simple function thread" << std::endl;
}

// 2. Lambda表达式
auto lambda = [] {
    std::cout << "Lambda thread" << std::endl;
};

// 3. 函数对象(仿函数)
struct Functor {
    void operator()() {
        std::cout << "Functor thread" << std::endl;
    }
};

// 4. 成员函数
class Worker {
public:
    void work() {
        std::cout << "Member function thread" << std::endl;
    }
};

int main() {
    // 创建线程的多种方式
    std::thread t1(simple_function);
    std::thread t2(lambda);
    std::thread t3(Functor());
    
    Worker worker;
    std::thread t4(&Worker::work, &worker); // 成员函数需要对象指针
    
    // 带参数的线程函数
    std::thread t5([](int x, const std::string& s) {
        std::cout << "Params: " << x << ", " << s << std::endl;
    }, 42, "hello");
    
    t1.join(); t2.join(); t3.join(); t4.join(); t5.join();
}

线程管理和生命周期

void worker(int id) {
    std::cout << "Worker " << id << " started" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Worker " << id << " finished" << std::endl;
}

int main() {
    std::thread t1(worker, 1);
    std::thread t2(worker, 2);
    
    // 检查线程是否可join
    if (t1.joinable()) {
        std::cout << "t1 is joinable" << std::endl;
    }
    
    // 分离线程(守护线程)
    t2.detach();
    
    // 等待t1结束
    t1.join();
    
    // t2已经被detach,不需要join
    // 注意:detach后线程独立运行,主线程结束可能终止子线程
    
    return 0;
}

线程转移所有权

void task() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

int main() {
    // 线程对象只能移动,不能拷贝
    std::thread t1(task);
    
    // 错误:线程对象不能拷贝
    // std::thread t2 = t1;
    
    // 正确:移动语义
    std::thread t2 = std::move(t1);
    
    // 现在t1不再关联任何线程
    if (!t1.joinable()) {
        std::cout << "t1 is not joinable" << std::endl;
    }
    
    t2.join();
    
    // 在容器中存储线程
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back([](int id) {
            std::cout << "Thread " << id << std::endl;
        }, i);
    }
    
    // 等待所有线程完成
    for (auto& t : threads) {
        t.join();
    }
    
    return 0;
}

3. std::async 和 std::future

异步任务执行

#include <future>
#include <iostream>
#include <chrono>

int long_running_task(int x) {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return x * x;
}

int main() {
    // 启动异步任务
    std::future<int> result = std::async(std::launch::async, long_running_task, 5);
    
    std::cout << "Main thread can do other work..." << std::endl;
    
    // 获取结果(如果还没完成会阻塞等待)
    int value = result.get();
    std::cout << "Result: " << value << std::endl;
    
    return 0;
}

std::async 的启动策略

int compute(int x) {
    return x * x;
}

int main() {
    // 1. 异步执行(在新线程中)
    auto future1 = std::async(std::launch::async, compute, 5);
    
    // 2. 延迟执行(在get()/wait()时执行)
    auto future2 = std::async(std::launch::deferred, compute, 10);
    
    // 3. 自动选择(由实现决定)
    auto future3 = std::async(std::launch::async | std::launch::deferred, compute, 15);
    
    std::cout << "Future1 result: " << future1.get() << std::endl;
    std::cout << "Future2 result: " << future2.get() << std::endl; // 此时才执行
    std::cout << "Future3 result: " << future3.get() << std::endl;
    
    return 0;
}

std::future 的方法

int task() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 42;
}

int main() {
    std::future<int> fut = std::async(task);
    
    // 检查状态
    if (fut.valid()) {
        std::cout << "Future is valid" << std::endl;
    }
    
    // 等待结果(阻塞)
    // fut.wait();
    
    // 带超时的等待
    auto status = fut.wait_for(std::chrono::milliseconds(500));
    if (status == std::future_status::ready) {
        std::cout << "Task completed: " << fut.get() << std::endl;
    } else if (status == std::future_status::timeout) {
        std::cout << "Task still running..." << std::endl;
        std::cout << "Final result: " << fut.get() << std::endl; // 继续等待
    }
    
    return 0;
}

4. std::promise 和 std::packaged_task

std::promise:显式设置值

void producer(std::promise<int> prom) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    prom.set_value(42); // 设置结果
    // prom.set_exception(std::make_exception_ptr(std::runtime_error("Error")));
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();
    
    std::thread t(producer, std::move(prom));
    
    // 消费者等待结果
    try {
        int result = fut.get();
        std::cout << "Received: " << result << std::endl;
    } catch (const std::exception& e) {
        std::cout << "Exception: " << e.what() << std::endl;
    }
    
    t.join();
    return 0;
}

std::packaged_task:包装可调用对象

int complex_computation(int x, int y) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return x * x + y * y;
}

int main() {
    // 包装函数
    std::packaged_task<int(int, int)> task(complex_computation);
    std::future<int> result = task.get_future();
    
    // 在单独线程中执行
    std::thread t(std::move(task), 3, 4);
    
    // 获取结果
    std::cout << "Result: " << result.get() << std::endl;
    
    t.join();
    
    // 也可以在当前线程执行
    std::packaged_task<int(int, int)> task2(complex_computation);
    std::future<int> result2 = task2.get_future();
    
    task2(5, 6); // 同步执行
    std::cout << "Result2: " << result2.get() << std::endl;
    
    return 0;
}

5. 线程同步和互斥

基本的互斥锁

#include <mutex>
#include <thread>
#include <vector>

std::mutex g_mutex;
int shared_data = 0;

void increment() {
    for (int i = 0; i < 100000; ++i) {
        std::lock_guard<std::mutex> lock(g_mutex); // RAII锁
        ++shared_data;
    }
}

int main() {
    std::vector<std::thread> threads;
    
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(increment);
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Final value: " << shared_data << std::endl;
    return 0;
}

各种互斥锁类型

#include <mutex>
#include <shared_mutex>

std::mutex mtx; // 基本互斥锁
std::recursive_mutex rec_mtx; // 递归互斥锁
std::timed_mutex timed_mtx; // 带超时的互斥锁
std::shared_mutex shared_mtx; // 读写锁(C++17)

void reader() {
    // 写法1:显式模板参数(兼容性更好)
    std::shared_lock<std::shared_mutex> lock(shared_mtx);
    // 多个读取者可以同时访问
    
    // 写法2:C++17 CTAD(需要编译器支持)
    // std::shared_lock lock(shared_mtx);
}

void writer() {
    // 写法1:显式模板参数(兼容性更好)
    std::unique_lock<std::shared_mutex> lock(shared_mtx);
    // 只有一个写入者可以访问
    
    // 写法2:C++17 CTAD(需要编译器支持)
    // std::unique_lock lock(shared_mtx);
}

6. 实际应用场景

并行计算

#include <vector>
#include <numeric>
#include <future>

// 并行累加
template<typename Iterator>
typename Iterator::value_type parallel_sum(Iterator begin, Iterator end) {
    auto size = std::distance(begin, end);
    if (size < 1000) {
        return std::accumulate(begin, end, 0);
    }
    
    Iterator mid = begin;
    std::advance(mid, size / 2);
    
    auto left = std::async(std::launch::async, parallel_sum<Iterator>, begin, mid);
    auto right = parallel_sum(mid, end); // 当前线程执行
    
    return left.get() + right;
}

int main() {
    std::vector<int> data(10000, 1); // 10000个1
    
    auto start = std::chrono::high_resolution_clock::now();
    int sum = parallel_sum(data.begin(), data.end());
    auto end = std::chrono::high_resolution_clock::now();
    
    std::cout << "Sum: " << sum << std::endl;
    std::cout << "Time: " 
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << "ms" << std::endl;
    
    return 0;
}

生产者-消费者模式

#include <queue>
#include <condition_variable>

template<typename T>
class ThreadSafeQueue {
private:
    std::queue<T> queue;
    mutable std::mutex mtx;
    std::condition_variable cv;
    
public:
    void push(T value) {
        std::lock_guard lock(mtx);
        queue.push(std::move(value));
        cv.notify_one();
    }
    
    T pop() {
        std::unique_lock lock(mtx);
        cv.wait(lock, [this] { return !queue.empty(); });
        
        T value = std::move(queue.front());
        queue.pop();
        return value;
    }
};

void producer(ThreadSafeQueue<int>& queue) {
    for (int i = 0; i < 10; ++i) {
        queue.push(i);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void consumer(ThreadSafeQueue<int>& queue, int id) {
    for (int i = 0; i < 5; ++i) {
        int value = queue.pop();
        std::cout << "Consumer " << id << " got: " << value << std::endl;
    }
}

7. 常见问题

Q1: std::thread 和 std::async 的区别?

  • std::thread:直接管理线程,需要手动join/detach

  • std::async:高级抽象,返回future自动管理,可以选择异步或延迟执行

Q2: 什么是std::future?

std::future是一个异步操作结果的占位符,提供获取结果、等待完成、查询状态的方法。

Q3: std::promise 和 std::packaged_task 的区别?

  • std::promise:手动设置值或异常

  • std::packaged_task:包装可调用对象,自动设置返回值

Q4: 如何避免数据竞争?

:使用互斥锁(std::mutex)、原子操作(std::atomic)、线程安全的数据结构,遵循RAII原则使用std::lock_guard等。

Q5: 什么是死锁?如何避免?

:多个线程互相等待对方释放锁。避免方法:按固定顺序获取锁、使用std::lock()同时获取多个锁、使用超时锁、避免嵌套锁。

8. 最佳实践

使用RAII管理线程

class ThreadGuard {
public:
    explicit ThreadGuard(std::thread t) : t_(std::move(t)) {}
    
    ~ThreadGuard() {
        if (t_.joinable()) {
            t_.join();
        }
    }
    
    // 禁止拷贝
    ThreadGuard(const ThreadGuard&) = delete;
    ThreadGuard& operator=(const ThreadGuard&) = delete;
    
private:
    std::thread t_;
};

void safe_thread_usage() {
    ThreadGuard guard(std::thread([]{
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }));
    // 线程自动在guard析构时join
}

异常安全的多线程代码

void process_data() {
    std::promise<int> prom;
    auto fut = prom.get_future();
    
    std::thread t([&prom] {
        try {
            // 可能抛出异常的操作
            int result = risky_operation();
            prom.set_value(result);
        } catch (...) {
            prom.set_exception(std::current_exception());
        }
    });
    
    try {
        int result = fut.get();
        // 处理结果
    } catch (const std::exception& e) {
        std::cerr << "Thread failed: " << e.what() << std::endl;
    }
    
    t.join();
}

总结

C++11多线程编程提供了现代、安全的并发工具:

  • ✅ std::thread:直接线程管理

  • ✅ std::async/std::future:异步任务和结果处理

  • ✅ std::promise/packaged_task:更灵活的异步编程

  • ✅ RAII支持:自动资源管理

  • ✅ 类型安全:编译期检查


网站公告

今日签到

点亮在社区的每一天
去签到