✨个人主页: 熬夜学编程的小林
💗系列专栏: 【C语言详解】 【数据结构详解】【C++详解】【Linux系统编程】【Linux网络编程】【项目详解】
目录
1、pagecache
1.1、整体设计
page cache 与 central cache结构的相同之处
central cache的结构与thread cache 是相同的,都是哈希桶结构,并且page cache的每个哈希桶中挂的也是一个个的span,这些span也是按照双链表的结构链接起来的。
page cache 与 central cache结构的不同之处
1、page cache与 central cache的映射规则不同(包括thread cache)。page cache的哈希桶映射规则采用的是直接定址法,比如1号桶挂的都是1页的span,2号桶挂的都是2页的span,一次类推。
2、central cache每个桶中的span被切成一个个对应大小的对象,以供thread cache申请。而page cache当中的span是没有被进一步切小的,因为page cache服务的是central cache,当central cache没有span时,向page cache申请的是某一固定页数的span,而如何切分申请到的这个span由central cache自己来决定。
page cache当中究竟有多少个桶,这就要看你最大想挂多大页的span了,我们这里就最大挂128页的span,为了让桶号与页号对应起来,我们可以将第0号桶空出来,因此我们需要将哈希桶的个数设置成129。
// page cache中哈希桶的个数
static const size_t NPAGES = 129;
为什么这里最大挂128页的span呢?
因为线程申请单个对象最大是256KB,而128页可以被切成4个256KB的对象,因此是足够的。当然,如果你想在page cache中挂更大的span也是可以的,根据具体的需求进行设置即可。
在page cache中获取一个n页的span
1、如果central cache要获取一个n页的span,那我们就可以在page cache的第n号桶中取出一个span返回给central cache即可。
2、如果第n号桶中没有span,这时我们并不是直接向堆申请一个n页的span,而是继续在后面的桶中寻找span。
- 当第n号桶中没有span时,我们可以找第n+1号桶,因为我们可以将第n+1页的span分成一个n页的span和一个1页的span,这时我们就可以将n页的span返回,而将切分后的1页挂接到1号桶中。
- 当后面的桶当中都没有span,这时我们就只能向堆申请一个128页的内存块,并将其用一个span结构管理起来,然后将128页的span切分成n页的span和128-n页的span,其中n页的span返回给central cache,而128-n页的span挂到第128-n号桶中。
- 注意:当我们直接向堆申请以页为单位的内存时,我们应该尽量申请大块一点的内存块,因为此时申请到的内存是连续的,当线程需要内存时我们可以将其切小后分配给线程,当线程将内存释放后我们又可以将其合并成大块连续的内存。
实际上,我们每次向堆申请的都是128页大小的内存块,central cache要的这些span实际都是由128页的span切分出来的。
1.2、核心实现
1、当每个线程的thread cache没有内存时都会向central cache申请,此时多个线程的thread cache如果访问的不是central cache的同一个桶,那么这些线程是可以同时进行访问的。这时central cache的多个桶可能同时向page cache申请内存,索引page cache也是存在线程安全问题的,因此在访问page cache时也必须要加锁。
- 但是page cache这里不能使用桶锁,因为当central cache向page cache申请内存时,page cache可能会将其他桶当中大页的span切小再给central cache。此外,当central cache将某个span归还给page cache时,page cache也会尝试将该span与其他桶当中的span进行合并。
- 换句话说,在访问page cache时,我们可能需要访问page cache中的多个桶,如果page cache用桶锁就会出现大量频繁的加锁和解锁,导致程序的效率低下。因此我们在访问page cache时没有使用桶锁,而是用一把大锁将整个page cache锁住。
2、page cache在整个进程中只能存在一个,因此我们也需要将其设置为单例模式。
// 单例模式
class PageCache
{
public:
// 提供一个全局访问点
static PageCache* GetInstance()
{
return &_sInst;
}
std::mutex _pageMtx; // 大锁,设置公有方便调用
private:
SpanList _spanLists[NPAGES];
// C++98,构造函数私有
PageCache()
{}
// C++11,禁止拷贝
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};
当程序运行起来后我们就立马创建该对象即可。
// 类外初始化静态成员
PageCache PageCache::_sInst;
1.3、获取Span
1.3.1、获取一个非空的Span
thread cache向central cache申请对象时,central cache需要先从对应的哈希桶中获取到一个非空的Span,然后从这个非空的Span中取出指定大小的对象返回给thread cache。那central cache到底是如何从对应的哈希桶中获取到一个非空的Span的呢?
1、遍历central cache对应哈希桶中的双链表,如果该双链表中有非空的Span,那么直接将Span返回即可。为了方便遍历这个双链表,我们可以模拟迭代器的方式,给SpanList类提供Begin和End成员函数,分别获取双链表的第一个Span结点和最后一个Span结点的下一个结点,也就是头结点。
// 管理Span的链表结构
class SpanList
{
public:
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
private:
Span* _head; // 双向链表头结点指针
public:
std::mutex _mtx; // 桶锁,方便类外访问
};
2、但如果遍历双链表后发现双链表中没有span,或该双链表中的span都为空,那么此时central cache就需要向page cache申请内存块了。
那具体是向page cache申请多大的内存块呢?
我们可以根据具体所需对象的大小来决定,就像之前我们根据对象的大小计算出,thread cache一次向central cache申请对象的个数上限,现在我们是根据对象的大小计算出,central cache一次应该向page cache申请几页的内存块。
我们可以先根据对象的大小计算出,thread cache一次向central cache申请对象的个数上限,然后将这个上限值乘以单个对象的大小,就算出了具体需要多少字节,最后再将这个算出来的字节数转换为页数,如果转换后不够一页,那么我们就申请一页,否则转换出来是几页就申请几页。也就是说,central cache向page cache申请内存时,要求申请到的内存尽量能够满足thread cache向central cache申请时的上限。
class SizeClass
{
public:
// central cache一次向page cache获取多少页
static size_t NumMovePage(size_t size)
{
// 计算出thread cache一次向central cache申请对象的个数上限
size_t num = NumMoveSize(size);
// num 个size 大小的对象所需的字节数
size_t nPage = num * size;
nPage >>= PAGE_SHIFT; // 将字节数转换为页数
if (nPage == 0)
nPage = 1; // 至少给一页
return nPage;
}
};
- PAGE_SHIFT代表页大小转换偏移,我们这里以页的大小为8K为例,PAGE_SHIFT的值就是13。
// 页大小转换偏移,即一页定义为2^13,也就是8KB
static const size_t PAGE_SHIFT = 13;
- 注意:当central cache申请到若干页的span后,还需要将这个span切成一个个对应大小的对象挂到该span的自由链表当中。
如何找到一个span所管理的内存块呢?
首先需要计算出该span的起始地址,我们可以用这个span的起始页号乘以一页的大小即可得到这个span的起始地址,然后用这个span的页数乘以一页的大小就可以得到这个span所管理的内存块的大小,用起始地址加上内存块的大小即可得到这块内存块的结束位置。
明确了这块内存的起始和结束位置后,我们就可以进行切分了。根据所需对象的大小,每次从大块内存切出一块固定大小的内存块尾插到span的自由链表中即可。
为什么是尾插呢?
因为我们如果是将切好的对象尾插到自由链表,这些对象看起来是按照链式结构链接起来的,而实际它们在物理上是连续的,这时当我们把这些连续内存分配给某个线程使用时,可以提高该线程的CPU缓存利用率。
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// 1.先在list中寻找非空的Span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}
// 2.list中没有非空的Span,只能向page cache申请
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
// 3.计算span大块内存的其实地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageID << PAGE_SHIFT); // 起始地址
size_t bytes = span->_n << PAGE_SHIFT; // 内存大小
char* end = start + bytes; // 结束地址
// 4.把大块内存切成自由链表链接起来
// a.先切一小块做头,方便尾插(定义一个尾结点指针)
span->_freeList = start;
start += size;
void* tail = span->_freeList;
int i = 1; // 用于调试
while (start < end)
{
++i;
Next(tail) = start;
tail = Next(tail); // tail = start
start += size;
}
Next(tail) = nullptr; // 将尾结点指向空
// b.切好Span以后,将切好的Span挂到桶里面(头插)
list.PushFront(span);
return span;
//return nullptr;
}
1.在list中寻找非空的Span
2.申请内存和计算起始地址
3.把大块内存切成自由链表链接起来
注意:当我们把span切分好之后,需要将这个切分好的span挂接到central cache对应的哈希桶中。因此SpanList类需要提供一个将span插入到双链表的接口。此处我们选择头插,这样当central cache下一次从该双链表获取非空span时,一来就能找到。
由于SpanList类已经实现了Insert和Begin函数,这里实现双链表的头插直接在双链表的Begin位置进行Insert(在头结点插入数据)即可。
// 管理Span的链表结构
class SpanList
{
public:
// 头插Span
void PushFront(Span* span)
{
Insert(Begin(), span); // Begin之前插入span
}
private:
Span* _head; // 双向链表头结点指针
public:
std::mutex _mtx; // 桶锁,方便类外访问
};
1.3.2、获取一个K页的span
当我们调用上述的GetOneSpan从central cache的某个哈希桶获取一个非空的span时,如果遍历哈希桶中的双链表后发现双链表中没有span,或该双链表中的span都为空,那么此时central cache就需要向page cache申请若干页的span了,下面我们来分析如何从page cache获取一个k页的span。
1、因为page cache是直接按照页数进行映射的,因此我们要从page cache获取一个k页的span,就应该 直接先去找page cache的第k号桶, 如果第k号桶中有span,那我们直接头删一个span返回给central cache就行了。所以我们这里需要再给 SpanList类添加对应的Empty和PopFront函数。
// 管理Span的链表结构
class SpanList
{
public:
// 判断是否为空
bool Empty()
{
return _head == _head->_next;
}
// 头删Span
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
private:
Span* _head; // 双向链表头结点指针
public:
std::mutex _mtx; // 桶锁,方便类外访问
};
2、如果page cache的第k号桶中没有span,我们就应该继续找后面的桶,只要后面任意一个桶中有一个n页的span,我们就可以将其切分成一个k页的span和一个n-k页的span,然后将切出来k页span返回给central cache,再将n-k页的span挂接到page cache的第n-k号桶中。
3、如果后面的桶中也都没有span,此时我们需要向堆申请一个128页的span了,在向堆申请内存时,直接调用我们封装的SystemAlloc函数即可。
注意:向堆申请内存后得到的是这块内存的起始地址,此时我们需要将该地址转换为页号。由于我们向堆申请内存时都是按照页进行申请的,因此我们直接将该地址除以一页的大小即可得到对应的页号。
// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES); // 保证在桶的范围内
// 1.检查第k个桶里面有没有Span,有则返回
if (!_spanLists[k].Empty())
{
return _spanLists[k].PopFront();
}
// 2.检查一下后面的桶里面有没有span,有则将其切分
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
// 在nSpan的头部切一个k页下来
kSpan->_pageID = nSpan->_pageID;
kSpan->_n = k;
// 更新nSpan位置
nSpan->_pageID += k;
nSpan->_n -= k;
// 将剩下的挂到对应映射的位置
_spanLists[nSpan->_n].PushFront(nSpan);
return kSpan;
}
}
// 3.没有大页的span,找堆申请128页的span
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1); // 申请128页内存
bigSpan->_pageID = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1; // 页数
_spanLists[bigSpan->_n].PushFront(bigSpan);
// 递归调用自己(避免代码重复)
return NewSpan(k);
}
1.第k个桶中有Span
2.第k个桶后面有Span
3.没有大页的span,找堆申请128页的span
说明:当我们向堆申请到128页的span后,需要将其切分成k页的span和128-k页的span,但是为了尽量避免出现重复的代码,此处最好不要再编写对应的切分代码。我们可以先将申请到的128页的span挂接到page cache对应的哈希桶中,然后再递归调用该函数即可,此时在往后找span时就一定会在第128号桶找到该span,然后进行切分。
有一个问题:当central cache向page cache申请内存时,central cache对应的哈希桶是处于加锁的状态的,那在访问page cache之前我们应不应该把central cache对应的桶锁解掉呢?
这里建议在访问page cache前,先把central cache对应的桶锁解掉。虽然此时central cache的这个桶当中是没有内存供其他thread cache申请的,但thread cache除了申请内存还会释放内存,如果在访问page cache前将central cache对应的桶锁解掉,那么此时当其他thread cache想要归还内存到central cache的这个桶时就不会被阻塞。
因此在调用NewSpan函数之前,我们需要先将central cache对应的桶锁解掉,然后再将page cache的大锁加上,当申请到k页的span后,我们需要将page cache的大锁解掉,但此时我们不需要立刻获取到central cache中对应的桶锁。因为central cache拿到k页的span后还会对其进行切分操作,因此我们可以在span切好后需要将其挂到central cache对应的桶上时,再获取对应的桶锁。
1.3.3、加锁版获取非空span
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// 1.先在list中寻找非空的Span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}
// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
list._mtx.unlock();
// 2.list中没有非空的Span,只能向page cache申请
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
PageCache::GetInstance()->_pageMtx.unlock();
// 3.计算span大块内存的其实地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageID << PAGE_SHIFT); // 起始地址
size_t bytes = span->_n << PAGE_SHIFT; // 内存大小
char* end = start + bytes; // 结束地址
// 4.把大块内存切成自由链表链接起来
// a.先切一小块做头,方便尾插(定义一个尾结点指针)
span->_freeList = start;
start += size;
void* tail = span->_freeList;
int i = 1; // 用于调试
while (start < end)
{
++i;
Next(tail) = start;
tail = Next(tail); // tail = start
start += size;
}
Next(tail) = nullptr; // 将尾结点指向空
// b.切好Span以后,将切好的Span挂到桶里面(头插),再加锁
list._mtx.lock();
list.PushFront(span);
return span;
//return nullptr;
}