18.实现异步日志:日志输出到磁盘文件

发布于:2025-04-06 ⋅ 阅读:(22) ⋅ 点赞:(0)

在上一节中我们实现了同步日志,并输出到stdout。这节我们来实现异步。

在上一节中,添加了AppendFile类,这是对文件操作的一个底层的类,我们需要一个更加上层的,使用更便捷的接口来操控磁盘文件。比如说每输出了1000条日志消息,就需要冲刷AppendFile类中的缓冲区buffer_;还有日志文件大小若达到了预定的大小,就要新开一个日志文件等等。所以我们新创一个类,更加方便我们使用。

1.LogFile类

// 日志文件类,用于管理日志文件的输出  
class LogFile  
{  
public:  
    // 构造函数,初始化 LogFile 对象  
    // 参数 file_name: 日志文件名,flushEveryN: 每写入多少条日志消息就冲刷一次缓冲区  
    LogFile(const std::string& file_name, int flushEveryN)  
        : filename_(file_name), // 初始化日志文件名  
          flushEveryN_(flushEveryN), // 设置每 n 条日志后强制冲刷一次  
          count_(0), // 初始化计数器为 0,记录写入的日志条数  
          file_(std::make_unique<AppendFile>(file_name)) // 创建 AppendFile 对象并初始化 unique_ptr  
    {  
        // 在这里可以添加其他初始化操作  
    }  

    // 添加日志消息到 file_ 的缓冲区  
    // 参数 str: 指向日志消息的字符数组,len: 日志消息的长度  
    void Append(const char* str, int len)  
    {  
        file_->Append(str, len); // 将日志消息添加到 AppendFile 的缓冲区  
        count_++; // 增加计数器  
        
        // 检查计数器是否达到了冲刷的条件  
        if (count_ >= flushEveryN_) {  
            count_ = 0; // 达到条件后重置计数器  
            file_->Flush(); // 冲刷文件的缓冲区,将日志写入磁盘  
        }  
    }  

    // 冲刷 file_ 的缓冲区  
    void Flush() {   
        file_->Flush(); // 调用 AppendFile 的 Flush 方法  
    }  

private:  
    // 日志文件名,存储日志输出的文件路径  
    const std::string filename_;  

    // 每写 flushEveryN_ 条日志消息就强制冲刷 file_ 的缓冲区内容到磁盘中  
    const int flushEveryN_;   

    // 计数器,记录调用 Append() 的次数  
    int count_;   

    // 智能指针,用于管理 AppendFile 对象的生命周期  
    std::unique_ptr<AppendFile> file_;   
};  

LogFile类中有个AppendFile的智能指针成员变量file_,LogFile类通过操控file_就可以更加方便实现我们上述的需求。

该构造函数中创建一个AppendFile类对象的智能指针赋给成员变量file_。

在LogFile::Append()中调用AppendFile::Append()函数,并记录Append()的使用次数,(即也是记录添加了多少条日志消息)。达到次数后,就进行fflush()冲刷,即把日志消息写到磁盘文件中。

// LogFile 类构造函数  
LogFile::LogFile(const std::string& file_name, int flushEveryN)  
    : filename_(file_name) // 初始化日志文件名,使用传入的参数 file_name  
    , flushEveryN_(flushEveryN) // 设置每 n 条日志后需要冲刷的频率  
    , count_(0) // 初始化记录已写入日志消息数量的计数器为 0  
{  
    // 使用智能指针创建 AppendFile 对象,负责管理对日志文件的操作  
    file_ = std::make_unique<AppendFile>(filename_); // 将文件名传递给 AppendFile 的构造函数  
}  

// 添加日志消息到 file_ 的缓冲区  
void LogFile::Append(const char* logline, int len)  
{  
    // 将日志消息添加到 AppendFile 的缓冲区  
    file_->Append(logline, len);  

    // 增加计数器,记录已写入的日志条数  
    if (++count_ >= flushEveryN_) {  
        count_ = 0; // 达到冲刷条件后重置计数器  
        file_->Flush(); // 强制冲刷缓冲区,将日志写入磁盘文件  
    }  
}  

这里可能又有疑惑了:为什么不把这些操作直接放到AppendFile类中呢,为什么就非要创建多一个类呢

这是为了方便我们的使用,AppendFile类封装了对磁盘文件的操作,LogFile类就只需要操控AppendFile的智能指针file_就行,这样就能很大便利我们去编写代码。

封装起来方便我们编写代码,这也是为了让类的责任分工明确。

或者你也可以去试试把这两个类写成一个类看看是什么效果。

