Utils系列之内存池(MultiSizePool)

发布于:2025-07-14 ⋅ 阅读:(20) ⋅ 点赞:(0)

MultiSizePool

MultiSizePool 意思是说我们会分配多个不同大小的chunk,来满足不同size的分配的要求。就像下面的图一样, chunk1~chunk5是不同大小size的chunk,每个chunk下面都有一个block链表来存储数据,同样我们会把空闲的block 放到freelist 中去。

在这里插入图片描述

为了增加性能,我们同样也开辟了了线程缓存,每个线程都有一份自己的缓存,每次取的时候都会先从ThreadCache中先取空闲的block,如果没有了我们再从对应大小的chunk中的freelist 来分配。

Chunk的数组

定义了一个chunk的不同size的数组,可以看到size从8 byte到2k byte 大小不等,这些为小对象的size, 对于超过这些size的,我们直接分配对应的大小(大对象)。 这里的每个size都是一个chunk, 每个chunk都存储了若干个block;block 会预先放到freelist 里面。这里会记录一个block data的大小(block_size), 还有总的大小(block 结构体 + data size);我们会使用 ALIGNMENT 对齐,这里我们是根据C++标准定义’std::max_align_t’,我们这里是16. block_count 的大小我们就可以计算出来啦,CHUNK_SIZE 我们预定义为64k。

我们再看下FreeBlock结构体的定义,定义了一个指向next的指针和size的大小。这里会把存储的数据区放到结构体的后面。

内存结构: struct FreeBlock + data

所以这里有两个函数用于从结构体的首地址指向data 数据区的指针; 和从数据区转到指向结构体地址的指针。

static constexpr std::array<size_t, 16> SIZE_CALSSES = {  //max 2k
    8, 16, 24, 32, 48, 64, 96, 128, 192, 256, 384, 512, 768, 1024, 1536, 2048
};

struct ChunkClass {
    std::atomic<FreeBlock*> free_list;
    std::atomic<size_t> allocated_count;
    std::atomic<size_t> deallocated_count;

    size_t block_size;  // 每个block data size
    size_t total_block_size; // 一个block + data的大小
    size_t block_count; // 每个chunk的block数量

    ChunkClass() = default;
    explicit ChunkClass(size_t size):
        free_list(nullptr),
        block_size(size),
        total_block_size(align_of(sizeof(FreeBlock) + size, ALIGNMENT)),
        block_count(CHUNK_SIZE / total_block_size)
        {}
};
// 内存结构: FreeBlock + data
struct FreeBlock {
    size_t size;
    std::atomic<FreeBlock*> next;
    FreeBlock(size_t s):size(s), next(nullptr) {}

    void* data() { // 指向data 数据
        return reinterpret_cast<std::byte*>(this) + sizeof(FreeBlock);
    }

    static FreeBlock* from_data(void* ptr) { // 从data 数据获取FreeBlock指针
        return reinterpret_cast<FreeBlock*>(reinterpret_cast<std::byte*>(ptr) - sizeof(FreeBlock));
    }
};

MultiSizeThreadCache 是一个高效的线程本地缓存实现,它在无锁内存池中扮演着至关重要的角色。这个结构通过减少线程间的竞争,显著提升内存分配和释放的性能。

  • 线程局部存储: 使用thread_local关键字确保每个线程拥有自己独立的缓存实例
  • 分层缓存结构: 为每种内存块大小(SIZE_CLASSES)维护独立的缓存队列
  • 高效访问: 通过数组实现O(1)时间复杂度的块访问

这里我们定义了一个ClassCache来缓存freelist的空闲block,每个chunk size都有一个大小32的block的缓存, count是记录还剩多少的缓存数量。

内存回收机制

当线程终止时,析构函数负责将缓存的所有内存块归还到全局池:

这个过程包含三个关键步骤:

  1. 链表构建: 将离散的块连接成一个链表,准备批量归还
  2. 原子交换: 使用compare_exchange_weak确保线程安全地将链表插入全局链表头部
  3. 计数重置: 归还完成后将缓存计数归零

通过这种线程本地缓存设计,内存池实现了高吞吐量和低延迟的内存分配,特别适合高并发环境下的性能关键型应用。

