并发操作的同步
1. 等待事件或等待其他条件
std::condition_variable:是 C++ 标准库提供的用于线程间同步的工具,它允许一个或多个线程等待某个条件满足后再继续执行。通常与 std::mutex 一起使用,std::mutex 用于保护共享数据,std::condition_variable 用于线程间的通知机制。
虚假唤醒 :指线程在没有收到明确通知的情况下从 wait 状态中被唤醒。为避免虚假唤醒,wait 函数通常会接受一个谓词(可调用对象)作为参数,线程被唤醒后会检查该谓词的返回值,如果为 false 则会继续等待。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void worker() {
std::unique_lock<std::mutex> lock(mtx);
// 等待 ready 变为 true
cv.wait(lock, [] { return ready; });
std::cout << "Worker thread is working." << std::endl;
}
void prepare_work() {
{
std::lock_guard<std::mutex> lock(mtx);
ready = true;
}
// 通知一个等待的线程
cv.notify_one();
std::cout << "Work is prepared." << std::endl;
}
int main() {
std::thread t1(worker);
std::thread t2(prepare_work);
t1.join();
t2.join();
return 0;
}
/*
1. worker 线程首先获取 std::unique_lock 锁定互斥量 mtx,然后调用 cv.wait(lock, [] { return ready; })。wait 函数会检查 ready 条件,如果不满足,它会释放锁并使线程进入等待状态。
2. prepare_work 线程获取锁,将 ready 置为 true,释放锁后调用 cv.notify_one() 通知一个等待的线程。
3. 当 worker 线程被通知后,它会重新获取锁并继续执行。
*/
2. 基于条件变量的生产者 - 消费者模型
- 核心概念
生产者 - 消费者模型是一种常见的并发编程模式,其中生产者线程负责生成数据并将其放入共享缓冲区,消费者线程负责从缓冲区中取出数据并进行处理。使用条件变量可以实现生产者和消费者之间的同步,避免数据竞争和缓冲区溢出。 - 代码
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable data_cond;
void producer() {
for (int i = 0; i < 5; ++i) {
{
std::lock_guard<std::mutex> lock(mtx);
data_queue.push(i);
}
// 通知消费者有新数据
data_cond.notify_one();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
// 等待队列中有数据
data_cond.wait(lock, [] { return!data_queue.empty(); });
int value = data_queue.front();
data_queue.pop();
lock.unlock();
std::cout << "Consumed: " << value << std::endl;
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
// 这里可以添加适当的机制来停止消费者线程
// 例如使用一个标志位
return 0;
}
/*
1. producer 线程每隔 1 秒向队列中添加一个数据,并调用 data_cond.notify_one() 通知消费者线程。
2. consumer 线程在 data_cond.wait(lock, [] { return!data_queue.empty(); }) 处等待,直到队列中有数据。当有数据时,它会取出数据并处理。
*/
3. 超时等待
- 核心概念
在某些情况下,线程不能无限期地等待某个条件满足,需要设置一个超时时间。std::condition_variable 提供了 wait_for 和 wait_until 方法来实现超时等待。 - 代码
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void worker() {
std::unique_lock<std::mutex> lock(mtx);
// 等待 2 秒
auto timeout = std::chrono::steady_clock::now() + std::chrono::seconds(2);
if (cv.wait_until(lock, timeout, [] { return ready; })) {
std::cout << "Worker thread is working after being notified." << std::endl;
} else {
std::cout << "Worker thread timed out." << std::endl;
}
}
void prepare_work() {
std::this_thread::sleep_for(std::chrono::seconds(3));
{
std::lock_guard<std::mutex> lock(mtx);
ready = true;
}
cv.notify_one();
std::cout << "Work is prepared." << std::endl;
}
int main() {
std::thread t1(worker);
std::thread t2(prepare_work);
t1.join();
t2.join();
return 0;
}
/*
- worker 线程使用 cv.wait_until(lock, timeout, [] { return ready; }) 等待 ready 条件满足,最多等待 2 秒。
- prepare_work 线程在 3 秒后才将 ready 置为 true 并通知 worker 线程,因此 worker 线程会超时。
*/
4. std::future 和 std::promise
核心概念
std::future:表示一个异步操作的结果,它可以用来获取异步任务的返回值。std::future 对象可以通过 get() 方法来阻塞等待异步操作完成并获取结果。
std::promise:是一个可以存储值或异常的对象,它可以与 std::future 关联,用于在不同线程之间传递结果。std::promise 对象可以通过 set_value() 方法设置结果,或通过 set_exception() 方法设置异常。
代码
#include <iostream>
#include <thread>
#include <future>
int calculate_sum(int a, int b) {
return a + b;
}
int main() {
// 创建一个 std::promise 对象
std::promise<int> prom;
// 获取与 promise 关联的 future
std::future<int> fut = prom.get_future();
std::thread t([&prom]() {
int result = calculate_sum(3, 5);
// 设置 promise 的值
prom.set_value(result);
});
// 获取 future 的值
int sum = fut.get();
std::cout << "The sum is: " << sum << std::endl;
t.join();
return 0;
}
/*
- 创建 std::promise<int> prom 和与之关联的 std::future<int> fut。
- 启动一个新线程,在该线程中调用 calculate_sum 函数计算结果,并使用 prom.set_value(result) 设置 promise 的值。
- 在主线程中,使用 fut.get() 获取异步操作的结果。
*/
5. std::packaged_task 和 std::async
核心概念
- std::packaged_task: 包装一个可调用对象(如函数、函数对象或 lambda 表达式),并将其结果与 std::future 关联。可以通过调用 std::packaged_task 对象来执行包装的可调用对象,并通过关联的 std::future 获取结果。
- std::async:是一个更高级的异步操作启动方式,它可以自动管理线程和返回结果。std::async 接受一个可调用对象和其参数,返回一个 std::future 对象,用于获取异步操作的结果。
- 示例代码:
#include <iostream>
#include <thread>
#include <future>
int add(int a, int b) {
return a + b;
}
int main() {
// 创建一个 packaged_task 对象
std::packaged_task<int(int, int)> task(add);
// 获取与 packaged_task 关联的 future
std::future<int> fut = task.get_future();
std::thread t(std::move(task), 3, 5);
// 获取 future 的值
int result = fut.get();
std::cout << "The result is: " << result << std::endl;
t.join();
return 0;
}
/*
- 创建 std::packaged_task<int(int, int)> task(add) 包装 add 函数,并获取与之关联的 std::future<int> fut。
- 启动一个新线程,将 task 移动到线程中执行,并传递参数 3 和 5。
- 在主线程中,使用 fut.get() 获取异步操作的结果。
*/
6. 多选题目
关于 std::condition_variable,以下说法正确的是( )
A. std::condition_variable 必须与 std::mutex 一起使用
B. 调用 wait 方法时,如果不使用谓词,可能会发生虚假唤醒
C. notify_one 方法会唤醒所有等待的线程
D. wait_for 和 wait_until 方法可以实现超时等待在生产者 - 消费者模型中,以下哪些做法可以提高性能和安全性( )
A. 生产者和消费者使用同一个互斥量保护共享队列
B. 生产者在添加数据后及时通知消费者
C. 消费者在处理数据时长时间持有锁
D. 使用谓词来避免消费者的虚假唤醒关于 std::future 和 std::promise,以下说法正确的是( )
A. std::future 可以用来获取异步操作的结果
B. std::promise 可以存储值或异常
C. 一个 std::promise 只能与一个 std::future 关联
D. std::future 的 get 方法可以多次调用以下哪些情况可能导致线程的虚假唤醒( )
A. 操作系统的调度策略
B. 硬件中断
C. 信号处理
D. 调用 notify_one 或 notify_all 方法在使用 std::condition_variable 时,以下做法正确的是( )
A. 在调用 wait 方法前先锁定互斥量
B. 在调用 notify_one 或 notify_all 方法时持有互斥量
C. 使用谓词来避免虚假唤醒
D. 在 wait 方法返回后再次检查条件
6. 多选题目答案
- 答案:ABD
解释:std::condition_variable 依赖 std::mutex 来保护共享数据,所以必须与 std::mutex 一起使用,A 正确;不使用谓词时,wait 方法可能会因操作系统或硬件的原因发生虚假唤醒,B 正确;notify_one 方法只会唤醒一个等待的线程,notify_all 才会唤醒所有等待的线程,C 错误;wait_for 和 wait_until 方法可以让线程在等待一定时间后自动返回,实现超时等待,D 正确。 - 答案:ABD
解释:生产者和消费者使用同一个互斥量保护共享队列可以避免数据竞争,保证线程安全,A 正确;生产者在添加数据后及时通知消费者可以减少消费者的等待时间,提高性能,B 正确;消费者在处理数据时长时间持有锁会导致其他线程无法访问共享队列,降低并发性能,C 错误;使用谓词来避免消费者的虚假唤醒可以确保消费者只在队列中有数据时才进行处理,提高程序的正确性和性能,D 正确。 - 答案:ABC
解释:std::future 主要用于获取异步操作的结果,A 正确;std::promise 可以存储一个值或异常,并通过关联的 std::future 传递给其他线程,B 正确;一个 std::promise 只能与一个 std::future 关联,C 正确;std::future 的 get 方法只能调用一次,多次调用会导致未定义行为,D 错误。 - 答案:ABC
解释:操作系统的调度策略、硬件中断和信号处理等因素都可能导致线程的虚假唤醒,A、B、C 正确;调用 notify_one 或 notify_all 方法是正常的通知机制,不会导致虚假唤醒,D 错误。 - 答案:ACD
解释:在调用 wait 方法前先锁定互斥量是必要的,因为 wait 方法会在等待时自动释放锁,在被唤醒后重新获取锁,A 正确;调用 notify_one 或 notify_all 方法时不需要持有互斥量,B 错误;使用谓词可以避免虚假唤醒,确保线程只在条件满足时才继续执行,C 正确;在 wait 方法返回后再次检查条件可以进一步确保程序的正确性,D 正确。
7. 设计题目
7.1 设计一个线程安全的缓存系统
要求:
- 多个线程可查询键是否存在,若不存在则触发计算任务
- 约束:同一键同时只允许一个计算任务
7.2 实现可取消的并行任务
要求:
- 任务提交后返回future,允许调用方取消正在执行的任务
- 需要考虑资源清理和线程终止安全
7.3 构建生产者-消费者管道
要求:
- 多个生产者和消费者通过有界缓冲区通信
- 消费者需要优先处理特定类型的消息
7.4 设计超时安全的数据库连接池
要求:
- 线程获取连接时最多等待500ms,超时返回错误
- 连接空闲超过2分钟自动回收
7.5 实现异步日志系统
要求:
- 所有日志写入请求异步执行
- 当日志队列积压超过1000条时切换为同步模式
- 支持刷盘操作等待所有待写日志完成
7.1 参考答案
#include <iostream>
#include <mutex>
#include <future>
#include <unordered_map>
template<typename Key, typename Value>
class SafeCache {
std::mutex mtx;
std::unordered_map<Key, Value> cache;
std::unordered_map<Key, std::shared_future<Value>> pending;
Value compute(const Key& key) {
// 模拟耗时计算
std::this_thread::sleep_for(std::chrono::seconds(1));
return key + "_value";
}
public:
Value get(const Key& key) {
std::unique_lock lock(mtx);
if (auto it = cache.find(key); it != cache.end()) {
return it->second;
}
if (auto it = pending.find(key); it != pending.end()) {
auto fut = it->second;
lock.unlock();
return fut.get();
}
auto promise = std::make_shared<std::promise<Value>>();
auto fut = promise->get_future().share();
pending.emplace(key, fut);
lock.unlock();
try {
auto res = compute(key);
lock.lock();
cache.emplace(key, res);
pending.erase(key);
lock.unlock();
promise->set_value(res);
} catch (...) {
promise->set_exception(std::current_exception());
}
return fut.get();
}
};
// 测试用例
void test_cache() {
SafeCache<std::string, std::string> cache;
auto task = [&](std::string key) {
return cache.get(key);
};
auto fut1 = std::async(std::launch::async, task, "A");
auto fut2 = std::async(std::launch::async, task, "A");
std::cout << fut1.get() << std::endl; // A_value
std::cout << fut2.get() << std::endl; // A_value (仅一次计算)
}
int main() {
test_cache();
return 0;
}
/*
分析:
使用std:mutex保护缓存的访问,防止数据竟争。
当键不存在时,为避免多个线程同时触发计算,使用互斥锁和条件变量来保证单个任务执行。可能需要存储计算中的键,防止重复计算。可以使用std::unordered_map存储键和对应的future,表示计算中的结果:
步骤:
1.定义缓存数据结构,包括互斥锁、存储键值对以及正在计算的键及其future。
2.查询缓存时锁住互斥锁,若存在则直接返回值。
3.如果键不存在且未被计算,则生成一个promise和future,存储到正在计算的map中,并启动异步任务计算结果
4.其他线程查询同一键时,等待该键对应的future就绪。
测试用例:
多线程同时请求相同的键,确保只有一个计算任务执行
验证缓存结果是否正确,避免重复计算。
处理计算中的异常情况。
*/
7.2 参考答案
#include <iostream>
#include <future>
#include <atomic>
#include <chrono>
#include <functional>
// 自定义可取消的任务包装类
template <typename Result>
class CancellableTask {
public:
// 构造函数,接受一个可调用对象和一个取消标志
template <typename Func>
CancellableTask(Func func, std::shared_ptr<std::atomic<bool>> cancelFlag)
: cancelFlag_(cancelFlag) {
// 使用 std::async 异步执行任务
future_ = std::async(std::launch::async, [func, cancelFlag]() {
while (!cancelFlag->load()) {
// 执行任务函数
Result result = func();
return result;
}
// 如果任务被取消,抛出异常
throw std::runtime_error("Task cancelled");
});
}
// 获取任务的 future 对象
std::future<Result> get_future() {
return std::move(future_);
}
// 取消任务
void cancel() {
cancelFlag_->store(true);
}
private:
std::future<Result> future_;
std::shared_ptr<std::atomic<bool>> cancelFlag_;
};
// 示例任务函数
int exampleTask() {
for (int i = 0; i < 5; ++i) {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Task is working: " << i << std::endl;
}
return 42;
}
int main() {
// 创建一个共享的原子布尔变量作为取消标志
auto cancelFlag = std::make_shared<std::atomic<bool>>(false);
// 创建可取消的任务
CancellableTask<int> task(exampleTask, cancelFlag);
// 获取任务的 future 对象
auto future = task.get_future();
// 模拟一段时间后取消任务
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Cancelling the task..." << std::endl;
task.cancel();
try {
// 尝试获取任务的结果
int result = future.get();
std::cout << "Task result: " << result << std::endl;
} catch (const std::runtime_error& e) {
std::cout << "Exception caught: " << e.what() << std::endl;
}
return 0;
}
7.3 参考答案
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <algorithm>
#include <chrono>
// 定义消息类型
const int PRIORITY_MESSAGE_TYPE = 1;
// 有界缓冲区类
template<typename T>
class BoundedBuffer {
private:
std::vector<T> buffer;
std::mutex mtx;
std::condition_variable not_full;
std::condition_variable not_empty;
size_t capacity;
public:
BoundedBuffer(size_t cap) : capacity(cap) {}
// 生产者向缓冲区添加消息
void enqueue(const T& item) {
std::unique_lock<std::mutex> lock(mtx);
// 等待缓冲区有空间
not_full.wait(lock, [this] { return buffer.size() < capacity; });
buffer.push_back(item);
// 通知消费者缓冲区有新消息
not_empty.notify_one();
}
// 消费者从缓冲区取出消息,优先处理特定类型的消息
T dequeue() {
std::unique_lock<std::mutex> lock(mtx);
// 等待缓冲区有消息
not_empty.wait(lock, [this] { return!buffer.empty(); });
// 优先查找特定类型的消息
auto it = std::find_if(buffer.begin(), buffer.end(), [](const T& msg) {
return std::get<1>(msg) == PRIORITY_MESSAGE_TYPE;
});
T item;
if (it != buffer.end()) {
item = *it;
buffer.erase(it);
} else {
item = buffer.front();
buffer.erase(buffer.begin());
}
// 通知生产者缓冲区有空间
not_full.notify_one();
return item;
}
};
// 生产者函数
void producer(BoundedBuffer<std::pair<int, int>>& buffer, int id) {
for (int i = 0; i < 5; ++i) {
int messageType = (i % 2 == 0)? PRIORITY_MESSAGE_TYPE : 2;
buffer.enqueue({id * 10 + i, messageType});
std::cout << "Producer " << id << " produced message " << id * 10 + i << " of type " << messageType << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
// 消费者函数
void consumer(BoundedBuffer<std::pair<int, int>>& buffer, int id) {
for (int i = 0; i < 5; ++i) {
auto [messageId, messageType] = buffer.dequeue();
std::cout << "Consumer " << id << " consumed message " << messageId << " of type " << messageType << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
int main() {
const int bufferCapacity = 10;
BoundedBuffer<std::pair<int, int>> buffer(bufferCapacity);
const int numProducers = 3;
const int numConsumers = 2;
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
// 创建生产者线程
for (int i = 0; i < numProducers; ++i) {
producers.emplace_back(producer, std::ref(buffer), i);
}
// 创建消费者线程
for (int i = 0; i < numConsumers; ++i) {
consumers.emplace_back(consumer, std::ref(buffer), i);
}
// 等待生产者线程结束
for (auto& p : producers) {
p.join();
}
// 等待消费者线程结束
for (auto& c : consumers) {
c.join();
}
return 0;
}
7.4 参考答案
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <vector>
// 模拟数据库连接类
class DatabaseConnection {
public:
DatabaseConnection() {
std::cout << "Database connection created." << std::endl;
}
~DatabaseConnection() {
std::cout << "Database connection destroyed." << std::endl;
}
void executeQuery(const std::string& query) {
std::cout << "Executing query: " << query << std::endl;
}
};
// 数据库连接池类
class DatabaseConnectionPool {
private:
std::queue<DatabaseConnection*> connections;
std::mutex mtx;
std::condition_variable cv;
size_t maxConnections;
std::chrono::time_point<std::chrono::steady_clock> lastUsed;
// 自动回收线程函数
void autoReclaim() {
while (true) {
std::this_thread::sleep_for(std::chrono::minutes(1)); // 每分钟检查一次
{
std::unique_lock<std::mutex> lock(mtx);
auto now = std::chrono::steady_clock::now();
while (!connections.empty()) {
auto connection = connections.front();
auto idleTime = std::chrono::duration_cast<std::chrono::minutes>(now - lastUsed);
if (idleTime.count() >= 2) {
connections.pop();
delete connection;
} else {
break;
}
}
}
}
}
public:
DatabaseConnectionPool(size_t maxConns) : maxConnections(maxConns) {
for (size_t i = 0; i < maxConns; ++i) {
connections.push(new DatabaseConnection());
}
lastUsed = std::chrono::steady_clock::now();
std::thread(&DatabaseConnectionPool::autoReclaim, this).detach();
}
~DatabaseConnectionPool() {
std::unique_lock<std::mutex> lock(mtx);
while (!connections.empty()) {
delete connections.front();
connections.pop();
}
}
// 获取数据库连接
DatabaseConnection* getConnection() {
std::unique_lock<std::mutex> lock(mtx);
if (cv.wait_for(lock, std::chrono::milliseconds(500), [this] { return!connections.empty(); })) {
auto connection = connections.front();
connections.pop();
lastUsed = std::chrono::steady_clock::now();
return connection;
}
return nullptr; // 超时返回 nullptr
}
// 释放数据库连接
void releaseConnection(DatabaseConnection* connection) {
std::unique_lock<std::mutex> lock(mtx);
connections.push(connection);
lastUsed = std::chrono::steady_clock::now();
cv.notify_one();
}
};
// 测试用例
void testConnectionPool() {
DatabaseConnectionPool pool(3);
auto task = [&pool]() {
DatabaseConnection* conn = pool.getConnection();
if (conn) {
conn->executeQuery("SELECT * FROM users");
pool.releaseConnection(conn);
} else {
std::cout << "Failed to get a connection (timeout)." << std::endl;
}
};
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
threads.emplace_back(task);
}
for (auto& t : threads) {
t.join();
}
}
int main() {
testConnectionPool();
return 0;
}
7.5 参考答案
#include <iostream>
#include <fstream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <string>
#include <chrono>
#include <atomic>
#include <vector>
// 定义日志级别
enum class LogLevel {
DEBUG,
INFO,
WARN,
ERROR
};
class AsyncLogger {
private:
std::queue<std::string> logQueue;
std::mutex mtx;
std::condition_variable cv;
std::ofstream logFile;
std::thread loggerThread;
std::atomic<bool> stopLogging;
std::atomic<bool> isSyncMode;
static constexpr size_t MAX_QUEUE_SIZE = 1000;
// 日志线程函数
void logger() {
std::vector<std::string> buffer;
while (!stopLogging) {
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait_for(lock, std::chrono::milliseconds(100), [this] { return!logQueue.empty() || stopLogging; });
while (!logQueue.empty()) {
buffer.push_back(std::move(logQueue.front()));
logQueue.pop();
}
}
for (const auto& msg : buffer) {
try {
logFile << msg << std::endl;
} catch (const std::exception& e) {
std::cerr << "Error writing log: " << e.what() << std::endl;
}
}
buffer.clear();
}
// 处理剩余的日志
{
std::unique_lock<std::mutex> lock(mtx);
while (!logQueue.empty()) {
try {
logFile << logQueue.front() << std::endl;
logQueue.pop();
} catch (const std::exception& e) {
std::cerr << "Error writing remaining log: " << e.what() << std::endl;
}
}
}
try {
logFile.flush();
} catch (const std::exception& e) {
std::cerr << "Error flushing log file: " << e.what() << std::endl;
}
}
// 格式化日志消息,添加日志级别和时间戳
std::string formatLogMessage(LogLevel level, const std::string& message) {
auto now = std::chrono::system_clock::now();
std::time_t currentTime = std::chrono::system_clock::to_time_t(now);
std::string timeStr = std::ctime(¤tTime);
timeStr.pop_back(); // 去掉换行符
std::string levelStr;
switch (level) {
case LogLevel::DEBUG:
levelStr = "DEBUG";
break;
case LogLevel::INFO:
levelStr = "INFO";
break;
case LogLevel::WARN:
levelStr = "WARN";
break;
case LogLevel::ERROR:
levelStr = "ERROR";
break;
}
return "[" + timeStr + "] [" + levelStr + "] " + message;
}
public:
AsyncLogger(const std::string& logFileName) : stopLogging(false), isSyncMode(false) {
try {
logFile.open(logFileName, std::ios::app);
if (!logFile.is_open()) {
std::cerr << "Failed to open log file: " << logFileName << std::endl;
}
} catch (const std::exception& e) {
std::cerr << "Error opening log file: " << e.what() << std::endl;
}
loggerThread = std::thread(&AsyncLogger::logger, this);
}
~AsyncLogger() {
stopLogging = true;
cv.notify_one();
if (loggerThread.joinable()) {
loggerThread.join();
}
try {
logFile.close();
} catch (const std::exception& e) {
std::cerr << "Error closing log file: " << e.what() << std::endl;
}
}
// 写入日志
void log(LogLevel level, const std::string& message) {
std::string formattedMessage = formatLogMessage(level, message);
{
std::unique_lock<std::mutex> lock(mtx);
if (logQueue.size() >= MAX_QUEUE_SIZE) {
isSyncMode = true;
} else {
isSyncMode = false;
}
}
if (isSyncMode) {
// 同步模式,直接写入日志
std::lock_guard<std::mutex> lock(mtx);
try {
logFile << formattedMessage << std::endl;
logFile.flush();
} catch (const std::exception& e) {
std::cerr << "Error writing log in sync mode: " << e.what() << std::endl;
}
} else {
// 异步模式,将日志消息放入队列
{
std::lock_guard<std::mutex> lock(mtx);
logQueue.push(formattedMessage);
}
cv.notify_one();
}
}
// 刷盘操作
void flush() {
std::unique_lock<std::mutex> lock(mtx);
while (!logQueue.empty()) {
cv.wait(lock, [this] { return logQueue.empty(); });
}
try {
logFile.flush();
} catch (const std::exception& e) {
std::cerr << "Error flushing log during flush operation: " << e.what() << std::endl;
}
}
};
// 测试用例
void testAsyncLogger() {
AsyncLogger logger("test.log");
// 模拟大量日志写入
for (int i = 0; i < 2000; ++i) {
logger.log(LogLevel::INFO, "Log message " + std::to_string(i));
std::cout << "Log message " << i << std::endl; // 输出到控制台
}
// 刷盘操作
logger.flush();
}
int main() {
testAsyncLogger();
return 0;
}