跟我学c++中级篇——多线程中的文件处理

发布于:2025-06-10 ⋅ 阅读:(20) ⋅ 点赞:(0)

一、文件处理

作为IO处理的一种重要场景,文件处理是几乎所有编程都无法绕过的一个情况。稍微复杂的一些的程序都可能需要文件处理,不管这种文件处理对开发者来说是显式的还是隐式的。相对于其它语言,C++并未提供多么好的文件处理API接口,即使发展到现在,C++新标准的文件处理,相比与C#等语言处理起文件的方式仍然要落后不少。
文件处理相对来说的复杂再加上C++中线程管理的复杂,二者结合到一起,就会产生各种大大小小的问题。

二、多线程与资源控制

其实对开发者来说,不管是文件管理还是其它资源管理,最主要的就是在多线程的切换中,保持安全性(不能崩溃)、数据的准确性(不能写得不对)。在这些基础上,如何提高资源处理的速度并按照开发者既定的意愿去完成相关的资源控制,这才是难点所在。
特别在一些具体的场景中,如大文件(海量日志文件)、数据库操作以及图像视频等的处理,这都需要多线程参与下的高效率的处理。在前面学习多线程时知道对资源的控制一般有几种处理方法:
1、使用锁
这里的锁,包括各种的互斥体和信号量等
2、使用原子变量
其实,使用原子变量的目的也类似于锁
3、使用无锁编程
这个就相对复杂很多,而且适用场景也受限

三、多线程条件下的文件处理方法

上面的这些方法对于所有的资源控制都是行之有效的,但针对文件处理,可能一些更具体的方法。对于文件来说,需要处理两种情况即:
1、文件的写
文件的写是一种常见的保存数据的方法,数据库的写其实也一种文件的写,只不过,上面又抽象了一层数据库的相关操作。对于写文件来说,最基础的是写入的完整性、一致性,最重要的是写入的速度。
多线程的写入,往往因为同步的问题,引起以下的情况:
a)由于无法同步导致的问题
包括数据写入顺序不一致引起的数据覆盖以及数据顺序的不正确,导致数据的完整性的缺损,从而最终导致文件可能无法打开
b)因为同步导致写入效率的问题
多线程的情况下,不适当的同步,或者说即使是适当的同步,也会大幅的降低写文件的速度
c)引入异步IO导致的编程复杂性
异步IO的操作本身就是一个难点,这对于很多开发者说,掌握的都不是很到位
d)是否使用写缓冲
其实在很多情况,特别是在多线程的情况下,往往会把并发写转成串行写,数据量的增加往往要求引入缓冲区
总之,写文件,是多线程操作中相对复杂和困难的情况。
写文件有几种情况比较特殊,一种是写入大量的小文件,这种情况在互联网中特别常见,比如大量的商品的缩略图;另外一种是写一个非常大的文件,如日志;另外还有大家常见的如BT等下载软件,多线程分段下载然后最终再组成一个大文件(分块处理)。

2、文件的读
文件的读相对写来说应用场景更丰富,在互联网中针对文件读还专门有各种的优化方法。比如各种缓冲、临时文件等等。
多线程情况下的文件读其实有很多种方法,来适应不同的场景:
a)使用缓冲
这种缓冲既包括硬件本身的缓冲也包括软件层次的缓冲,甚至是框架之间的缓冲,如使用内存型数据库(如Redis)+传统的数据库(如MySql),前者就可以作为后者的缓冲。而在C++编程中也提供了iosteam的缓冲的控制。其它的一些系统API和库的API也多少都提供了类似的功能。
b)使用异步IO
异步IO的问题主要就在于异步编程的复杂度,这里不再赘述
c)多线程读取的时效问题
也就是常见的读写同时在进行时,如何保证读的时效性(即尽可能减少脏读),特别是在分布式、多线程的情况下。

四、典型的文件处理方式