接着来看看重头戏,实现异步的类。

2.AsyncLogger类

// 异步日志类,用于在单独的线程中处理日志输出  
class AsyncLogger  
{  
public:  
    // 构造函数,参数1为日志文件名,参数2为每隔多少秒冲刷缓冲区,默认为3秒  
    AsyncLogger(const std::string& fileName, int flushInterval = 3)  
        : flushInterval_(flushInterval), // 设置冲刷时间间隔  
          is_running_(false), // 初始化运行状态  
          fileName_(fileName), // 保存日志文件名  
          currentBuffer_(std::make_unique<Buffer>()), // 初始化当前缓冲区  
          nextBuffer_(std::make_unique<Buffer>()) // 初始化下一个缓存区  
    {  
        // 这里可以添加其他初始化操作  
    }  

    // 析构函数  
    ~AsyncLogger() {  
        stop(); // 确保在析构时停止线程  
    }  

    // 将日志消息添加到缓冲区  
    void Append(const char* logline, int len);  

    // 启动异步日志线程  
    void start() {  
        is_running_ = true; // 设置运行状态为真  
        thread_ = std::thread([this]() { ThreadFunc(); }); // 创建并启动日志处理线程  
    }  

    // 停止异步日志线程  
    void stop();  

public:  
    // 后端日志线程函数  
    void ThreadFunc();  

    // 定义固定大小的缓冲区  
    using Buffer = FixedBuffer<KLargeBuffer>;   
    using BufferPtr = std::unique_ptr<Buffer>; // 缓冲区智能指针  
    using BufferVector = std::vector<BufferPtr>; // 缓冲区向量类型  

    // 冲刷时间间隔  
    const int flushInterval_;   
    bool is_running_; // 日志线程运行状态  
    std::string fileName_; // 日志文件名  

    // 后端线程  
    std::thread thread_;   

    // 互斥锁,用于保护共享数据的线程安全  
    std::mutex mutex_;   
    std::condition_variable cond_; // 条件变量,用于线程同步  

    // 当前和下一个缓冲区指针  
    BufferPtr currentBuffer_;  
    BufferPtr nextBuffer_;      
    
    // 等待写入文件的已填满的缓冲向量  
    BufferVector buffers_;   
};  

这里的异步主要是用空间换时间。该类里面的成员变量很多。

muduo中的日志使用的是双缓冲技术,这里讲解的也主要是参考陈硕书籍《Linux多线程服务端编程》。基本思路是先准备好两块buffer:currentBuffer_nextBuffer_。前端负责往currentBuffer_中填写日志消息,后端负责将nextBuffer_的数据写入文件中(磁盘)。

当currentBuffer_写满后,交换currentBuffer_和nextBuffer_,(交换之后,那么前端就是往nextBuffer_中填写日志消息了),让后端将currentBuffer_的数据写入文件,而前端就往nextBuffer_中填入新的日志消息,如此往复。

先来看看发送方的代码,即是前端代码。

// 将日志消息添加到当前缓冲区  
void AsyncLogger::Append(const char* logline, int len)  
{  
    // 使用唯一锁保护当前的共享资源,确保线程安全  
    std::unique_lock<std::mutex> lock(mutex_);   

    // 检查当前缓冲区是否有足够的可用空间来添加新的日志消息  
    if (currentBuffer_->Available() > len) {  
        // 当前缓冲区有足够的空间,直接将日志消息添加到缓冲区  
        currentBuffer_->Append(logline, len);  
    }  
    else {  
        // 当前缓冲区没有足够的空间,准备处理已满的缓冲区  

        // 将已满的当前缓冲区移动到等待写入的缓冲区容器中  
        buffers_.emplace_back(std::move(currentBuffer_));  

        // 如果下一个可用的缓冲区不为空,则将其移动给 currentBuffer_  
        if (nextBuffer_) {   
            currentBuffer_ = std::move(nextBuffer_);  
        }  
        else {   
            // 否则,创建一个新的缓冲区并赋值给 currentBuffer_  
            currentBuffer_.reset(new Buffer);  
        }  

        // 将日志消息添加到新的当前缓冲区  
        currentBuffer_->Append(logline, len);  

        // 通知后端日志线程,可以读取新填充的缓冲区  
        cond_.notify_one();   
    }  
}  

该函数中主要有3种情况,逻辑也清晰,要说的也在注释中了哈。

而cond_.notify_one()就会唤醒后端log线程,把日志消息真正写入到磁盘文件中了。

来看看后端log线程函数。

