参考:https://llfc.club/category?catid=225RaiVNI8pFDD5L4m807g7ZwmF#!aid/2Tuk4RfvfBC788LlqnQrWiPiEGW
1. 简历
- 本节介绍C++线程管控,包括移交线程的归属权,线程并发数量控制以及获取线程id等基本操作。
2. 线程归属权
- 比如下面,我们说明了线程归属权的转移方式
#include <iostream>
#include <thread>
#include <string>
void some_function()
{
while (true)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void some_other_function()
{
while (true)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main()
{
// t1 绑定some_function
std::thread t1(some_function);
// 2 转移t1管理的线程给t2,转移后t1无效
std::thread t2 = std::move(t1);
// 3 t1 可继续绑定其他线程,执行some_other_function
t1 = std::thread(some_other_function);
// 4 创建一个线程变量t3
std::thread t3;
// 5 转移t2管理的线程给t3
t3 = std::move(t2);
// 6 转移t3管理的线程给t1
t1 = std::move(t3);
std::this_thread::sleep_for(std::chrono::seconds(2000));
return 0;
}
3. joining_thread
joinable()
bool joinable() const noexcept;
- 曾经有一份C++17标准的备选提案,主张引入新的类joining_thread,它与std::thread类似,但只要其执行析构函数,线程即能自动汇合,这点与scoped_thread非常像。可惜C++标准委员会未能达成共识,结果C++17标准没有引入这个类,后来它改名为std::jthread,依然进入了C++20标准的议程(现已被正式纳入C++20标准)。除去这些,实际上joining_thread类的代码相对容易编写
#include <iostream>
#include <thread>
#include <string>
class joining_thread
{
std::thread _t;
public:
joining_thread() noexcept = default;
template <typename Callable, typename... Args>
explicit joining_thread(Callable &&func, Args &&...args)
: _t(std::forward<Callable>(func), std::forward<Args>(args)...) {}
explicit joining_thread(std::thread t) noexcept
: _t(std::move(t)) {}
joining_thread(joining_thread &&other) noexcept
: _t(std::move(other._t)) {}
joining_thread &operator=(joining_thread &&other) noexcept
{
// 如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable())
{
join();
}
_t = std::move(other._t);
return *this;
}
joining_thread &operator=(joining_thread other) noexcept
{
// 如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable())
{
join();
}
_t = std::move(other._t);
return *this;
}
~joining_thread() noexcept
{
if (joinable())
{
join();
}
}
void swap(joining_thread &other) noexcept
{
_t.swap(other._t);
}
std::thread::id get_id() const noexcept
{
return _t.get_id();
}
bool joinable() const noexcept
{
return _t.joinable();
}
void join()
{
_t.join();
}
void detach()
{
_t.detach();
}
std::thread &as_thread() noexcept
{
return _t;
}
const std::thread &as_thread() const noexcept
{
return _t;
}
};
- 使用起来比较简单,我们直接构造一个joining_thread对象即可。
4. 容器存储
void use_vector() {
std::vector<std::thread> threads;
for (unsigned i = 0; i < 10; ++i) {
threads.emplace_back(param_function, i);
}
for (auto& entry : threads) {
entry.join();
}
}
5. 选择运行数量
- 借用C++标准库的std::thread::hardware_concurrency()函数,它的返回值是一个指标,表示程序在各次运行中可真正并发的线程数量.我们可以模拟实现一个并行计算的功能,计算容器内所有元素的和
#include <iostream>
#include <thread>
#include <string>
#include <vector>
#include<algorithm>
#include<numeric>
template<typename lterator,typename T>
struct accumulate_block
{
void operator()(lterator first, lterator last, T& result)
{
result += std::accumulate(first, last, result);
}
};
template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init; // ⇽-- - ① 上面的代码1处判断要计算的容器内元素为0个则返回。
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread; // 等价于ceil(length / min_per_thread) 向上取整
// ⇽-- - ② 2处计算最大开辟的线程数,我们预估每个线程计算25个数据长度。
unsigned long const hardware_threads =
std::thread::hardware_concurrency(); // 表示程序在各次运行中可真正并发的线程数量.
/*
但是我们可以通过std::thread::hardware_concurrency返回cpu的核数,我们期待的是开辟的线程数小于等于cpu核数,
这样才不会造成线程过多时间片切换开销。
*/
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads); // ⇽-- - ③ 3处计算了适合开辟线程数的最小值。
unsigned long const block_size = length / num_threads; // ⇽-- - ④ 4处计算了步长,根据步长移动迭代器然后开辟线程计算。
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1); // ⇽-- - ⑤ 5处初始化了线程数-1个大小的vector,因为主线程也参与计算,所以这里-1.
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
{ // 6处移动步长,7处开辟线程,8处更新起始位置。
Iterator block_end = block_start;
std::advance(block_end, block_size); // ⇽-- - ⑥
// 定义在头文件 <iterator> 中。它的作用是 将一个迭代器向前或向后移动指定的距离。
threads[i] = std::thread( // ⇽-- - ⑦
accumulate_block<Iterator, T>(),
block_start, block_end, std::ref(results[i]));
block_start = block_end; // ⇽-- - ⑧
}
accumulate_block<Iterator, T>()(
block_start, last, results[num_threads - 1]); // ⇽-- - ⑨9处为主线程计算。
for (auto& entry : threads)
entry.join(); // ⇽-- - ⑩10 处让所有线程join
return std::accumulate(results.begin(), results.end(), init); // ⇽-- - ⑪ 11 处最后将所有计算结果再次调用std的accumulate算出结果。
}
void use_parallel_acc()
{
std::vector<int> vec(1000000);
for (int i = 0; i < 1000000; i++)
{
vec.push_back(i);
}
int sum = 0;
sum = parallel_accumulate<std::vector<int>::iterator, int>(vec.begin(),
vec.end(), sum);
std::cout << "sum is " << sum << std::endl;
}
int main()
{
use_parallel_acc();
return 0;
}
6. 识别线程
所谓识别线程就是获取线程id,可以根据线程id是否相同判断是否同一个线程。比如我们启动了一个线程,我们可以通过线程变量的get_id()获取线程id。
std::thread t([](){
std::cout << "thread start" << std::endl;
});
t.get_id();
- 但是如果我们想在线程的运行函数中区分线程,或者判断哪些是主线程或者子线程,可以通过这总方式
std::thread t([](){
std::cout << "in thread id " <<
std::this_thread::get_id() << std::endl;
std::cout << "thread start" << std::endl;
});
std::this_thread::get_id()
t.get_id()