struct MultiSizeThreadCache {
    struct ClassCache {
        FreeBlock* blocks[32]; // 每个大小类缓存块
        int count;  // 当前缓存的数量
    };
    ClassCache caches[SIZE_CALSSES.size()];
    LockFreeMultiSizePool* pool_ptr; // 指向全局内存池

    ~MultiSizeThreadCache() {

        if (!pool_ptr) return; 
        // 遍历所有大小类
        for (size_t index = 0; index < SIZE_CALSSES.size(); ++index) {
            auto& class_cache = caches[index];
            if (class_cache.count == 0) continue;

            // 将块连接成链表
            for (int i = 0; i < class_cache.count - 1; ++i) {
                class_cache.blocks[i]->next.store(class_cache.blocks[i + 1], std::memory_order_relaxed);
            }

            auto& chunk_class = pool_ptr->chunk_classes[index];
            FreeBlock* old_head = chunk_class.free_list.load(std::memory_order_relaxed);
            FreeBlock* cache_head = class_cache.blocks[0];
            FreeBlock* cache_tail = class_cache.blocks[class_cache.count - 1];

            // 归还到全局链表
            do {
                cache_tail->next.store(old_head, std::memory_order_relaxed);
            } while (!chunk_class.free_list.compare_exchange_weak(old_head, cache_head, 
                                                                std::memory_order_release, 
                                                                std::memory_order_relaxed));
            class_cache.count = 0;
        }
    }

};

std::array<ChunkClass, SIZE_CALSSES.size()> chunk_classes;
LockFreeStack<std::unique_ptr<std::byte[]>> allocated_chunks;

static constexpr size_t ALIGNMENT = alignof(std::max_align_t);
static constexpr size_t CHUNK_SIZE = 64 * 1024; // 64k

static inline thread_local MultiSizeThreadCache thread_cache;

初始化

我们在构造的时候就把每个chunk对象的size分配好,这样我们在调用allocate的函数的时候就不需要分配了。注意这里的new (&chunk_classes[i]) ChunkClass(SIZE_CALSSES[i]); 不是分配操作,而是placement new 操作符,在已分配的内存地址上构造对象,而不进行新的分配。

LockFreeMultiSizePool::LockFreeMultiSizePool() {
    for (size_t i = 0; i < SIZE_CALSSES.size(); ++i) {
        new (&chunk_classes[i]) ChunkClass(SIZE_CALSSES[i]);
    }
    thread_cache.pool_ptr = this;
}

填充缓存