内存缓冲区 使用新的异步框架 或新技术
典型的文件处理方式一般有以下几种处理方法:
1、使用内存映射
这个在操作一些大的日志文件时,经常使用这种内存映射,从而快速读写日志。
2、使用缓冲
包括前面提到的软件层次的缓冲、硬件缓冲及内核缓冲等。
3、使用最新的框架或技术
比如前文“Linux新的IO模型io_uring”提到的io_uring以及其它的新技术、新框架甚至是新思想,来解决某些对文件操作极度严苛的场景。
4、使用合理的策略
这个就比较灵活了,比如上面提到的大文件的多线程下载,就可以使用合理的策略来进行分块然后再进行传输、验证、组合等等。另外在数据库读写中,面对大量的写可以使用批处理,而使用缓冲时,可以在大多数场景下指定临界值再进行真正的写入等等。
5、多线程的合理控制
无论何种情况,只要发生在了多线程并发或并行的场景下,合理调试线程和分配任务就是一种高优先级的考虑方向了。也可以这样认为,面对复杂的文件处理,不是某一层可以解决的,它是从上到下,从里到外,一个综合应用的场景。

五、实际的例子

下面级出一个大日志文件使用内存映射操作的例程:

#include <iostream>
#include <fstream>
#include <string>
#include <cstring>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <system_error>

class MemMappedFile {
public:
    explicit MemMappedFile(const std::string& path, bool enable = false) {
        // 打开文件
        int flags = enable ? O_RDWR : O_RDONLY;
        m_fd = open(path.c_str(), flags);
        if (m_fd == -1) {
          return;
        }

        // 获取映射文件大小
        struct stat sb;
        if (fstat(m_fd, &sb) == -1) {
            close(m_fd);
            return ;
        }
        m_size = sb.st_size;

        // 创建内存映射
        int prot = PROT_READ | (enable ? PROT_WRITE : 0);
        m_data = mmap(nullptr, m_size, prot, MAP_SHARED, m_fd, 0);
        if (m_data == MAP_FAILED) {
            close(m_fd);
            return;
        }
    }

    ~MemMappedFile() {
        if (m_data != nullptr) {
            munmap(m_data, m_size);
        }
        if (m_fd != -1) {
            close(m_fd);
        }
    }


    MemMappedFile(const MemMappedFile&) = delete;
    MemMappedFile& operator=(const MemMappedFile&) = delete;
public:
    char* data() const { return static_cast<char*>(m_data); }
    size_t size() const { return m_size; }

    // 同步修改到磁盘
    void syncToDisk() {
        if (msync(m_data, m_size, MS_SYNC) == -1) {
            return;
        }
    }

private:
    int m_fd = -1;
    void* m_data = nullptr;
    size_t m_size = 0;
};
// 处理数据
void processData(const char* chunk, size_t  size) {
    // 日志处理,略过
}
int main(int argc, char* argv[]) {
    if (argc < 2) {
        std::cerr << "cur use: " << argv[0] << std::endl;
        return 1;
    }

    try {
        //创建映射
        bool enable = (argc >= 3);
        MemMappedFile mmapFile(argv[1], enable);

        const char* data = mmapFile.data();
        size_t size = mmapFile.size();

        size_t lineCount = 0;
        size_t errNum = 0;
        const char* keyword = "ERR";

        for (size_t i = 0; i < size; ++i) {
            if (data[i] == '\n') {
                lineCount++;
            }

            if (strncmp(&data[i], keyword, strlen(keyword)) == 0) {
                errNum++;
                i += strlen(keyword) - 1;
            }
        }

        std::cout << "lines is: " << lineCount<< "errors is: " << errNum  << std::endl;

        if (enable) {
            const char* newHeader = "start modify log...\n";
            size_t headerLen = strlen(newHeader);

            if (size >= headerLen) {
                memcpy(mmapFile.data(), newHeader, headerLen);
                mmapFile.syncToDisk();
            }
        }

        //分块处理
        const size_t chunkSize = 1024 * 1024;
        for (size_t offset = 0; offset < size; offset += chunkSize) {
            size_t chunkLen = std::min(chunkSize, size - offset);
            processData(data + offset, chunkLen);
        }

    } catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return 1;
    }

    return 0;
}

注释已经很清晰,大家可以参考一下。

六、总结

多线程下的文件处理,需要整合前面学习的很多知识点。大家不用把它想象的多么难,重点在于分析实际的应用场景,找出一个合适的解决方案就可以了。不是每个开发者都会遇到海量的数据读写。但掌握一些海量数据下的文件处理的经验却是非常必要的。