【从零实现高并发内存池】thread cache、central cache 和 page cache 回收策略详解

发布于:2025-04-18 ⋅ 阅读:(27) ⋅ 点赞:(0)

📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨

在这里插入图片描述

在这里插入图片描述


🏳️‍🌈一、thread cache 回收策略

  1. 随着我们对 thread cache 的使用,会有越来越多的内存块从 central cache 中被申请,然后等使用完后再返还给 thread cahce,挂在对应的自由链表中,继续等待使用
  2. 但是倘若我们有一个自由链表后面挂着的内存块十分地长,长到远远超过当前进程所需,那么就会造成内存地占用,导致浪费,使有的地方的内存不够使用
  3. 所以我们需要指定一个 thread cache 回收策略,针对太长的自由链表进行回收,返还给 central cache,使内存池更加高效
// 释放内存对象
void ThreadCache::Deallocate(void* ptr, size_t size) {
	assert(ptr);
	assert(size <= MAX_BYTES);

	// 找出对应映射的链表桶,并将对象放入链表中
	size_t index = SizeClass::Index(size);
	_freeLists[index].Push(ptr);

	// 当链表过长时,回收内存到中心缓存
	if (_freeLists[index].MaxSize() >= _freeLists[index].MaxSize()) {
		ListTooLong(_freeLists[index], size);
	}
}

当自由链表的长度大于一次批量申请的对象时,我们具体的做法就是,从该自由链表中取出一次批量个数的对象,然后将取出的这些对象还给central cache中对应的span即可。

