【从零实现高并发内存池】内存池整体框架设计 及 thread cache实现

发布于:2025-04-15 ⋅ 阅读:(24) ⋅ 点赞:(0)

📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨

在这里插入图片描述

在这里插入图片描述


🏳️‍🌈一、高并发内存池整体框架设计

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么,我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹, 所以这次我们实现的内存池需要考虑以下几方面的问题

  1. 性能问题
  2. 多线程环境下,锁竞争问题
  3. 内存碎片问题

并发内存池(concurrent memory pool) 主要由以下3个部分构成:

  • thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
  • central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象central cache 合适的时机回收 thread cache 中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的 ,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈
  • page cache页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。

在这里插入图片描述

🏳️‍🌈二、高并发内存池 - thread cache

2.1 思想原理

上一篇文章中,笔者 模拟了定长内存池,验证了其在大体量内存开辟时,速度大于malloc的特性

但是问题也来了,上一篇的是定长内存池,每一块的内存大小是一样的,所以就导致了 如果我们要申请释放的是不同大小的内存块就会非常麻烦,所以我们可以引入一个 哈希桶 的策略

thread cache 就是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表 。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。

在这里插入图片描述
将不同大小的内存请求,通过 分级对齐分组策略,映射到对应的自由链表桶中,实现高效内存管理。关键在于:

  • 减少碎片 :同组内对象大小相近,提高内存利用率。
  • 快速定位 :通过索引直接找到对应链表,O(1) 时间复杂度。
    在这里插入图片描述

2.2 自由链表

因为哈希桶背后还是由一个个定长内存池所以我们仍需要给自由链表定义结构体

这里我们先简单实现一下 pushpopempty 功能,后面的功能等有用到的时候再实现

上篇文章也说了,这里 利用每个节点前面几位,来记录下一个节点的地址

// 管理切分好的小对象的自由链表
class FreeList {
	public:
		void Push(void* obj) {
			// 头插
			NextObj(obj) = _freeList;
			_freeList = obj;
		}

		void* Pop() {
			assert(_freeList);

			// 头删
			void* obj = _freeList;
			_freeList = NextObj(obj);
			
			return obj;
		}

		bool Empty() {
			return (_freeList == nullptr);
		}

	private:
		void* _freeList;
};

2.3 哈希桶

先理解一下哈希桶的映射规则:
根据对象大小,将内存请求分为不同区间,并采用不同的对齐粒度

在这里插入图片描述

首先MAX_BYTES 被定义为256 * 1024,也就是256KB,这是内存池能处理的最大对象大小

接下来NFREELISTS,即哈希桶的数量,我们设为208,也就是每个桶管理特定大小范围的内存块。

再接着group_array 数组,定义为{16, 56, 56, 56}。这代表每个区间有多少个链。

然后RoundUp 函数,它根据请求的大小选择不同的对齐方式。例如,小于等于128字节的对齐到8字节,129到1024的对齐到16字节,依此类推。对齐后的尺寸通过_RoundUp函数计算,这里使用了位运算来高效地进行向上取整。

接下来Index 函数,它根据对齐后的尺寸确定对应的自由链表桶索引。这里的分段逻辑比较复杂,涉及到group_array的使用。例如,当请求的字节数超过某个阈值时,会减去之前区间的最大值,然后调用_Index函数计算相对索引,再加上前面区间的桶数总和。

2.3.1 RoundUp 函数

这部分我们计算对齐后的内存大小,可以对请求的内存大小进行判断,位于哪个区间就对齐到哪

// 计算对齐后的内存大小
static inline size_t RoundUp(size_t size) {
	if (size <= 128) {
		return _RoundUp(size, 8); // 对齐到 8 字节
	}
	else if (size <= 1024) {
		return _RoundUp(size, 16); // 对齐到 16 字节
	}
	else if (size <= 8 * 1024) {
		return _RoundUp(size, 128); // 对齐到 128 字节
	}
	else if (size <= 64 * 1024) {
		return _RoundUp(size, 1024);
	}
	else if (size <= 256 * 1024) {
		return _RoundUp(size, 8 * 1024);
	}
	else {
		assert(false);
		return -1;
	}
}

一般来说,我们所能想到的 _RoundUp 的处理方法是 判断是否达到对齐数,不够填,够了直接返回,也就是

		// ​小对象​(1~128字节):对齐到 8 字节,分配到自由列表的前 16 个槽位(索引 0~15)。
		//​ 中等对象​(129~1024字节):对齐到 16 字节,分配到索引 16~71。
		// 大对象​(1025~81024字节):对齐到 128 字节,分配到索引 72~127。
		// 超大对象:对齐到更高粒度(如 1024、8192 字节),避免大对象占用过多小内存块。
		
		// 整体控制在最多10%左右的内碎?浪费
			// [1,128]					8byte对?			freelist[0,16)
			// [128+1,1024]			16byte对?		freelist[16,72)
			// [1024+1,81024]			128byte对?		freelist[72,128)
			// [8*1024+1,641024]		1024byte对?		freelist[128,184)
			// [64*1024+1,256*1024]	8*1024byte对?	freelist[184,208)
		
		// version - 1 普通人版
		// 将请求的内存大小调整为对齐后的整数倍。
			// size 当前请求的内存大小
			// alignNum 对齐数
			// alignSize 对齐后的内存大小
		size_t _RoundUp(size_t size, size_t alignNum) {
			size_t alignSize ;
			if (size % alignNum != 0) {
				alignSize = (size / alignNum + 1) * alignNum;
			}
			else {
				alignSize = size;
			}
			return alignSize;
		}