// 后端日志线程函数,负责从缓冲区读取日志并写入磁盘文件  
void AsyncLogger::ThreadFunc()  
{  
    // ======================== 步骤1 ============================  
    // 打开磁盘日志文件,创建 LogFile 对象  
    LogFile output(fileName_); // 通过文件名初始化日志文件对象  

    // 准备好后端备用的缓冲区 1 和 2  
    auto newBuffer1 = std::make_unique<Buffer>(); // 创建第一个备用缓冲区  
    auto newBuffer2 = std::make_unique<Buffer>(); // 创建第二个备用缓冲区  

    // 准备读取的缓冲区向量  
    BufferVector buffersToWrite;   
    buffersToWrite.reserve(16); // 预留 16 个元素空间,以提高效率  

    // 后端线程的无限循环,处理日志的写入  
    while (is_running_) {  
        {  
            // ============================= 步骤2 =========================  
            std::unique_lock<std::mutex> lock(mutex_); // 锁定 mutex_ 以确保线程安全  

            // 如果当前没有缓存的日志数据,等待条件变量  
            if (buffers_.empty()) {  
                // 对条件变量进行等待,直到超时或有新的日志写入  
                cond_.wait_for(lock, std::chrono::seconds(flushInterval_));  
            }  

            // 将当前缓冲区移动到待写入的缓冲区容器中  
            buffers_.emplace_back(std::move(currentBuffer_));  

            // 将后端线程准备好的缓冲区 1 赋值给 currentBuffer_  
            currentBuffer_ = std::move(newBuffer1);   

            // 交换 buffers_ 和 buffersToWrite,保证后端线程可以写入  
            buffersToWrite.swap(buffers_);  

            // 如果下一个可用的缓冲区为空,将后端线程的缓冲区 2 赋给 nextBuffer_  
            if (!nextBuffer_) {  
                nextBuffer_ = std::move(newBuffer2);  
            }  
        } // 离开锁的作用域, mutex_ 解锁  

        // ======================== 步骤3 =======================  
        // 将缓冲区内容写入日志文件  
        for (size_t i = 0; i < buffersToWrite.size(); ++i) {  
            // 将已填满的缓冲区中的数据写入到磁盘文件  
            output.Append(buffersToWrite[i]->GetData(), buffersToWrite[i]->GetLength());  
        }  

        // ========================= 步骤4 =====================  
        // 如果待写入的缓冲区数量过多,丢弃多余的缓冲区  
        if (buffersToWrite.size() > 2) {  
            buffersToWrite.resize(2); // 仅保留最近的两个缓冲区  
        }  

        // 恢复后端备用缓冲区  
        if (!newBuffer1) { // 检查 newBuffer1 是否为空  
            newBuffer1 = buffersToWrite.back(); // 从待写入的缓冲区中获取一个  
            buffersToWrite.pop_back(); // 从向量中移除它  
            newBuffer1->Reset(); // 将缓冲区的数据指针归零,准备重新使用  
        }  

        // newBuffer2 的操作与 newBuffer1 相似,此处省略具体代码  

        // 丢弃无用的已写缓冲区  
        buffersToWrite.clear(); // 清除当前的待写入缓冲区  
        output.Flush(); // 冲刷输出,确保数据写入磁盘  
    }  

    output.Flush(); // 在线程结束前最后一次冲刷,确保所有数据都写入  
}  

步骤1:

先打开保存日志的磁盘文件,准备好后端备用的空闲缓冲区1、2,用来交换已写好日志消息的缓冲区。也备好读取的缓冲区vector,用来交换已写入文件的填完完毕的缓冲vector。

这里都是为了在临界区内交换而不用阻塞的。

步骤2:到了while(1)循环里面了。也到了需要加锁的临界区。

首先等待条件是否触发等待触发的条件有两个:其一是超时(超过默认的3s),其二是前端写满了一个或多个buffer。注意这里使用的是if(),没有使用while()循环的。

当条件满足时,将当前缓冲(currentBuffer_)移入buffers_,并立刻将空闲的newBuffer1移为当前缓冲。要注意的是,这整段代码是位于临界区内的,所以不会有任何race condition。

接下来将已填写完毕的缓冲vector(buffers_)与备好的缓冲区buffersToWrite交换(和第12节中的函数doPendingFunctors()中也是要进行交换后,才好执行任务回调函数的),后面的代码就可以在临界区外安全地访问buffersToWrite,将其中的日志数据写入到磁盘文件中。

接着用newBuffer2替换nextBuffer_,这样前端就始终有一个预备buffer 可以调配。

nextBuffer_可以减少前端临界区分配内存的概率,缩短前端临界区长度。(因为前端Append()函数中有一种情况会currentBuffer_.reset(new Buffer);)

