1 Memory Ordering
2 Introduction
std::memory_order is an enumeration type introduced in C++11. It is used together with the atomic operations in <atomic> to control memory visibility and execution order in multithreaded programs. Its main job is to tell the compiler and the CPU which memory reads and writes may be reordered around a given atomic operation, and which must happen in program order.
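A minimal sketch (my own example, not from the project below) of the release/acquire pairing this post uses: the thread that observes the flag with an acquire-load is guaranteed to also see every write the other thread made before its release-store.

#include <atomic>
#include <cassert>
#include <thread>

int data = 0;                   // plain payload, published via the flag
std::atomic<bool> ready{false};

void producer() {
  data = 42;                                    // ordinary write
  ready.store(true, std::memory_order_release); // nothing above may sink below this store
}

void consumer() {
  while (!ready.load(std::memory_order_acquire)) {} // nothing below may rise above this load
  assert(data == 42); // guaranteed by the release/acquire pairing
}

int main() {
  std::thread t1(producer), t2(consumer);
  t1.join();
  t2.join();
  return 0;
}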
3 Theory
Omitted.
4 Practice
- Implement a data-splitting test: the main thread writes data into a buffer, sets each channel's flag to false, and waits for two worker threads to process the raw data; when a worker finishes, it sets its flag back to true.
- The split is correct in Debug builds, but wrong in RelWithDebInfo builds.
- Suspecting a memory-ordering problem, we changed the flag to std::atomic and added explicit memory-order arguments; after that the result is correct. A reduced sketch of the failure mode follows this list.
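A hypothetical reduction of the failure (not the project's actual code): a plain bool shared between threads is a data race, so an optimized build may cache the flag in a register and hoist the load out of the spin loop, and the waiting thread never sees the update.

#include <chrono>
#include <thread>

bool finished = false; // plain, non-atomic flag: data race, undefined behavior

int main() {
  std::thread worker([] {
    while (!finished) {} // the load may legally be hoisted out of the loop
  });
  std::this_thread::sleep_for(std::chrono::milliseconds(10));
  finished = true;  // may never become visible to the worker
  worker.join();    // under optimization this join can hang forever
  return 0;
}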
4.1 Main thread: dispatching the data
4.2 Worker threads: splitting
4.3 Result
4.4 Fix
// On the read side, memory_order_acquire: reads and writes after this load
// cannot be reordered (by the compiler or the CPU) to before it.
if (info_arr_[i].is_finished.load(std::memory_order_acquire) == false) {
  split(i);
}
// On the write side, memory_order_release: reads and writes before this store
// cannot be reordered (by the compiler or the CPU) to after it.
info.is_finished.store(false, std::memory_order_release);
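These two only give a guarantee as a pair on the same atomic object: the release-store synchronizes with the acquire-load that reads the stored value, and only then are the writes made before the store (here, filling the split buffers) guaranteed visible after the load. Using memory_order_relaxed on either side would break the handshake even though the flag itself stays atomic.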
5 Source code
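/* ===== main.cpp (test driver; file name assumed) ===== */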
#include <iostream>
#include <fstream>
#include <vector>
#include <bitset>
#include <cstring> /* memset */
#include <thread>  /* std::this_thread::sleep_for */
#include <chrono>
#include "xdebug.h"
#include "chan_split.h"
#define BUFFER_SIZE (1024 * 1024 * 2)
#if 0
int main() {
  uint8_t* pdata = new uint8_t[BUFFER_SIZE];
  chan_split chan_split_(0, BUFFER_SIZE);
  uint8_t* ptmp = pdata;
  size_t loop_count = BUFFER_SIZE / 32;
for (size_t i = 0; i < loop_count; i++) {
memset(ptmp, i & 255, 32);
ptmp += 32;
}
std::string filename = "file.bin";
std::ofstream file(filename, std::ios::binary);
if (file) {
    file.write((const char*)pdata, BUFFER_SIZE);
}
chan_split_.start_split_async();
for (int i = 0; i < 150; i++) {
chan_split_.push_data(pdata, 1024 * 1024 * 2, 2);
}
std::this_thread::sleep_for(std::chrono::seconds(1));
  chan_split_.stop_split_async();
  delete[] pdata;
  xdebug("done.");
  return 0;
}
#else
int main() {
  chan_split chan_split_(0, BUFFER_SIZE);
  uint8_t* pdata = new uint8_t[BUFFER_SIZE];
std::ifstream file("xdma_test.dat", std::ios::binary);
if (!file) {
xdebug("open file failed.");
return 1;
}
chan_split_.start_split_async();
int count = 0;
  while (file) {
    file.read((char*)pdata, BUFFER_SIZE);
    std::streamsize byte_read_len = file.gcount();
    if (byte_read_len <= 0) break; /* nothing read (e.g. file size is an exact multiple) */
    chan_split_.push_data(pdata, byte_read_len, 2);
    count++;
  }
xdebug("count=%d", count);
std::this_thread::sleep_for(std::chrono::seconds(1));
chan_split_.stop_split_async();
delete[] pdata;
return 0;
}
#endif
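/* ===== chan_split.h ===== */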
#pragma once
#include <thread>
#include <shared_mutex>
#include <future>
#include <atomic>
#include "ThreadPool.h"
#define USE_ATOMIC_BOOL 1
struct split_info_t {
  int chan_index{0};         /* index of this split channel */
  int dma_index{0};          /* DMA index */
  int chan_count{1};         /* number of channels */
  uint8_t* pdata{nullptr};   /* raw data waiting to be split */
  size_t pdata_len{0};       /* length of the raw data */
  std::vector<uint8_t> data; /* buffer for the processed data */
  size_t data_len{0};        /* length of the processed data */
#ifdef USE_ATOMIC_BOOL
  /* start as "finished" so the workers stay idle until push_data publishes data */
  std::atomic<bool> is_finished{true};
#else
  bool is_finished{true};
#endif
};
class chan_split {
public:
chan_split(int dma_index, size_t read_dma_len);
~chan_split();
void push_data(uint8_t* pdata, size_t len, int chan_count);
void start_split_async();
void stop_split_async();
private:
void split(int chan_index);
  void extract(split_info_t& info);
private:
split_info_t info_arr_[2];
  std::atomic<bool> is_running_{true}; /* written by stop_split_async(), read by the workers */
ThreadPool pool_;
std::vector<std::future<int>> results_;
};
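/* ===== chan_split.cpp (file name assumed) ===== */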
#include "chan_split.h"
#include <string>
#include <fstream>
#include "xdebug.h"
chan_split::chan_split(int dma_index, size_t read_dma_len)
: pool_(2) {
for (int i = 0; i < 2; i++) {
split_info_t& info = info_arr_[i];
info.chan_index = i;
info.dma_index = dma_index;
info.data.resize(read_dma_len);
}
}
chan_split::~chan_split() {
is_running_ = false;
}
void chan_split::start_split_async() {
for (int i = 0; i < 2; i++) {
/* clang-format off */
results_.emplace_back(
pool_.enqueue([this, i] {
while (is_running_) {
#ifdef USE_ATOMIC_BOOL
          if (info_arr_[i].is_finished.load(std::memory_order_acquire) == false) {
#else
          if (info_arr_[i].is_finished == false) {
#endif
split(i);
}
}
return 0;
})
);
/* clang-format on */
}
}
void chan_split::stop_split_async() {
is_running_ = false;
for (int i = 0; i < 2; i++) {
if (results_[i].valid() == true) {
results_[i].wait();
}
}
results_.clear();
}
void chan_split::push_data(uint8_t* pdata, size_t len, int chan_count) {
for (int i = 0; i < chan_count; i++) {
split_info_t& info = info_arr_[i];
info.chan_count = chan_count;
info.pdata = pdata;
info.pdata_len = len;
#ifdef USE_ATOMIC_BOOL
    /* release-store: the writes to info above become visible to the worker
       no later than the flag change */
    info.is_finished.store(false, std::memory_order_release);
#else
    info.is_finished = false;
#endif
}
for (int i = 0; i < chan_count; i++) {
#ifdef USE_ATOMIC_BOOL
    /* acquire-load: busy-wait until the worker publishes its result */
    while (info_arr_[i].is_finished.load(std::memory_order_acquire) == false) {}
#else
    while (info_arr_[i].is_finished == false) {}
#endif
}
}
void chan_split::split(int chan_index) {
split_info_t& info = info_arr_[chan_index];
switch (info.chan_count) {
case 1: {
      /* single channel: call the external handler directly */
break;
}
default: {
      /* split (de-interleave) the data */
extract(info);
break;
}
}
#ifdef USE_ATOMIC_BOOL
  /* release-store: publish the filled info.data before flipping the flag */
  info.is_finished.store(true, std::memory_order_release);
#else
  info.is_finished = true;
#endif
}
void chan_split::extract(split_info_t& info) {
  /* the source interleaves 16-byte elements across the channels:
     [ch0 e0][ch1 e0][ch0 e1][ch1 e1]...; copy out this channel's elements */
  const size_t ele_size = 16;
  const size_t loop_size = ele_size * info.chan_count;
  const size_t loop_count = info.pdata_len / loop_size; /* elements per channel */
  uint8_t* src = info.pdata + info.chan_index * ele_size;
  uint8_t* dst = info.data.data();
  for (size_t i = 0; i < loop_count; i++) {
    memcpy(dst, src, ele_size);
    dst += ele_size;
    src += loop_size;
  }
  info.data_len = info.pdata_len / info.chan_count;
  /* invoke the callback; in this test we just append the channel's data to a file */
std::string filename = "xdma_test_" + std::to_string(info.chan_index) + ".dat";
std::ofstream file(filename, std::ios::binary | std::ios::app);
if (file) {
file.write((const char*)info.data.data(), info.data_len);
}
}
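/* ===== ThreadPool.h ===== */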
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
ThreadPool(size_t);
~ThreadPool();
  /* enqueue(F&& f, Args&&... args): the return type is deduced as
     std::future<typename std::result_of<F(Args...)>::type> */
template <class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
private:
  // need to keep track of threads so we can join them
  std::vector<std::thread> workers_;
  // the task queue
std::queue<std::function<void()>> tasks_;
// synchronization
std::mutex queue_mutex_;
std::condition_variable condition_;
bool stop_;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads_count)
: stop_(false) {
for (size_t i = 0; i < threads_count; ++i)
workers_.emplace_back([this] {
for (;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex_);
        this->condition_.wait(lock, [this] { // releases the lock, then blocks until stop or the queue is non-empty
return this->stop_ || !this->tasks_.empty();
});
if (this->stop_ && this->tasks_.empty())
return;
task = std::move(this->tasks_.front());
this->tasks_.pop();
}
task();
}
});
}
// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task =
std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex_);
// don't allow enqueueing after stopping the pool
if (stop_)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks_.emplace([task]() {
(*task)();
});
}
condition_.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (std::thread& worker : workers_)
worker.join();
}
#endif
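For reference, a minimal usage sketch of this pool (my own example, assuming the header above is saved as ThreadPool.h): enqueue returns a std::future for the task's result.

#include <iostream>
#include "ThreadPool.h"

int main() {
  ThreadPool pool(2);                 // two worker threads
  auto fut = pool.enqueue([](int x) { // deduced as std::future<int>
    return x * x;
  }, 8);
  std::cout << fut.get() << std::endl; // blocks until the task ran; prints 64
  return 0;
}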