📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨
🏳️🌈一、thread cache 回收策略
- 随着我们对 thread cache 的使用,会有越来越多的内存块从 central cache 中被申请,然后等使用完后再返还给 thread cahce,挂在对应的自由链表中,继续等待使用
- 但是倘若我们有一个自由链表后面挂着的内存块十分地长,长到远远超过当前进程所需,那么就会造成内存地占用,导致浪费,使有的地方的内存不够使用
- 所以我们需要指定一个 thread cache 回收策略,针对太长的自由链表进行回收,返还给 central cache,使内存池更加高效
// 释放内存对象
void ThreadCache::Deallocate(void* ptr, size_t size) {
assert(ptr);
assert(size <= MAX_BYTES);
// 找出对应映射的链表桶,并将对象放入链表中
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
// 当链表过长时,回收内存到中心缓存
if (_freeLists[index].MaxSize() >= _freeLists[index].MaxSize()) {
ListTooLong(_freeLists[index], size);
}
}
当自由链表的长度大于一次批量申请的对象时,我们具体的做法就是,从该自由链表中取出一次批量个数的对象,然后将取出的这些对象还给central cache中对应的span即可。
// 释放对象时,链表过⻓时,回收内存回到中⼼缓存
void ThreadCache::ListTooLong(FreeList& list, size_t size) {
void* start = nullptr;
void* end = nullptr;
// 弹出一段内存对象
list.PopRange(start, end, list.MaxSize());
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
对于 ListTooLong 部分的代码,我们可以先确认 头尾地址,将这段内存对象从链表中弹出,再通过 central cache 返还给 page cache 处理
因此我们也就需要为 list 对象新添一个弹出指定大小内存的方法 PopRange
void PopRange(void*& start, void*& end, size_t maxSize, size_t n) {
assert(n >= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < n - 1; ++i) {
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
这部分的方法思路就是
- 将 链表头节点作为 返回值的头地址
- 然后再 利用循环,不断向后扩大 这个返回内存条的范围
- 最后再 将剩余内存条的头节点 作为 链表新头节点
- 将 链表的大小剪去弹出的大小
// 管理切分好的小对象的自由链表
class FreeList {
public:
void Push(void* obj) {
assert(obj);
// 头插
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
}
// 将新分配的内存块放入链表中
void PushRange(void* start, void* end, size_t n) {
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
void PopRange(void*& start, void*& end, size_t n) {
assert(n >= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < n - 1; ++i) {
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
void* Pop() {
assert(_freeList);
// 头删
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
}
bool Empty() {
return (_freeList == nullptr);
}
size_t& MaxSize() {
return _maxSize;
}
private:
void* _freeList = nullptr;
size_t _maxSize = 1;
size_t _size = 0;
};
🏳️🌈二、central cache 回收策略
当thread cache中某个自由链表太长时,会将自由链表当中的这些对象还给central cache中的span。
注意:还给central cache的这些对象不一定都是属于同一个span的。
central cache
中的每个哈希桶当中可能都不止一个 span
,因此当我们计算出还回来的对象应该还给 central cache
的哪一个桶后,还需要知道这些对象到底应该还给这个桶当中的哪一个span。
如何找到一个对象对应的span?
虽然我们现在可以通过对象的地址得到其所在的页号,但是我们还是不能知道这个对象到底属于哪一个span。因为一个span管理的可能是多个页。
为了解决这个问题,我们可以建立页号和span之间的映射 。由于这个映射关系在 page cache 进行 span 的合并时也需要用到,因此我们直接将其存放到 page cache 里面。这时我们就需要在 PageCache 类当中添加一个映射关系了,这里可以用 unordered_map 进行实现,并且添加一个函数接口,用于让 central cache 获取这里的映射关系。
// 单例模式
class PageCache{
public:
// 获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
private:
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};
因此,我们需要在 每当page cache分配span给central cache时,记录一下页号和span之间的映射关系。此后当thread cache还对象给central cache时,才知道应该具体还给哪一个span
分别应该在
- page cache 在返回这个 k 页的 span 给 central cache 时
- 当 central cache 在调用 NewSpan 接口向 page cache 申请 k 页的 span 时
// 获取一个 k 页的 span
Span* PageCache::NewSpan(size_t k) {
assert(k > 0 && k < NPAGES);
// 先检查第 k 个桶里面有没有 span
if (!_spanLists[k].Empty()) {
Span* kSpan = _spanLists->PopFront();
// 建立 页号 与 span 的映射关系,方便 central cache 回收小块内存查找对应的 span
for (PAGE_ID i = 0; i < kSpan->_n; ++i) {
_idSpanMap[kSpan->_PAGEID + i] = kSpan;
}
return kSpan;
}
// 检查一下后面的桶里面有没有 span,如果有可以把他进行切分
for (size_t i = k + 1; i < NPAGES; ++i) {
// 如果后面的桶里面有 span,这个 span 是肯定大于 k 页的
if (!_spanLists[i].Empty()) {
// 弹出第 i 个桶的第一个 span
Span* nSpan = _spanLists[i].PopFront();
// 进行切分,切分成一个 k 页的 span 和一个 i-k 页的 span
Span* kSpan = new Span;
kSpan->_PAGEID = nSpan->_PAGEID;
kSpan->_n = k;
nSpan->_PAGEID += k;
nSpan->_n -= k;
// 将剩余的代码挂到对应的映射位置上
_spanLists[nSpan->_n].PushFront(nSpan);
for (PAGE_ID i = 0; i < kSpan->_n; ++i) {
_idSpanMap[kSpan->_PAGEID + i] = kSpan;
}
return kSpan;
}
}
// 走到这个位置就说明后面没有大页的 span 了
// 就需要去找堆要一个 128 页的 span
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_PAGEID = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);
}
此时我们就可以通过对象的地址找到该对象对应的span了,直接将该对象的地址除以页的大小得到页号,然后在unordered_map当中找到其对应的span即可。
Span* PageCache::MapObjectToSpan(void* obj) {
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
auto ret = _idSpanMap.find(id);
if(ret != _idSpanMap.end())
return ret->second;
else {
assert(false);
return nullptr;
}
}
当我们要通过某个页号查找其对应的span时,该页号与其span之间的映射一定是建立过的,如果此时我们没有在unordered_map当中找到,则说明我们之前的代码逻辑有问题 ,因此当没有找到对应的span时可以直接用断言结束程序,以表明程序逻辑出错。
// 将⼀定数量的对象释放到span跨度
void CentralCache::ReleaseListToSpans(void* start, size_t size) {
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
while (start) {
void* next = NextObj(start); // 记录下一个节点
// 获取从对象到 span 的映射
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
// 将对象头插到 span 的自由链表中
NextObj(start) = span->_freeList;
span->_freeList = start;
// 更新被分配给 thread cache的对象数量
span->_userCount--;
// 当这个span分配出去的对象数量为0时,说明这个span已经没有被分配出去的对象了,
if (span->_userCount == 0) {
// 此时这个span就可以再回收给 page cache 了
_spanLists[index].Erase(span);
span->_freeList = nullptr; // 自由链表制空
span->_next = nullptr;
span->_prev = nullptr;
// 释放 span 给 page cache 时,使用 page cache 的锁即可,这是把 central cache桶锁解掉
_spanLists[index]._mtx.unlock(); // 解除桶锁
PageCache::GetInstance()->_pageMtx.lock(); // 加锁
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock(); // 解锁
_spanLists[index]._mtx.lock(); // 加桶锁
}
start = next;
}
_spanLists[index]._mtx.unlock();
}
在central cache
还span给 page cache
时也存在锁的问题,此时需要>先将 central cache
中对应的桶锁解掉,然后再加上 page cache
的大锁之后才能进入 page cache
进行相关操作,当处理完毕回到 central cache
时,除了将 page cache
的大锁解掉,还需要立刻加上 central cache
对应的桶锁,然后将还未还完对象继续还给 central cache
中对应的span。
🏳️🌈三、page cahce 回收策略
如果 central cache 中有某个 span 的 _useCount 减到0了,那么 central cache 就需要将这个 span 还给 page cache 了。
这个过程看似是非常简单的,page cache只需将还回来的span挂到对应的哈希桶上就行了。但实际为了缓解内存碎片的问题,page cache还需要尝试将还回来的span与其他空闲的span进行合并。
page cache进行前后页的合并分析
合并的过程可以分为向前合并和向后合并。如果还回来的span的起始页号是num,该span所管理的页数是n。
- 在向前合并时,就需要判断第num-1页对应span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向前尝试进行合并,直到不能进行合并为止。
- 在向后合并时,就需要判断第num+n页对应的span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向后尝试进行合并,直到不能进行合并为止。
当我们通过页号找到其对应的span时,这个span此时可能挂在page cache,也可能挂在central cache。而在合并时我们只能合并挂在page cache的span,因为挂在central cache的span当中的对象正在被其他线程使用。
我们可以在span结构中再增加一个_isUse成员,用于标记这个span是否正在被使用,而当一个span结构被创建时,默认该span是没有被使用的。
// 管理多个连续页大块内存跨度结构
struct Span {
PAGE_ID _PAGEID = 0; // 大块内存起始页的页号
size_t _n = 0; // 页的数量
Span* _next = nullptr; // 双向链表的结构
Span* _prev = nullptr;
size_t _userCount = 0; // 已分配给 Thread Cache 的小块内存数量
void* _freeList = nullptr; // 自由链表头指针(指向未分配的小块内存)
bool _isused = false; // 是否在被使用
};
因为当一个span在尝试进行合并时,
- 如果是往前合并,那么只需要通过一个span的尾页找到这个span,
- 如果是向后合并,那么只需要通过一个span的首页找到这个span。
也就是说,在进行合并时我们只需要用到span与其首尾页之间的映射关系就够了。
void PageCache::ReleaseSpanToPageCache(Span* span) {
// 对 span 前后的页,尝试进行合并,缓解内存碎片问题
// 向前合并
while (1) {
PAGE_ID prevId = span->_PAGEID - 1;
auto ret = _idSpanMap.find(prevId);
// 前面的页号没有,不合并了
if (ret == _idSpanMap.end()) {
break;
}
// 前面相邻页的span在使用,不合并了
Span* prevSpan = ret->second;
if (ret->second->_isused == true) {
break;
}
// 合并出超过 128 页的 span 没办法管理,不合并了
if (prevSpan->_n + span->_n > NPAGES) {
break;
}
span->_PAGEID = prevSpan->_PAGEID;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
delete prevSpan;
}
// 向后合并
while (1) {
PAGE_ID nextId = span->_PAGEID + span->_n;
auto ret = _idSpanMap.find(nextId);
if (ret == _idSpanMap.end()) {
break;
}
Span* nextSpan = ret->second;
if (nextSpan->_isused == true) {
break;
}
if(nextSpan->_n + span->_n >> NPAGES - 1){
break;
}
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
delete nextSpan;
}
_spanLists[span->_n].PushFront(span);
span->_isused = false;
_idSpanMap[span->_PAGEID] = span;
_idSpanMap[span->_PAGEID + span->_n - 1] = span;
}
👥总结
本篇博文对 【从零实现高并发内存池】thread cache、central cache 和 page cache 回收策略详解 做了一个较为详细的介绍,不知道对你有没有帮助呢
觉得博主写得还不错的三连支持下吧!会继续努力的~