但是大佬的话,就会有更高级的思路,也就是 位运算

		// version - 2 进阶版
		// 通过位运算来实现对齐计算,避免了除法和乘法的开销。
			// size 当前请求的内存大小
			// alignNum 对齐数
			// alignSize 对齐后的内存大小
		static inline size_t _RoundUp(size_t size, size_t alignNum) {
			return ((size + alignNum - 1) & ~(alignNum - 1));
		}

2.3.2 Index 函数

这部分的作用就是 根据对齐后的尺寸,计算对应的自由链表桶索引

在这里插入图片描述
根据这个表,我们可以得出 每个对象大小范围,各个桶的索引范围,也就得出了各个范围的桶的数量

接下来就只要根据这个范围划分,分门别类就行了

static inline size_t _Index(size_t bytes, size_t align_shift)
{
	return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}

// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
{
	assert(bytes <= MAX_BYTES);

	// 每个区间有多少个链
	static int group_array[4] = { 16, 56, 56, 56 };
	if (bytes <= 128) {
		return _Index(bytes, 3);
	}
	else if (bytes <= 1024) {
		return _Index(bytes - 128, 4) + group_array[0];
	}
	else if (bytes <= 8 * 1024) {
		return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
	}
	else if (bytes <= 64 * 1024) {
		return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
	}
	else if (bytes <= 256 * 1024) {
		return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
	}
	else {
		assert(false);
	}

	return -1;
}

2.4 Thread create 类

先来看看基本的 内存分配流程

graph TD
    A[分配请求] --> B{本地链表是否为空?}
    B -->|| C[从链表弹出对象]
    B -->|| D[向中心缓存批量申请]
    D --> E[填充本地链表]
    E --> C

因为我们实现的是 thread create 类,所以除了要兼顾 定长内存池和哈希桶链表,还需要有能向中心缓存中获取内存空间的方法

2.4.1 void* Allocate(size_t size);

Allocate()函数的作用是申请内存对象,该函数主要分为以下两种情况:

  1. 通过所给字节数计算出对应的哈希桶下标,如果桶中自由链表不为空,则从该自由链表中取出一个对象进行返回
  2. 如果此时自由链表为空,那么我们就需要从central cache进行获取了
void* ThreadCache::Allocate(size_t size) {
	assert(size <= MAX_BYTES);

	// 获取对齐后的内存大小
	size_t alignSize = SizeClass::RoundUp(size);
	// 获取对应的自由链表索引
	size_t index = SizeClass::Index(size);
	// 如果此时该索引的自由链表不为空,则直接从自由链表中获取对象
	if (!_freeLists[index].Empty()) {
		// 有可用对象,直接从自由链表中获取
		return _freeLists[index].Pop();
	}
	else {
		// 从中心缓存获取对象
		return FetchFromCentralCache(index, alignSize);
	}
}

2.4.2 void ThreadCache::Deallocate(void* ptr, size_t size)

通过头插法实现O(1)时间复杂度回收

void ThreadCache::Deallocate(void* ptr, size_t size) {
	assert(ptr);
	assert(size <= MAX_BYTES);

	// 找出对应映射的链表桶,并将对象插入自由链表中
	size_t index = SizeClass::Index(size);
	_freeLists[index].Push(ptr);
}

2.4.3 void* FetchFromCentralCache(size_t index, size_t alignSize);

这部分是去中心缓存中获取对象,先欠着,后面在将中心缓存的时候会补

🏳️‍🌈三、无锁访问

每个线程都有一个自己独享的thread cache,那应该如何创建这个thread cache呢?

我们不能将这个 thread cache 创建为全局的,因为全局变量是所有线程共享的,这样就不可避免的需要锁来控制,增加了控制成本和代码复杂度。

要实现每个线程无锁的访问属于自己的thread cache,我们需要用到线程局部存储TLS(Thread Local Storage),这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。

// static 保证只在当前文件可见
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

因此我们可以再封装一层申请内存与释放内存的函数,如下

static void* ConcurrentAlloc(size_t size)
{
	// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
	if (pTLSThreadCache == nullptr)
	{
		pTLSThreadCache = new ThreadCache;
	}

	cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;

	return pTLSThreadCache->Allocate(size);
}

static void ConcurrentFree(void* ptr, size_t size)
{
	assert(pTLSThreadCache);

	pTLSThreadCache->Deallocate(ptr, size);
}

注意不是每个线程被创建时就立马有了属于自己的thread cache,而是当该线程调用相关申请内存的接口时才会创建自己的thread cache,因此做是否为空的判断。

我们利用下面的测试代码试试

#define _CRT_SECURE_NO_WARNINGS 1

#include "ObjectPool.hpp"
#include "ConcurrentAlloc.hpp"

void Alloc1()
{
	for (size_t i = 0; i < 5; ++i)
	{
		void* ptr = ConcurrentAlloc(6);
	}
}

void Alloc2()
{
	for (size_t i = 0; i < 5; ++i)
	{
		void* ptr = ConcurrentAlloc(7);
	}
}


void TLSTest()
{
	std::thread t1(Alloc1);
	t1.join();

	std::thread t2(Alloc2);
	t2.join();
}

int main()
{
	//TestObjectPool();
	TLSTest();

	return 0;
}

在这里插入图片描述


👥总结

本篇博文对 【从零实现高并发内存池】内存池整体框架设计 及 thread cache实现 做了一个较为详细的介绍,不知道对你有没有帮助呢

觉得博主写得还不错的三连支持下吧!会继续努力的~

请添加图片描述