步骤3:这时已经出了临界区

  若缓冲区过多,说明前端产生log的速度远大于后端消费的速度,这里只是简单的将它们丢弃(调用erase()函数)。之后把日志消息写入到磁盘(调用LogFile::Append函数),

步骤4:

 调用vector的resize()函数重置该缓冲区vector大小,并将该缓冲区vector内的buffer重新填充给newBuffer1和newBuffer2,这样下一次执行的时候就还有两个空闲buffer可用于替换前端的当前缓冲预备缓冲

3.如何使用AsyncLogger类

在AsyncLogger类中我们使用的是LogFile类,不使用AppendFile类。AppendFile类是供LogFile对象使用的,我们基本不会直接去操作AppendFile类。

记住是通过LogFile类去调用AppendFile类

那关键的异步日志的AsyncLogger类已实现。那该怎么使用该类呢。

对外界可见,可以直接使用的是Logger类,那怎么把这两个类关联起来呢。

AsyncLogger类中是使用AsyncLogger::start()开始后端线程前端收集日志消息是AsyncLogger::Append()函数。我们就要从这两个函数下手。

在Logger类中添加

public:  

    // 获取日志文件名,返回一个 std::string 类型  
    static std::string LogFileName() { return logFileName_; }   
    
    // 定义输出回调函数类型,函数签名为接受一个字符指针和长度  
    using OutputFunc = void (*)(const char* msg, int len);   
    
    // 定义冲刷回调函数类型,无参数返回类型  
    using FlushFunc = void (*)();   

private:  

    // 静态成员变量,存储日志文件的名字  
    static std::string logFileName_; 

 前一节我们输出到stdout是通过Logger类的析构函数来输出的。


//这是上一节的实现

Logger::~Logger()

{

//其他的省略

DefaultOutput(buf.Data(), buf.Length()); //是调用了fwrite()输出到stdout的

}

那我们可以修改析构函数中的DefaultOutput()函数就行,我们可以调用AsyncLogger::Append()函数来让消息日志写到磁盘中。AsyncLogger::Append()就是收集消息日志,写到缓冲中,等到缓存满了,条件符合了,就会notify_one(),唤醒后端log线程,把日志消息真正写到磁盘中去

再来看看AsyncLogger类对象在哪生成呢。

// 声明一个静态唯一指针,指向 AsyncLogger 实例  
static std::unique_ptr<AsyncLogger> asyncLogger;  

// 声明一个静态 once_flag,用于确保某个操作只执行一次  
static std::once_flag g_once_flag;  

// 静态成员变量,用于存储日志文件名  
std::string Logger::logFileName_ = "../li22.log";  

// 用于初始化 AsyncLogger 的函数  
void OnceInit()  
{  
    // 创建 AsyncLogger 的唯一指针并启动后端线程  
    asyncLogger = std::make_unique<AsyncLogger>(Logger::LogFileName());  
    asyncLogger->start(); // 开启后端日志处理线程  
}  

// 异步输出日志消息的函数  
void AsyncOutput(const char* logline, int len)  
{  
    // 确保 OnceInit 只执行一次,防止多线程环境下的竞态条件  
    std::call_once(g_once_flag, OnceInit); // 仅初始化一次  

    // 将日志消息添加到 AsyncLogger 缓冲区  
    asyncLogger->Append(logline, len);  
}  

// 全局变量:输出函数,指向 AsyncOutput 函数  
Logger::OutputFunc g_output = AsyncOutput;  

// Logger 的析构函数  
Logger::~Logger()  
{  
    // 省略其他清理代码...  

    // 调用全局输出函数,将缓冲区内容写入  
    g_output(buf.Data(), buf.Length()); // 输出当前缓冲区的数据  
}  

logFileName_ 就是我们的日志文件名。

那我们从该析构函数Logger::~Logger()开始,g_output()是全局输出函数,这里设置成了AsyncOutput()函数,AsyncOutput()函数使用了std::call_once,这是c++11新增的,可以保证不管是开多少个线程,该函数(OnceInit())只会执行一次。这就可以保证了后端log线程只有一个,这就不会有抢占写日志到磁盘的情况发生。asyncLogger的后端log线程就开始了,那就接着调用AsyncLogger::Append()函数就可以了。下面是其主要的流程图。

异步日志的主体就完成了,还有一些功能,小细节没有实现,下一节再讲解了。

这个双缓冲的设计真不错。

这一节我们就可以把日志输出到磁盘文件了。


网站公告

今日签到

点亮在社区的每一天
去签到