在上一节中我们实现了同步日志,并输出到stdout。这节我们来实现异步。
在上一节中,添加了AppendFile类,这是对文件操作的一个底层的类,我们需要一个更加上层的,使用更便捷的接口来操控磁盘文件。比如说每输出了1000条日志消息,就需要冲刷AppendFile类中的缓冲区buffer_;还有日志文件大小若达到了预定的大小,就要新开一个日志文件等等。所以我们新创一个类,更加方便我们使用。
1.LogFile类
// 日志文件类,用于管理日志文件的输出
class LogFile
{
public:
// 构造函数,初始化 LogFile 对象
// 参数 file_name: 日志文件名,flushEveryN: 每写入多少条日志消息就冲刷一次缓冲区
LogFile(const std::string& file_name, int flushEveryN)
: filename_(file_name), // 初始化日志文件名
flushEveryN_(flushEveryN), // 设置每 n 条日志后强制冲刷一次
count_(0), // 初始化计数器为 0,记录写入的日志条数
file_(std::make_unique<AppendFile>(file_name)) // 创建 AppendFile 对象并初始化 unique_ptr
{
// 在这里可以添加其他初始化操作
}
// 添加日志消息到 file_ 的缓冲区
// 参数 str: 指向日志消息的字符数组,len: 日志消息的长度
void Append(const char* str, int len)
{
file_->Append(str, len); // 将日志消息添加到 AppendFile 的缓冲区
count_++; // 增加计数器
// 检查计数器是否达到了冲刷的条件
if (count_ >= flushEveryN_) {
count_ = 0; // 达到条件后重置计数器
file_->Flush(); // 冲刷文件的缓冲区,将日志写入磁盘
}
}
// 冲刷 file_ 的缓冲区
void Flush() {
file_->Flush(); // 调用 AppendFile 的 Flush 方法
}
private:
// 日志文件名,存储日志输出的文件路径
const std::string filename_;
// 每写 flushEveryN_ 条日志消息就强制冲刷 file_ 的缓冲区内容到磁盘中
const int flushEveryN_;
// 计数器,记录调用 Append() 的次数
int count_;
// 智能指针,用于管理 AppendFile 对象的生命周期
std::unique_ptr<AppendFile> file_;
};
LogFile类中有个AppendFile的智能指针成员变量file_,LogFile类通过操控file_就可以更加方便实现我们上述的需求。
该构造函数中创建一个AppendFile类对象的智能指针赋给成员变量file_。
在LogFile::Append()中调用AppendFile::Append()函数,并记录Append()的使用次数,(即也是记录添加了多少条日志消息)。达到次数后,就进行fflush()冲刷,即把日志消息写到磁盘文件中。
// LogFile 类构造函数
LogFile::LogFile(const std::string& file_name, int flushEveryN)
: filename_(file_name) // 初始化日志文件名,使用传入的参数 file_name
, flushEveryN_(flushEveryN) // 设置每 n 条日志后需要冲刷的频率
, count_(0) // 初始化记录已写入日志消息数量的计数器为 0
{
// 使用智能指针创建 AppendFile 对象,负责管理对日志文件的操作
file_ = std::make_unique<AppendFile>(filename_); // 将文件名传递给 AppendFile 的构造函数
}
// 添加日志消息到 file_ 的缓冲区
void LogFile::Append(const char* logline, int len)
{
// 将日志消息添加到 AppendFile 的缓冲区
file_->Append(logline, len);
// 增加计数器,记录已写入的日志条数
if (++count_ >= flushEveryN_) {
count_ = 0; // 达到冲刷条件后重置计数器
file_->Flush(); // 强制冲刷缓冲区,将日志写入磁盘文件
}
}
这里可能又有疑惑了:为什么不把这些操作直接放到AppendFile类中呢,为什么就非要创建多一个类呢。
这是为了方便我们的使用,AppendFile类封装了对磁盘文件的操作,LogFile类就只需要操控AppendFile的智能指针file_就行,这样就能很大便利我们去编写代码。
封装起来方便我们编写代码,这也是为了让类的责任分工明确。
或者你也可以去试试把这两个类写成一个类看看是什么效果。
接着来看看重头戏,实现异步的类。
2.AsyncLogger类
// 异步日志类,用于在单独的线程中处理日志输出
class AsyncLogger
{
public:
// 构造函数,参数1为日志文件名,参数2为每隔多少秒冲刷缓冲区,默认为3秒
AsyncLogger(const std::string& fileName, int flushInterval = 3)
: flushInterval_(flushInterval), // 设置冲刷时间间隔
is_running_(false), // 初始化运行状态
fileName_(fileName), // 保存日志文件名
currentBuffer_(std::make_unique<Buffer>()), // 初始化当前缓冲区
nextBuffer_(std::make_unique<Buffer>()) // 初始化下一个缓存区
{
// 这里可以添加其他初始化操作
}
// 析构函数
~AsyncLogger() {
stop(); // 确保在析构时停止线程
}
// 将日志消息添加到缓冲区
void Append(const char* logline, int len);
// 启动异步日志线程
void start() {
is_running_ = true; // 设置运行状态为真
thread_ = std::thread([this]() { ThreadFunc(); }); // 创建并启动日志处理线程
}
// 停止异步日志线程
void stop();
public:
// 后端日志线程函数
void ThreadFunc();
// 定义固定大小的缓冲区
using Buffer = FixedBuffer<KLargeBuffer>;
using BufferPtr = std::unique_ptr<Buffer>; // 缓冲区智能指针
using BufferVector = std::vector<BufferPtr>; // 缓冲区向量类型
// 冲刷时间间隔
const int flushInterval_;
bool is_running_; // 日志线程运行状态
std::string fileName_; // 日志文件名
// 后端线程
std::thread thread_;
// 互斥锁,用于保护共享数据的线程安全
std::mutex mutex_;
std::condition_variable cond_; // 条件变量,用于线程同步
// 当前和下一个缓冲区指针
BufferPtr currentBuffer_;
BufferPtr nextBuffer_;
// 等待写入文件的已填满的缓冲向量
BufferVector buffers_;
};
这里的异步主要是用空间换时间。该类里面的成员变量很多。
muduo中的日志使用的是双缓冲技术,这里讲解的也主要是参考陈硕书籍《Linux多线程服务端编程》。基本思路是先准备好两块buffer:currentBuffer_和nextBuffer_。前端负责往currentBuffer_中填写日志消息,后端负责将nextBuffer_的数据写入文件中(磁盘)。
当currentBuffer_写满后,交换currentBuffer_和nextBuffer_,(交换之后,那么前端就是往nextBuffer_中填写日志消息了),让后端将currentBuffer_的数据写入文件,而前端就往nextBuffer_中填入新的日志消息,如此往复。
先来看看发送方的代码,即是前端代码。
// 将日志消息添加到当前缓冲区
void AsyncLogger::Append(const char* logline, int len)
{
// 使用唯一锁保护当前的共享资源,确保线程安全
std::unique_lock<std::mutex> lock(mutex_);
// 检查当前缓冲区是否有足够的可用空间来添加新的日志消息
if (currentBuffer_->Available() > len) {
// 当前缓冲区有足够的空间,直接将日志消息添加到缓冲区
currentBuffer_->Append(logline, len);
}
else {
// 当前缓冲区没有足够的空间,准备处理已满的缓冲区
// 将已满的当前缓冲区移动到等待写入的缓冲区容器中
buffers_.emplace_back(std::move(currentBuffer_));
// 如果下一个可用的缓冲区不为空,则将其移动给 currentBuffer_
if (nextBuffer_) {
currentBuffer_ = std::move(nextBuffer_);
}
else {
// 否则,创建一个新的缓冲区并赋值给 currentBuffer_
currentBuffer_.reset(new Buffer);
}
// 将日志消息添加到新的当前缓冲区
currentBuffer_->Append(logline, len);
// 通知后端日志线程,可以读取新填充的缓冲区
cond_.notify_one();
}
}
该函数中主要有3种情况,逻辑也清晰,要说的也在注释中了哈。
而cond_.notify_one()就会唤醒后端log线程,把日志消息真正写入到磁盘文件中了。
来看看后端log线程函数。
// 后端日志线程函数,负责从缓冲区读取日志并写入磁盘文件
void AsyncLogger::ThreadFunc()
{
// ======================== 步骤1 ============================
// 打开磁盘日志文件,创建 LogFile 对象
LogFile output(fileName_); // 通过文件名初始化日志文件对象
// 准备好后端备用的缓冲区 1 和 2
auto newBuffer1 = std::make_unique<Buffer>(); // 创建第一个备用缓冲区
auto newBuffer2 = std::make_unique<Buffer>(); // 创建第二个备用缓冲区
// 准备读取的缓冲区向量
BufferVector buffersToWrite;
buffersToWrite.reserve(16); // 预留 16 个元素空间,以提高效率
// 后端线程的无限循环,处理日志的写入
while (is_running_) {
{
// ============================= 步骤2 =========================
std::unique_lock<std::mutex> lock(mutex_); // 锁定 mutex_ 以确保线程安全
// 如果当前没有缓存的日志数据,等待条件变量
if (buffers_.empty()) {
// 对条件变量进行等待,直到超时或有新的日志写入
cond_.wait_for(lock, std::chrono::seconds(flushInterval_));
}
// 将当前缓冲区移动到待写入的缓冲区容器中
buffers_.emplace_back(std::move(currentBuffer_));
// 将后端线程准备好的缓冲区 1 赋值给 currentBuffer_
currentBuffer_ = std::move(newBuffer1);
// 交换 buffers_ 和 buffersToWrite,保证后端线程可以写入
buffersToWrite.swap(buffers_);
// 如果下一个可用的缓冲区为空,将后端线程的缓冲区 2 赋给 nextBuffer_
if (!nextBuffer_) {
nextBuffer_ = std::move(newBuffer2);
}
} // 离开锁的作用域, mutex_ 解锁
// ======================== 步骤3 =======================
// 将缓冲区内容写入日志文件
for (size_t i = 0; i < buffersToWrite.size(); ++i) {
// 将已填满的缓冲区中的数据写入到磁盘文件
output.Append(buffersToWrite[i]->GetData(), buffersToWrite[i]->GetLength());
}
// ========================= 步骤4 =====================
// 如果待写入的缓冲区数量过多,丢弃多余的缓冲区
if (buffersToWrite.size() > 2) {
buffersToWrite.resize(2); // 仅保留最近的两个缓冲区
}
// 恢复后端备用缓冲区
if (!newBuffer1) { // 检查 newBuffer1 是否为空
newBuffer1 = buffersToWrite.back(); // 从待写入的缓冲区中获取一个
buffersToWrite.pop_back(); // 从向量中移除它
newBuffer1->Reset(); // 将缓冲区的数据指针归零,准备重新使用
}
// newBuffer2 的操作与 newBuffer1 相似,此处省略具体代码
// 丢弃无用的已写缓冲区
buffersToWrite.clear(); // 清除当前的待写入缓冲区
output.Flush(); // 冲刷输出,确保数据写入磁盘
}
output.Flush(); // 在线程结束前最后一次冲刷,确保所有数据都写入
}
步骤1:
先打开保存日志的磁盘文件,准备好后端备用的空闲缓冲区1、2,用来交换已写好日志消息的缓冲区。也备好读取的缓冲区vector,用来交换已写入文件的填完完毕的缓冲vector。
这里都是为了在临界区内交换而不用阻塞的。
步骤2:到了while(1)循环里面了。也到了需要加锁的临界区。
首先等待条件是否触发,等待触发的条件有两个:其一是超时(超过默认的3s),其二是前端写满了一个或多个buffer。注意这里使用的是if(),没有使用while()循环的。
当条件满足时,将当前缓冲(currentBuffer_)移入buffers_,并立刻将空闲的newBuffer1移为当前缓冲。要注意的是,这整段代码是位于临界区内的,所以不会有任何race condition。
接下来将已填写完毕的缓冲vector(buffers_)与备好的缓冲区buffersToWrite交换(和第12节中的函数doPendingFunctors()中也是要进行交换后,才好执行任务回调函数的),后面的代码就可以在临界区外安全地访问buffersToWrite,将其中的日志数据写入到磁盘文件中。
接着用newBuffer2替换nextBuffer_,这样前端就始终有一个预备buffer 可以调配。
nextBuffer_可以减少前端临界区分配内存的概率,缩短前端临界区长度。(因为前端Append()函数中有一种情况会currentBuffer_.reset(new Buffer);)
步骤3:这时已经出了临界区
若缓冲区过多,说明前端产生log的速度远大于后端消费的速度,这里只是简单的将它们丢弃(调用erase()函数)。之后把日志消息写入到磁盘(调用LogFile::Append函数),
步骤4:
调用vector的resize()函数重置该缓冲区vector大小,并将该缓冲区vector内的buffer重新填充给newBuffer1和newBuffer2,这样下一次执行的时候就还有两个空闲buffer可用于替换前端的当前缓冲和预备缓冲。
3.如何使用AsyncLogger类
在AsyncLogger类中我们使用的是LogFile类,不使用AppendFile类。AppendFile类是供LogFile对象使用的,我们基本不会直接去操作AppendFile类。
记住是通过LogFile类去调用AppendFile类。
那关键的异步日志的AsyncLogger类已实现。那该怎么使用该类呢。
对外界可见,可以直接使用的是Logger类,那怎么把这两个类关联起来呢。
AsyncLogger类中是使用AsyncLogger::start()开始后端线程,前端收集日志消息是AsyncLogger::Append()函数。我们就要从这两个函数下手。
在Logger类中添加
public:
// 获取日志文件名,返回一个 std::string 类型
static std::string LogFileName() { return logFileName_; }
// 定义输出回调函数类型,函数签名为接受一个字符指针和长度
using OutputFunc = void (*)(const char* msg, int len);
// 定义冲刷回调函数类型,无参数返回类型
using FlushFunc = void (*)();
private:
// 静态成员变量,存储日志文件的名字
static std::string logFileName_;
前一节我们输出到stdout是通过Logger类的析构函数来输出的。
//这是上一节的实现
Logger::~Logger()
{
//其他的省略
DefaultOutput(buf.Data(), buf.Length()); //是调用了fwrite()输出到stdout的
}
那我们可以修改析构函数中的DefaultOutput()函数就行,我们可以调用AsyncLogger::Append()函数来让消息日志写到磁盘中。AsyncLogger::Append()就是收集消息日志,写到缓冲中,等到缓存满了,条件符合了,就会notify_one(),唤醒后端log线程,把日志消息真正写到磁盘中去。
再来看看AsyncLogger类对象在哪生成呢。
// 声明一个静态唯一指针,指向 AsyncLogger 实例
static std::unique_ptr<AsyncLogger> asyncLogger;
// 声明一个静态 once_flag,用于确保某个操作只执行一次
static std::once_flag g_once_flag;
// 静态成员变量,用于存储日志文件名
std::string Logger::logFileName_ = "../li22.log";
// 用于初始化 AsyncLogger 的函数
void OnceInit()
{
// 创建 AsyncLogger 的唯一指针并启动后端线程
asyncLogger = std::make_unique<AsyncLogger>(Logger::LogFileName());
asyncLogger->start(); // 开启后端日志处理线程
}
// 异步输出日志消息的函数
void AsyncOutput(const char* logline, int len)
{
// 确保 OnceInit 只执行一次,防止多线程环境下的竞态条件
std::call_once(g_once_flag, OnceInit); // 仅初始化一次
// 将日志消息添加到 AsyncLogger 缓冲区
asyncLogger->Append(logline, len);
}
// 全局变量:输出函数,指向 AsyncOutput 函数
Logger::OutputFunc g_output = AsyncOutput;
// Logger 的析构函数
Logger::~Logger()
{
// 省略其他清理代码...
// 调用全局输出函数,将缓冲区内容写入
g_output(buf.Data(), buf.Length()); // 输出当前缓冲区的数据
}
logFileName_ 就是我们的日志文件名。
那我们从该析构函数Logger::~Logger()开始,g_output()是全局输出函数,这里设置成了AsyncOutput()函数,AsyncOutput()函数使用了std::call_once,这是c++11新增的,可以保证不管是开多少个线程,该函数(OnceInit())只会执行一次。这就可以保证了后端log线程只有一个,这就不会有抢占写日志到磁盘的情况发生。asyncLogger的后端log线程就开始了,那就接着调用AsyncLogger::Append()函数就可以了。下面是其主要的流程图。
异步日志的主体就完成了,还有一些功能,小细节没有实现,下一节再讲解了。
这个双缓冲的设计真不错。
这一节我们就可以把日志输出到磁盘文件了。