文章目录
高并发内存池
内存池的相关概念
内存池是一种内存管理技术,用于在程序启动时(或需要时)预先申请分配一大块内存(称为“池”),然后由程序自行管理这块内存的分配和释放,而不是每次都直接使用操作系统提供的内存分配接口(如 malloc / free 或 new / delete)。
一、为什么需要内存池?—— 传统内存分配的弊端
直接使用系统调用分配内存存在一些性能和功能上的问题:
性能开销(Performance Overhead):
系统调用开销:每次 malloc 或 new 都可能涉及从用户态切换到内核态,内核需要处理分配请求,这个过程比较耗时。
算法复杂性:通用的内存分配器(如 glibc 的 ptmalloc)需要处理任意大小、任意生命周期的内存分配,其算法非常复杂(例如使用隐式链表、显式链表或分离空闲链表),在分配和释放时需要进行搜索和合并操作。
内存碎片(Memory Fragmentation):
外部碎片:频繁地分配和释放不同大小的内存块,会导致内存中间出现大量小的、不连续的空闲碎片。当需要分配一块较大的连续内存时,即使总空闲内存足够,也可能因为找不到连续空间而失败。
内部碎片:分配器为了对齐和管理方便,实际分配的内存可能略大于用户请求的内存,这之间的浪费就是内部碎片。
不确定性(Non-Deterministic):
由于分配器需要根据当前内存状态进行搜索和决策,每次分配所花费的时间是不确定的。这在实时性要求高的系统(如游戏、嵌入式系统、高频交易)中是致命的。
二、内存池如何解决这些问题?
内存池通过“一次性批发,零售管理”的策略来解决上述问题
:
减少系统调用和锁竞争:
程序初始化时一次性申请一大块(chunk)内存。后续所有的内存分配都在这一大块内存中进行,无需再频繁调用 malloc,极大地减少了用户态/内核态切换和全局堆锁的竞争。
避免内存碎片:
固定大小内存池:专门用于分配固定大小的对象(如网络连接结构体、游戏中的小物体)。由于每次分配的大小一致,分配和释放非常简单,只需要一个空闲链表(Free List),完全避免了外部碎片,内部碎片也恒定。(此即为要实现的)
可变大小内存池:虽然仍需处理不同大小的请求,但它的活动范围被限制在池内,不会导致整个进程的堆空间产生碎片。可以采用更简单、更高效的算法来管理。
常数时间分配与释放:
尤其是固定大小内存池,分配操作就是从空闲链表头部摘下一个节点(O(1)),释放操作就是将节点放回链表头部(O(1)),速度极快且稳定。
改善局部性(Locality):
从内存池中分配出去的对象通常物理地址是连续的或靠近的。这可以提高 CPU 缓存的命中率(Cache Friendliness),从而提升程序性能。
三、内存池的常见类型与工作原理
1. 固定大小内存池(Fixed-Size Pool / Memory Pool for Objects)
这是最简单、最常用、效果最显著的内存池。
工作原理:
初始化:预分配一大块连续内存(pool_block),并将其划分为多个大小完全相等的“内存块”(block)。每个块的大小正好等于目标对象的大小加上一些对齐开销。
构建空闲链表:将所有这些内存块的首地址串联成一个单向空闲链表。
分配:当用户请求分配一个对象时,从空闲链表头部取出一个节点,将地址返回,并将链表头指向下一个节点。
释放:用户归还内存时,将该内存块的地址重新放回空闲链表的头部。
数据结构(参考):
struct memory_pool_t {
void* pool_block; // 指向大块内存的起始地址
size_t block_size; // 每个块的大小(例如:sizeof(MyObject))
size_t total_blocks; // 总块数
void** free_list; // 空闲链表头(指向第一个可用的块)
// ... 可能还有其他字段,如用于扩展的链表等
};
2. 可变大小内存池(Variable-Size Pool)
用于分配不同大小的内存块,管理起来更复杂,常见策略有:
分离空闲链表(Segregated Free Lists):
维护多个不同大小规格的空闲链表(例如 8B, 16B, 32B, 64B …)。申请时,向上取整到最近的标准规格,然后从对应的固定大小链表中分配。这实际上是多个固定大小内存池的组合。
伙伴系统(Buddy System):
将大块内存不断对等分割,直到满足请求的大小。分配时寻找最合适的块,释放时检查其“伙伴”是否空闲,如果空闲就合并成更大的块。可以有效减少外部碎片,但可能产生内部碎片。Linux 内核就使用伙伴系统管理页帧。
四、内存池的优缺点总结
优点 | 缺点 |
---|---|
高性能:分配/释放速度极快(尤其是O(1)操作)。 | 初始化开销:启动时需要预分配内存,增加了启动时间。 |
确定性:分配时间稳定,适合实时系统。 | 不灵活:固定大小池只能分配一种大小;可变大小池管理复杂。 |
低碎片:尤其是固定大小池,几乎无碎片。 | 可能造成浪费:如果预分配的内存没有被完全使用,就会造成空间浪费。 |
高缓存命中率:改善内存访问局部性。 | 需要手动管理:需要根据应用场景自行设计和实现,增加了代码复杂性。 |
减少锁竞争:多线程环境下可为每个线程配置独立内存池,避免竞争全局堆锁。 | 调试困难:内存池管理的内存不易被传统内存调试工具(如Valgrind)检测。 |
定长内存池的简单实现
版本1
先分配,后管理。看似销毁的空间实则回收了利用自由链表管理起来,下次新开辟则从自由链表分配,这就是空间的复用机制,减少了内核的开销。
1. 核心设计思想
对象池通过预先分配大块内存和维护空闲对象链表来避免频繁调用系统的malloc/free,从而提高内存分配效率,特别适合需要频繁创建和销毁小对象的场景。
2. 关键数据结构
std::vector<void*> _memoryblocks; // 存储所有分配的内存块指针
char* _memory; // 指向当前内存块的可用起始位置
size_t _leftBytes; // 当前内存块的剩余字节数
void* _freeList; // 空闲对象链表头指针
3. New() 操作逻辑
3.1 优先使用空闲链表
if (_freeList) {
obj = (T*)_freeList;
_freeList = *((void**)_freeList); // 链表头指针指向下一个节点
}
直接从空闲链表获取已回收的对象
利用对象的前几个字节存储下一个空闲对象的地址(嵌入式指针技术)
3.2 需要新内存时的处理
if (_leftBytes < sizeof(T)) {
_leftBytes = 32 * 4 * 1024; // 分配128KB新块
_memory = (char*)malloc(_leftBytes);
_memoryblocks.push_back(_memory); // 记录内存块
}
当当前内存块不足时,分配新的大块内存(128KB)
将新块加入内存块列表管理
3.3 内存对齐处理
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
确保每个对象至少有一个指针的大小,以便在回收时能够存储下一个节点的地址
这是嵌入式指针技术的必要条件
3.4 对象构造
new(obj) T(); // 定位new,在指定内存调用构造函数
使用定位new在已分配的内存上构造对象
避免额外的内存分配开销
4. Delete() 操作逻辑
4.1 显式调用析构函数
obj->~T(); // 显式调用析构函数进行清理
只清理对象内容,不释放内存
4.2 头插法回收对象
*((void**)obj) = _freeList; // 将当前空闲链表头存入对象的前几个字节
_freeList = obj; // 更新链表头为当前对象
将对象重新链接到空闲链表
使用头插法保证O(1)时间复杂度的回收操作
5. 内存管理策略
5.1 批量分配
每次分配128KB大块内存,减少malloc调用次数
适合小对象的高效管理
5.2 内存块生命周期管理
~ObjectPool() {
for(void* ptr : _memoryblocks)
free(ptr); // 析构时释放所有内存块
}
对象池负责所有内存的释放
用户不需要关心单个对象的释放
6.具体实现
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include<cstdlib>
using std::cout;
using std::endl;
template<class T>
class ObjectPool
{
public:
ObjectPool()
: _memory(nullptr)
, _leftsize(0)
, _freelist(nullptr)
{}
T* New()
{
T* obj = nullptr;
// 如果⾃由链表有对象,直接取⼀个
if (_freeList)
{
obj = (T*)_freeList;
_freeList = *((void**)_freeList);
}
else
{
if (_leftBytes < sizeof(T))
{
_leftBytes = 32 * 4 * 1024;
_memory = (char*)malloc(_leftBytes);
_memoryblocks.push_back(static_cast<void*>(_memory));
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
obj = (T*)_memory;
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) :
sizeof(T);
_memory += objSize;
_leftBytes -= objSize;
}
// 使⽤定位new调⽤T的构造函数初始化
new(obj) T();
return obj;
}
void Delete(T* t)
{
// 显⽰调⽤的T的析构函数进⾏清理
obj->~T();
// 头插到freeList
*((void**)obj) = _freeList;
_freeList = obj;
}
~ObjectPool()
{
for(void* ptr : _memoryblocks)
free(ptr);
}
private:
std::vector<void*> _memoryblocks; //所有内存块
char* _memory; //当前内存块
size_t _leftsize; //当前内存块剩余大小
void* _freelist; //自由链表
};
版本2
版本2的核心设计逻辑其实和版本1差不多,就是以版本1为基础扩展了一部分内容,增加了系统调用和用一些结构体包装了块内容和链表。其核心实现还是对象池的先分配大内存,在分开管理。
1.整体架构设计
这个内存池采用了两层管理结构:
内存块层:以大块内存为单位进行系统级分配
对象层:在内存块内部管理单个对象的分配和回收
2.核心数据结构
.1. MemoryBlock - 内存块管理
struct MemoryBlock {
MemoryBlock* next; // 指向下一个内存块
char data[1]; // 柔性数组,实际存储对象数据
};
作用:管理从系统分配的大块内存
设计特点:使用链表连接所有内存块,便于遍历和释放
.2. FreeNode - 空闲对象管理
union FreeNode {
FreeNode* next; // 空闲时:存储指向下一个空闲节点的指针
char data[sizeof(T)]; // 分配时:存储对象数据
};
作用:管理空闲对象内存,复用内存空间
设计特点:使用union节省内存,空闲时存储链表指针,分配时存储对象数据
3.内存分配策略
分配优先级:
首选空闲链表:从_freeList中直接获取已释放的内存
次选当前内存块:从当前内存块的剩余空间中分配
最后分配新块:当前块用完时分配新的内存块
4.内存布局示例
假设BlockSize = 4, sizeof(T) = 16:
MemoryBlock 1:
+----------------+----------------+----------------+----------------+
| FreeNode* next | Object 1 data | Object 2 data | Object 3 data | Object 4 data |
+----------------+----------------+----------------+----------------+
MemoryBlock 2: (当第一个块用完时分配)
+----------------+----------------+----------------+----------------+
| FreeNode* next | Object 5 data | Object 6 data | Object 7 data | Object 8 data |
+----------------+----------------+----------------+----------------+
空闲链表: (释放的对象3和对象6)
Object6 -> Object3 -> nullptr
5.关键算法细节
.1. 新内存块分配计算
const size_t blockMemorySize = sizeof(MemoryBlock) + BlockSize * sizeof(T) - 1;
const size_t pageCount = (blockMemorySize + kPageSize - 1) / kPageSize;
计算原理:计算需要的确切内存大小并转换为页数
-1的作用:因为char data[1]已经包含了1字节,所以要减掉
.2. 对象复用机制
// 释放时:
FreeNode* node = reinterpret_cast<FreeNode*>(obj);
node->next = _freeList; // 头插法加入空闲链表
_freeList = node;
// 分配时:
FreeNode* node = _freeList;
_freeList = _freeList->next; // 从链表头部取出设计优势:头插法操作时间复杂度为O(1)
.3. 内存对齐保证
SystemAlloc返回页对齐的内存地址
sizeof(T)包含编译器计算的对齐要求
从对齐基地址以sizeof(T)为步长分配,自然保持对齐
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <cstdlib>
#include <memory>
#ifdef __linux__
#include <sys/mman.h>
#else
#include <windows.h>
#endif
using std::cout;
using std::endl;
constexpr size_t kPageSize = 1 << 12; // 4KB
inline static void* SystemAlloc(size_t kpage)
{
void* ptr = nullptr;
#ifdef _WIN32
ptr = VirtualAlloc(nullptr, kPageSize * kpage, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE);
if (ptr == nullptr)
throw std::bad_alloc();
#else
ptr = mmap(nullptr, kPageSize * kpage, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (ptr == MAP_FAILED)
throw std::bad_alloc();
#endif
return ptr;
}
inline static void SystemFree(void* ptr, size_t kpage)
{
if (ptr == nullptr)
return;
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
munmap(ptr, kpage * kPageSize);
#endif
}
/**
* @brief 定长对象内存池
* @tparam T 要管理的对象类型
* @tparam BlockSize 每个内存块包含的对象数量,默认256个
*/
template<class T, size_t BlockSize = 256>
class ObjectPool
{
private:
// 内存块结构
struct MemoryBlock {
MemoryBlock* next;
char data[1]; // 柔性数组,实际大小在分配时确定
MemoryBlock() : next(nullptr) {}
};
// 空闲节点,使用union复用内存
union FreeNode {
FreeNode* next;
char data[sizeof(T)];
FreeNode() : next(nullptr) {}
};
public:
ObjectPool()
: _blocks(nullptr)
, _freeList(nullptr)
, _allocatedCount(0)
{}
~ObjectPool()
{
Clear();
}
// 禁用拷贝
ObjectPool(const ObjectPool&) = delete;
ObjectPool& operator=(const ObjectPool&) = delete;
/**
* @brief 分配一个对象
*/
T* New()
{
T* obj = nullptr;
// 优先从空闲链表获取
if (_freeList != nullptr) {
FreeNode* node = _freeList;
_freeList = _freeList->next;
obj = reinterpret_cast<T*>(node);
_allocatedCount++;
} else {
// 需要分配新的内存块
if (_currentBlock == nullptr || _currentOffset >= BlockSize * sizeof(T)) {
if (!AllocateNewBlock()) {
throw std::bad_alloc();
}
}
obj = reinterpret_cast<T*>(_currentBlock->data + _currentOffset);
_currentOffset += sizeof(T);
_allocatedCount++;
}
// 使用placement new调用构造函数
new(obj) T();
return obj;
}
/**
* @brief 释放对象
*/
void Delete(T* obj)
{
if (obj == nullptr) return;
// 调用析构函数
obj->~T();
// 将内存加入空闲链表
FreeNode* node = reinterpret_cast<FreeNode*>(obj);
node->next = _freeList;
_freeList = node;
_allocatedCount--;
}
/**
* @brief 清空所有内存
*/
void Clear()
{
// 释放所有内存块
MemoryBlock* block = _blocks;
while (block != nullptr) {
MemoryBlock* next = block->next;
SystemFree(block, 1); // 每个块正好1页
block = next;
}
_blocks = nullptr;
_currentBlock = nullptr;
_currentOffset = 0;
_freeList = nullptr;
_allocatedCount = 0;
}
/**
* @brief 获取已分配对象数量
*/
size_t GetAllocatedCount() const { return _allocatedCount; }
/**
* @brief 获取总容量(对象数量)
*/
size_t GetCapacity() const
{
size_t count = 0;
MemoryBlock* block = _blocks;
while (block != nullptr) {
count += BlockSize;
block = block->next;
}
return count;
}
private:
/**
* @brief 分配新的内存块
*/
bool AllocateNewBlock()
{
// 计算需要的内存页数
const size_t blockMemorySize = sizeof(MemoryBlock) + BlockSize * sizeof(T) - 1;
const size_t pageCount = (blockMemorySize + kPageSize - 1) / kPageSize;
// 使用系统调用分配对齐的内存
MemoryBlock* newBlock = static_cast<MemoryBlock*>(SystemAlloc(pageCount));
if (newBlock == nullptr) {
return false;
}
// 初始化新块
new (newBlock) MemoryBlock();
newBlock->next = _blocks;
_blocks = newBlock;
_currentBlock = newBlock;
_currentOffset = 0;
return true;
}
private:
MemoryBlock* _blocks; // 内存块链表
MemoryBlock* _currentBlock; // 当前正在使用的内存块
size_t _currentOffset; // 当前块中的偏移量
FreeNode* _freeList; // 空闲链表
size_t _allocatedCount; // 已分配对象计数
};
// 测试示例
class TestObject {
public:
int id;
double value;
std::string name;
TestObject() : id(0), value(0.0), name("default") {
cout << "TestObject constructed: " << id << endl;
}
~TestObject() {
cout << "TestObject destroyed: " << id << endl;
}
void SetData(int i, double v, const std::string& n) {
id = i;
value = v;
name = n;
}
void Print() const {
cout << "ID: " << id << ", Value: " << value << ", Name: " << name << endl;
}
};
void TestObjectPool()
{
cout << "=== ObjectPool Test ===" << endl;
ObjectPool<TestObject> pool;
std::vector<TestObject*> objects;
// 分配多个对象
for (int i = 1; i <= 10; ++i) {
TestObject* obj = pool.New();
obj->SetData(i, i * 10.0, "Object_" + std::to_string(i));
objects.push_back(obj);
}
cout << "Allocated: " << pool.GetAllocatedCount() << endl;
cout << "Capacity: " << pool.GetCapacity() << endl;
// 使用对象
for (auto obj : objects) {
obj->Print();
}
// 释放部分对象
for (int i = 0; i < 3; ++i) {
if (!objects.empty()) {
pool.Delete(objects.back());
objects.pop_back();
}
}
cout << "After deletion - Allocated: " << pool.GetAllocatedCount() << endl;
// 重新分配(应该复用内存)
for (int i = 11; i <= 13; ++i) {
TestObject* obj = pool.New();
obj->SetData(i, i * 10.0, "Reused_" + std::to_string(i));
objects.push_back(obj);
}
cout << "Final - Allocated: " << pool.GetAllocatedCount() << endl;
cout << "Final - Capacity: " << pool.GetCapacity() << endl;
// 清理
for (auto obj : objects) {
pool.Delete(obj);
}
objects.clear();
cout << "After cleanup - Allocated: " << pool.GetAllocatedCount() << endl;
}
版本3
此版本多了严格的内存对齐和加入了锁以及可变参数的机制,但本质没变,逻辑还是一样的,但是因为算法多了所以效率有些下降。具体逻辑不解释了,直接看代码。
#pragma once
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <new>
#include <pthread.h>
#include <utility>
#include <vector>
#include<cstdlib>
#include "Log.hpp"
#include "Lockguard.hpp"
#ifdef __linux__
#include <sys/mman.h>
#else
#include<windows.h>
#endif
using std::cout;
using std::endl;
using namespace LogModule;
using namespace LockGuardModule;
//页大小4096字节
constexpr size_t kpagesize = 1 << 12; //4kb
//系统调用函数开辟空间
inline static void* SystemAlloc(size_t kpage)
{
void* ptr = nullptr;
//判断系统
#ifdef _WIN32
ptr = VirtualAlloc(nullptr, kpagesize*kpage, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE);
if(ptr == NULL)
{
Log(LogModule::ERROR) << "alloc fail";
throw std::bad_alloc();
}
#else
//起始地址null则系统分配,空间大小,内存保护权限(可读可写),标志位(私有写时复制|不关联文件),关联文件,文件偏移量
ptr = mmap(nullptr, kpagesize*kpage, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if(ptr == MAP_FAILED)
{
ptr = nullptr;
Log(LogModule::ERROR) << "alloc fail";
throw std::bad_alloc();
}
#endif
return ptr;
}
//系统调用释放资源
inline static void SystemFree(void* ptr, size_t kpage)
{
if(ptr == nullptr)
return;
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
munmap(ptr, kpage*kpagesize);
#endif
}
template<class T>
class ObjectPool
{
private:
//扩容内存块
bool ExpandPool(size_t kblock)
{
while(kblock--)
{
try
{
void* ptr = SystemAlloc(_blockpages);
_memoryblocks.push_back(ptr);
}
catch(const std::bad_alloc&)
{
throw std::bad_alloc();
}
}
return true;
}
//计算内存对齐需要的总空间
size_t CalculateTotal() const
{
uintptr_t cur = reinterpret_cast<uintptr_t>(_currentblock);
//补全量
size_t padding = (_alignment - (cur % _alignment)) % _alignment;
return padding + sizeof(T);
}
public:
ObjectPool()
{
pthread_mutex_init(&_mutex, nullptr);
//对齐数
_alignment = alignof(T);
//大小
_original_size = sizeof(T);
//对齐大小
_alignsize = (_original_size + _alignment - 1) & (~(_alignment - 1));
ExpandPool(1);
_index = 0;
_leftsize = kpagesize * _blockpages;
_currentblock = reinterpret_cast<char*>(_memoryblocks[_index]);
_freelist = nullptr;
}
//可变参数完美转发,开辟空间
template<class... Args>
T* New(Args&&... args)
{
T* obj = nullptr;
//复用之前释放的空间
if(_freelist)
{
//加锁
Lockguard lock(_mutex);
if(_freelist)
{
//头删操作 reinterpret_cast内存强制转换
obj = reinterpret_cast<T*>(_freelist);
_freelist = *reinterpret_cast<void**>(_freelist);
}
}
if(obj == nullptr)
{
//当前内存块需要分配的总空间
size_t total = CalculateTotal();
//空间不足开辟新块
if(_leftsize < total)
{
//加锁
Lockguard lock(_mutex);
if(_leftsize < total)
{
_index++;
if(_index >= _memoryblocks.size())
ExpandPool(1);
//重置当前块
_currentblock = reinterpret_cast<char*>(_memoryblocks[_index]);
_leftsize = kpagesize * _blockpages;
}
}
Lockguard lock(_mutex);
//重新计算需要的空间大小
total = CalculateTotal();
//内存对齐分配空间
size_t padding = total - sizeof(T);
obj = reinterpret_cast<T*>(_currentblock + padding);
//调整大小和当前指针位置
_currentblock += total;
_leftsize -= total;
}
//定位new,完美转发接入参数
new(obj) T(std::forward<Args>(args)...);
return obj;
}
//删除操作
void Delete(T* t)
{
if(!t)
return;
//显示调用析构函数
t->~T();
//头插当前释放的空间进入自由链表
Lockguard lock(_mutex);
*reinterpret_cast<void**>(t) = _freelist;
_freelist = t;
}
~ObjectPool()
{
//释放所有空间
for(void* ptr : _memoryblocks)
{
SystemFree(ptr, _blockpages);
}
}
private:
std::vector<void*> _memoryblocks;
char* _currentblock;
size_t _leftsize;
size_t _index;
void* _freelist;
size_t _original_size;
size_t _alignment;
size_t _alignsize;
pthread_mutex_t _mutex;
static constexpr size_t _blockpages = 16;
};
LockGuard.hpp的实现
#ifndef __LOCKGUARD_HPP__
#define __LOCKGUARD_HPP__
#include<pthread.h>
namespace LockGuardModule
{
class Lockguard
{
public:
Lockguard(pthread_mutex_t& mutex)
:_mutex(mutex)
{
pthread_mutex_lock(&_mutex);
}
~Lockguard()
{
pthread_mutex_unlock(&_mutex);
}
private:
pthread_mutex_t& _mutex;
};
}
#endif