// 释放对象时,链表过⻓时,回收内存回到中⼼缓存
void ThreadCache::ListTooLong(FreeList& list, size_t size) {
	void* start = nullptr;
	void* end = nullptr;
	// 弹出一段内存对象
	list.PopRange(start, end, list.MaxSize());

	CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

对于 ListTooLong 部分的代码,我们可以先确认 头尾地址,将这段内存对象从链表中弹出再通过 central cache 返还给 page cache 处理

因此我们也就需要为 list 对象新添一个弹出指定大小内存的方法 PopRange

void PopRange(void*& start, void*& end, size_t maxSize, size_t n) {
	assert(n >= _size);
	start = _freeList;
	end = start;

	for (size_t i = 0; i < n - 1; ++i) {
		end = NextObj(end);
	}

	_freeList = NextObj(end);
	NextObj(end) = nullptr;
	_size -= n;
}

这部分的方法思路就是

  1. 链表头节点作为 返回值的头地址
  2. 然后再 利用循环,不断向后扩大 这个返回内存条的范围
  3. 最后再 将剩余内存条的头节点 作为 链表新头节点
  4. 链表的大小剪去弹出的大小
// 管理切分好的小对象的自由链表
class FreeList {
	public:
		void Push(void* obj) {
			assert(obj);
			// 头插
			NextObj(obj) = _freeList;
			_freeList = obj;

			++_size;
		}

		// 将新分配的内存块放入链表中
		void PushRange(void* start, void* end, size_t n) {
			NextObj(end) = _freeList;
			_freeList = start;

			_size += n;
		}

		void PopRange(void*& start, void*& end, size_t n) {
			assert(n >= _size);
			start = _freeList;
			end = start;

			for (size_t i = 0; i < n - 1; ++i) {
				end = NextObj(end);
			}

			_freeList = NextObj(end);
			NextObj(end) = nullptr;
			_size -= n;
		}

		void* Pop() {
			assert(_freeList);

			// 头删
			void* obj = _freeList;
			_freeList = NextObj(obj);
			--_size;
			
			return obj;
		}

		bool Empty() {
			return (_freeList == nullptr);
		}

		size_t& MaxSize() {
			return _maxSize;
		}

	private:
		void* _freeList = nullptr;
		size_t _maxSize = 1;

		size_t _size = 0;
};

🏳️‍🌈二、central cache 回收策略

当thread cache中某个自由链表太长时,会将自由链表当中的这些对象还给central cache中的span。

注意:还给central cache的这些对象不一定都是属于同一个span的。
central cache 中的每个哈希桶当中可能都不止一个 span,因此当我们计算出还回来的对象应该还给 central cache 的哪一个桶后,还需要知道这些对象到底应该还给这个桶当中的哪一个span。

如何找到一个对象对应的span?

虽然我们现在可以通过对象的地址得到其所在的页号,但是我们还是不能知道这个对象到底属于哪一个span。因为一个span管理的可能是多个页。

为了解决这个问题,我们可以建立页号和span之间的映射 。由于这个映射关系在 page cache 进行 span 的合并时也需要用到,因此我们直接将其存放到 page cache 里面。这时我们就需要在 PageCache 类当中添加一个映射关系了,这里可以用 unordered_map 进行实现,并且添加一个函数接口,用于让 central cache 获取这里的映射关系。

// 单例模式
class PageCache{
public:
	// 获取从对象到span的映射
	Span* MapObjectToSpan(void* obj);
private:
	std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};

因此,我们需要在 每当page cache分配span给central cache时,记录一下页号和span之间的映射关系此后当thread cache还对象给central cache时,才知道应该具体还给哪一个span

分别应该在

  1. page cache 在返回这个 k 页的 span 给 central cache 时
  2. 当 central cache 在调用 NewSpan 接口向 page cache 申请 k 页的 span 时
// 获取一个 k 页的 span
Span* PageCache::NewSpan(size_t k) {
	assert(k > 0 && k < NPAGES);

	// 先检查第 k 个桶里面有没有 span
	if (!_spanLists[k].Empty()) {
		Span* kSpan = _spanLists->PopFront();

		// 建立 页号 与 span 的映射关系,方便 central cache 回收小块内存查找对应的 span
		for (PAGE_ID i = 0; i < kSpan->_n; ++i) {
			_idSpanMap[kSpan->_PAGEID + i] = kSpan;
		}
		return kSpan;
	}

	// 检查一下后面的桶里面有没有 span,如果有可以把他进行切分
	for (size_t i = k + 1; i < NPAGES; ++i) {
		// 如果后面的桶里面有 span,这个 span 是肯定大于 k 页的
		if (!_spanLists[i].Empty()) {
			// 弹出第 i 个桶的第一个 span
			Span* nSpan = _spanLists[i].PopFront();
			
			// 进行切分,切分成一个 k 页的 span 和一个 i-k 页的 span
			Span* kSpan = new Span;
			kSpan->_PAGEID = nSpan->_PAGEID;
			kSpan->_n = k;

			nSpan->_PAGEID += k;
			nSpan->_n -= k;

			// 将剩余的代码挂到对应的映射位置上
			_spanLists[nSpan->_n].PushFront(nSpan);
			for (PAGE_ID i = 0; i < kSpan->_n; ++i) {
				_idSpanMap[kSpan->_PAGEID + i] = kSpan;
			}

			return kSpan;
		}
	}
	// 走到这个位置就说明后面没有大页的 span 了
		// 就需要去找堆要一个 128 页的 span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);

	bigSpan->_PAGEID = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	return NewSpan(k);
}

此时我们就可以通过对象的地址找到该对象对应的span了,直接将该对象的地址除以页的大小得到页号,然后在unordered_map当中找到其对应的span即可。

Span* PageCache::MapObjectToSpan(void* obj) {
	PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
	auto ret = _idSpanMap.find(id);
	if(ret != _idSpanMap.end())
		return ret->second;
	else {
		assert(false);
		return nullptr;
	}
}

当我们要通过某个页号查找其对应的span时,该页号与其span之间的映射一定是建立过的,如果此时我们没有在unordered_map当中找到,则说明我们之前的代码逻辑有问题 ,因此当没有找到对应的span时可以直接用断言结束程序,以表明程序逻辑出错。

// 将⼀定数量的对象释放到span跨度
void CentralCache::ReleaseListToSpans(void* start, size_t size) {
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock();

	while (start) {
		 void* next = NextObj(start); // 记录下一个节点

		 // 获取从对象到 span 的映射
		 Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
		 // 将对象头插到 span 的自由链表中
		 NextObj(start) = span->_freeList;
		 span->_freeList = start;

		 // 更新被分配给 thread cache的对象数量
		 span->_userCount--;

		 // 当这个span分配出去的对象数量为0时,说明这个span已经没有被分配出去的对象了,
		 if (span->_userCount == 0) {
			 // 此时这个span就可以再回收给 page cache 了
			 _spanLists[index].Erase(span);
			 span->_freeList = nullptr; // 自由链表制空
			 span->_next = nullptr;
			 span->_prev = nullptr;

			 // 释放 span 给 page cache 时,使用 page cache 的锁即可,这是把 central cache桶锁解掉
			 _spanLists[index]._mtx.unlock(); // 解除桶锁
			 PageCache::GetInstance()->_pageMtx.lock(); // 加锁
			 PageCache::GetInstance()->ReleaseSpanToPageCache(span); 
			 PageCache::GetInstance()->_pageMtx.unlock(); // 解锁
			 _spanLists[index]._mtx.lock(); // 加桶锁
		 }
		 start = next;
	}

	_spanLists[index]._mtx.unlock();
}

central cache 还span给 page cache 时也存在锁的问题,此时需要>先将 central cache 中对应的桶锁解掉,然后再加上 page cache 的大锁之后才能进入 page cache 进行相关操作当处理完毕回到 central cache 时,除了将 page cache 的大锁解掉,还需要立刻加上 central cache 对应的桶锁,然后将还未还完对象继续还给 central cache 中对应的span。

🏳️‍🌈三、page cahce 回收策略

  1. 如果 central cache 中有某个 span 的 _useCount 减到0了,那么 central cache 就需要将这个 span 还给 page cache 了。

  2. 这个过程看似是非常简单的,page cache只需将还回来的span挂到对应的哈希桶上就行了。但实际为了缓解内存碎片的问题,page cache还需要尝试将还回来的span与其他空闲的span进行合并。

page cache进行前后页的合并分析

合并的过程可以分为向前合并和向后合并。如果还回来的span的起始页号是num,该span所管理的页数是n。

  • 在向前合并时就需要判断第num-1页对应span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向前尝试进行合并,直到不能进行合并为止。
  • 在向后合并时就需要判断第num+n页对应的span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向后尝试进行合并,直到不能进行合并为止。

当我们通过页号找到其对应的span时,这个span此时可能挂在page cache,也可能挂在central cache。而在合并时我们只能合并挂在page cache的span,因为挂在central cache的span当中的对象正在被其他线程使用。

我们可以在span结构中再增加一个_isUse成员,用于标记这个span是否正在被使用,而当一个span结构被创建时,默认该span是没有被使用的。

// 管理多个连续页大块内存跨度结构
struct Span {
	PAGE_ID _PAGEID = 0;	// 大块内存起始页的页号
	size_t _n = 0;			// 页的数量

	Span* _next = nullptr;	// 双向链表的结构
	Span* _prev = nullptr; 

	size_t _userCount = 0;	// 已分配给 Thread Cache 的小块内存数量
	void* _freeList = nullptr; // 自由链表头指针(指向未分配的小块内存)

	bool _isused = false;	// 是否在被使用
};

因为当一个span在尝试进行合并时,

  • 如果是往前合并,那么只需要通过一个span的尾页找到这个span,
  • 如果是向后合并,那么只需要通过一个span的首页找到这个span。

也就是说,在进行合并时我们只需要用到span与其首尾页之间的映射关系就够了。

void PageCache::ReleaseSpanToPageCache(Span* span) {
	// 对 span 前后的页,尝试进行合并,缓解内存碎片问题
	
	// 向前合并
	while (1) {
		PAGE_ID prevId = span->_PAGEID - 1;
		auto ret = _idSpanMap.find(prevId);
		// 前面的页号没有,不合并了
		if (ret == _idSpanMap.end()) {
			break;
		}
		// 前面相邻页的span在使用,不合并了
		Span* prevSpan = ret->second;
		if (ret->second->_isused == true) {
			break;
		}
		// 合并出超过 128 页的 span 没办法管理,不合并了
		if (prevSpan->_n + span->_n > NPAGES) {
			break;
		}

		span->_PAGEID = prevSpan->_PAGEID;
		span->_n += prevSpan->_n;

		_spanLists[prevSpan->_n].Erase(prevSpan);

		delete prevSpan;
	}

	// 向后合并
	while (1) {
		PAGE_ID nextId = span->_PAGEID + span->_n; 
		auto ret = _idSpanMap.find(nextId);
		if (ret == _idSpanMap.end()) {
			break;
		}

		Span* nextSpan = ret->second;
		if (nextSpan->_isused == true) {
			break;
		}
		if(nextSpan->_n + span->_n >> NPAGES - 1){
			break;
		}

		span->_n += nextSpan->_n;
		_spanLists[nextSpan->_n].Erase(nextSpan);

		delete nextSpan;
	}

	_spanLists[span->_n].PushFront(span);
	span->_isused = false;
	_idSpanMap[span->_PAGEID] = span;
	_idSpanMap[span->_PAGEID + span->_n - 1] = span;
}

👥总结

本篇博文对 【从零实现高并发内存池】thread cache、central cache 和 page cache 回收策略详解 做了一个较为详细的介绍,不知道对你有没有帮助呢

觉得博主写得还不错的三连支持下吧!会继续努力的~


网站公告

今日签到

点亮在社区的每一天
去签到