在我们调用allocate分配函数之前,先熟悉下填充缓存的操作。主要的操作有:

  • 检查索引的有效性和缓存是否已有足够内存块
  • 计算批量获取的数量(缓存容量的一半)
  • 尝试从全局空闲列表中获取一串连续的内存块,如果全局空闲列表为空,则分配新的内存块(调用allocate_chunk_for_size_class
  • 使用无锁方式更新全局空闲列表头部
  • 将获取的内存块放入线程本地缓存
bool LockFreeMultiSizePool::fill_class_cache(size_t index) {
    if (index >= SIZE_CALSSES.size()) return false;

    ChunkClass& chunk_class = chunk_classes[index];
    auto& class_cache = thread_cache.caches[index];

    // 已经有缓存,不需要填充
    if (class_cache.count > 0) return true;

    int BATCH_SIZE = std::size(class_cache.blocks) / 2;

    // 尝试从全局链表获取多个块
    FreeBlock* old_head = chunk_class.free_list.load(std::memory_order_acquire);
    if (!old_head) {
        allocate_chunk_for_size_class(index);
        old_head = chunk_class.free_list.load(std::memory_order_acquire);
        if (!old_head) return false;
    }

    //获取一串块
    FreeBlock* current = old_head;
    FreeBlock* new_head = nullptr;

    int count = 0;

    for (int i = 0; i < BATCH_SIZE - 1 && current->next.load(std::memory_order_relaxed); ++i) {
        current = current->next.load(std::memory_order_relaxed);
        ++count;
    }

    new_head = current->next.load(std::memory_order_relaxed);

    // 更新链表头
    if(!chunk_class.free_list.compare_exchange_strong(old_head, new_head, std::memory_order_release, std::memory_order_relaxed)) {
        return false;
    }

    //将获取的块放入本地缓存
    current = old_head;
    while (current != new_head) {
        class_cache.blocks[class_cache.count++] = current;
        current = current->next.load(std::memory_order_relaxed);
    }

    return true;
}

void LockFreeMultiSizePool::allocate_chunk_for_size_class(size_t index) {
    if (index >= SIZE_CALSSES.size()) return; // 分配大对象
    ChunkClass& chunk_class = chunk_classes[index];
    size_t block_count = chunk_class.block_count;

    if (block_count == 0) block_count = 1;

    size_t actual_chunk_size = chunk_class.total_block_size * block_count;
    auto chunk = std::make_unique<std::byte[]>(actual_chunk_size); // 分配一个chunk
    std::byte* ptr = chunk.get();

    FreeBlock* first_block = nullptr;
    FreeBlock* prev_block = nullptr;
    FreeBlock* block  = nullptr;
    // 将chunk 添加到对应大小的free_list 里面
    for (size_t i = 0; i < block_count; ++i) {
        // 获取每个block
        block = new(ptr) FreeBlock(chunk_class.block_size);
        ptr += chunk_class.total_block_size;
        if (i == 0) {
            first_block = block;
        }

        if (prev_block) {
            prev_block->next.store(block, std::memory_order_relaxed);
        }
        prev_block = block;
    }
    // 将block 添加到free_list
    FreeBlock* old_block = chunk_class.free_list.load(std::memory_order_relaxed);
    do {
        block->next.store(old_block, std::memory_order_relaxed);
    } while(!chunk_class.free_list.compare_exchange_weak(old_block, first_block,  std::memory_order_release, std::memory_order_relaxed));
    allocated_chunks.push(std::move(chunk)); // 将chunk 添加到allocated_chunks 管理
}

size_t LockFreeMultiSizePool::get_size_class_index(size_t size) {
    // 二分查找到对应的index
    size_t left = 0;
    size_t right = SIZE_CALSSES.size() - 1;
    
    while (left <= right) {
        size_t mid = left + (right - left) / 2;
        if (SIZE_CALSSES[mid] < size)
            left = mid + 1;
        else if (SIZE_CALSSES[mid] > size && mid > 0)
            right = mid - 1;
        else
            return mid;
    }
    
    return (left < SIZE_CALSSES.size()) ? left : SIZE_CALSSES.size();
}

allocate 操作

这个函数是内存池的核心操作,我们通过下面步骤:

  1. 大小对齐与分类:首先将请求的内存大小按照指定对齐值(ALIGNMENT)进行对齐,然后确定对应的大小类别索引。
  2. 分配策略层次
    • 首先尝试从线程本地缓存获取内存块
    • 如果本地缓存为空,尝试批量填充本地缓存
    • 如果仍然无法获取,从全局共享的无锁空闲列表获取
    • 最后,如果空闲列表为空,则分配新的内存块
  3. 原子操作保证:使用无锁算法(CAS操作)确保在多线程环境中安全分配内存。
void* LockFreeMultiSizePool::allocate(size_t size) {
    if (size == 0) return nullptr;

    thread_cache.pool_ptr = this; // 设置当前线程的内存池实例
    size_t aligned_size = align_of(size, ALIGNMENT);
    size_t index = get_size_class_index(aligned_size);
    if (index >= SIZE_CALSSES.size()) {  // 分配大对象
        void* ptr = std::aligned_alloc(ALIGNMENT, aligned_size);
        return ptr;
    }

    ChunkClass& chunk_class = chunk_classes[index];
    FreeBlock* old_block = chunk_class.free_list.load(std::memory_order_acquire);
    FreeBlock* block = nullptr;

    // 尝试从本地缓存分配
    auto& class_cache = thread_cache.caches[index];
    if (class_cache.count > 0) {
        FreeBlock* block = class_cache.blocks[--class_cache.count];
        chunk_class.allocated_count.fetch_add(1, std::memory_order_relaxed);
        return block->data(); // 返回数据指针
    }
    // 尝试批量获取块到本地缓存
    if (fill_class_cache(index)) {
        FreeBlock* block = class_cache.blocks[--class_cache.count];
        chunk_class.allocated_count.fetch_add(1, std::memory_order_relaxed);
        return block->data(); // 返回数据指针
    }

    // 尝试从空闲列表获取
    while(old_block) {
        if(chunk_class.free_list.compare_exchange_weak(old_block, old_block->next.load(std::memory_order_relaxed))) {
            block = old_block;
            break;
        }
    }
    if (!block) {
        allocate_chunk_for_size_class(index);

        old_block = chunk_class.free_list.load(std::memory_order_acquire);
        // 再次尝试从空闲列表获取
        while(old_block) {
            if(chunk_class.free_list.compare_exchange_weak(old_block, old_block->next.load(std::memory_order_relaxed))) {
                block = old_block;
                break;
            }
        }
    }

    if(block) {
        chunk_class.allocated_count.fetch_add(1, std::memory_order_relaxed);
        return block->data(); // 返回数据指针
    }
    return nullptr;
}

deallocate 操作

deallocate 其实和allocate是一对相反的操作。主要把我们的分配的内存指针放回到cache里面中,如果是大对象,我们直接会释放。对于小对象,我们这里做了一个策略,如果检测到缓存已经满了,我们会先把缓存的空间归还到全局链表中,然后再放回到缓存,这样一次操作大大提高后面的释放的效率。

void LockFreeMultiSizePool::deallocate(void* ptr, size_t size) {
    if (ptr == nullptr) return;

    thread_cache.pool_ptr = this; // 设置当前线程的内存池实例
    size_t align_size = align_of(size, ALIGNMENT);
    size_t index = get_size_class_index(align_size);
    if (index >= SIZE_CALSSES.size()) {
        std::free(ptr);  // 大对象直接释放
        return;
    }

    ChunkClass& chunk_class = chunk_classes[index];
    FreeBlock* block_ptr = FreeBlock::from_data(ptr);  // 获取block
    
    // 尝试先放入本地缓存
    auto& class_cache = thread_cache.caches[index];
    if (class_cache.count < static_cast<int>(sizeof(class_cache.blocks) / sizeof(class_cache.blocks[0]))) {
        class_cache.blocks[class_cache.count++] = block_ptr;
    } else {
        // 本地缓存已满,将一半缓存归还全局链表
        int half_count = class_cache.count / 2;

        // 构建链表
        for (int i = 0; i < half_count - 1; ++i) {
            class_cache.blocks[i]->next.store(class_cache.blocks[i + 1], std::memory_order_relaxed);
        }

        // 将新块添加到链表尾部
        class_cache.blocks[half_count - 1]->next.store(block_ptr, std::memory_order_relaxed);

        // 归还给全局链表
        FreeBlock* old_block = chunk_class.free_list.load(std::memory_order_relaxed);
        do {
            block_ptr->next.store(old_block, std::memory_order_relaxed);
        } while(!chunk_class.free_list.compare_exchange_weak(old_block, class_cache.blocks[0], std::memory_order_release, std::memory_order_relaxed));

        // 压缩剩余缓存 更新class_cache
        for (int i = 0; i < class_cache.count - half_count; ++i) {
            class_cache.blocks[i] = class_cache.blocks[i + half_count];
        }
        class_cache.count -= half_count;
    }
    chunk_class.deallocated_count.fetch_add(1, std::memory_order_relaxed);
}

最后我们还封装了一个类来简单使用内存池,用户只需要调用allocatedeallocate 就好。

class MemoryPoolAllocater {
private:
    LockFreeMultiSizePool pool;

public:
    void* allocate(size_t size) {
        return pool.allocate(size);
    }

    void deallocate(void* ptr, size_t size) {
        pool.deallocate(ptr, size);
    }

    template<typename T, typename... Args>
    T* create(Args&&... args) {
        void* ptr = allocate(sizeof(T));
        if (ptr) {
            return new (ptr) T(std::forward<Args>(args)...);
        }
        return nullptr;
    }

    template<typename T>
    void destroy(T* ptr) {
        if (ptr) {
            ptr->~T();
            deallocate(ptr, sizeof(T));
        }
    }

    void print_stats() const {
        pool.print_stats();
    }

    void reset_global_state() {
        LockFreeMultiSizePool::reset_global_state();
    }
};

详细代码见: MultiSizePool


网站公告

今日签到

点亮在社区的每一天
去签到