【高并发内存池】细节处理 + 性能优化 + 总结

发布于:2025-03-14 ⋅ 阅读:(18) ⋅ 点赞:(0)

在这里插入图片描述

1. 细节处理

1.1 细节1

我们这个项目本来就是要代替系统的内存分配相关函数malloc和free,但是在我们的代码中还是在通过new申请,delete释放。new底层封装了malloc,delete底层封装了free。还是没和malloc和free脱离。

这里我们就可以把项目最开始做的定长内存池拿过来用了。它是一个模板想要什么类型内存找它要就行了。下面就把用到new和delete的地方都换成去找定长内存池申请和释放。

//ObjectPool.h

template<class T>
class ObjectPool
{
public:
	ObjectPool()
		:_memory(nullptr), _remainBytes(0),_freelist(nullptr)
	{}

	T* New()
	{
		T* obj = nullptr;
		//如果链表中有空闲的内存块,就优先使用
		if (_freelist)
		{
			//头删
			void* next = *(void**)_freelist;
			obj = (T*)_freelist;
			_freelist = next;
		}
		else
		{
			//if (_memory == nullptr)
			if(_remainBytes < sizeof(T))//如果当前内存块不够一个T类型,就重新申请一大快内存快
			{
				_remainBytes = 128 * 1024;
				//_memory = (char*)malloc(_remain);
				_memory = (char*)SystemAlloc(_remainBytes >> 13);//按页申请,想要每页8kb,申请多少页
				if (_memory == nullptr)
				{
					throw std::bad_alloc();
				}
			}
			obj = (T*)_memory;
			//保证内存块至少能放下一个地址
			size_t Size = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			_memory += Size;
			_remainBytes -= Size;
		}

		// 定位new,显示调用T的构造函数初始化
		new(obj)T;

		return obj;
	}

	void Delete(T* obj)
	{
		//对释放的内存块进行头插

		// 显示调用析构函数清理对象
		obj->~T();

		//直接头插
		* (void**)obj = _freelist;
		_freelist = obj;
	}

private:
	char* _memory;//指向申请的一大块内存
	size_t _remainBytes;//记录申请一大块内存还剩下多少内存
	void* _freelist;//指向释放后的内存,以链表形式管理起来
};
//PageCache.h

#include"Common.h"
#include"ObjectPool.h"

//所有线程共享一个PageCache
class PageCache
{
public:
	static PageCache* GetInstance()
	{
		return &_sInst;
	}

	//从PageCache第k号桶中获取一个span对象
	Span* NewSpan(size_t k);

	// 获取从对象到span的映射
	Span* MapObjectToSpan(void* obj);

	// 释放空闲span回到Pagecache,并合并相邻的span
	void ReleaseSpanToPageCache(Span* span);

private:
	PageCache()
	{}

	PageCache(const PageCache&) = delete;
	PageCache& operator=(const PageCache&) = delete;

	static PageCache _sInst;
private:
	Spanlist _spanlists[NPAGES];
	std::unordered_map<PAGE_ID, Span*> _idSpanMap;//记录页号和span一一对应关系
	ObjectPool<Span> _spanPool; //申请Span的地方都用定长内存池申请,不用new
public:
	std::mutex _pageMtx; //整个锁
//PageCache.cpp

//从PageCache第k号桶中获取一个span对象
Span* PageCache::NewSpan(size_t k)
{
	//assert(k > 0 && k < NPAGES);

	assert(k > 0);

	//大于 128page找堆要
	if (k > NPAGES - 1)
	{
		void* ptr = SystemAlloc(k);
		//申请一个span对象管理这块内存,释放内存的时候要对应的span
		//Span* span = new Span;
		Span* span = _spanPool.New();
		span->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = k;
		_idSpanMap[span->_pageid] = span;
		return span;
	}

	//这里加锁递归会有死锁问题,除非用递归互斥锁,还有在外面调用这个函数之前加锁

	// 先检查第k个桶里面有没有span
	if (!_spanlists[k].Empty())
	{
		Span* Kspan = _spanlists[k].PopFront();


		//将Kspan给CentralCache之前,先将页号和span对应关系记录
		for (size_t i = 0; i < Kspan->_n; ++i)
		{
			_idSpanMap[Kspan->_pageid + i] = Kspan;
		}

		//将管理k页的span给Central Cache
		return Kspan;
	}

	//走到这里第k个桶没有span,那就继续往下面的桶找
	for (size_t i = k + 1; i < NPAGES; ++i)
	{
		if (!_spanlists[i].Empty())
		{
			//将一个管理更多页的span,变成两个span,一个管理k页,一个管理i-k页
			Span* Ispan = _spanlists[i].PopFront();
			//Span* Kspan = new Span;
			Span* Kspan = _spanPool.New();

			Kspan->_pageid = Ispan->_pageid;
			Kspan->_n = k;

			Ispan->_pageid += k;
			Ispan->_n -= k;

			//将Ispan挂在对应大小的桶
			_spanlists[Ispan->_n].PushFront(Ispan);
			//记录挂在PageCache中还未被使用的Ispan的页号和span对应关系
			//这里仅需记录这个span的首页和尾页与Ispan的对应关系即可,
			//不像返回给CentralCache的Kspan的需要把这个Kspan管理的每一页都和Kspan建立映射关系
			//因为合并仅需知道每个Ispan的首页和尾页就可以找到Ispan,而返回给CentralCache的Kspan,
			//首先需要将Kspan切成一块块小内存才行才能再给ThreadCache用,
			//当小内存回来/8kb可能是Kspan管理的其中某一页,才能知道该页对应span
			_idSpanMap[Ispan->_pageid] = Ispan;
			_idSpanMap[Ispan->_pageid + Ispan->_n - 1] = Ispan;


			//将Kspan给CentralCache之前,先将页号和span对应关系记录
			for (size_t i = 0; i < Kspan->_n; ++i)
			{
				_idSpanMap[Kspan->_pageid + i] = Kspan;
			}

			//将管理k页的span给Central Cache
			return Kspan;
		}
	}

	//走到这里说明,后面的位置都没有大页的span
	//那就去找堆申请一个128page的span
	//Span* bigSpan = new Span;
	Span* bigSpan = _spanPool.New();
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;
	//span挂在对应桶下
	_spanlists[bigSpan->_n].PushFront(bigSpan);

	//重复上述过程寻找k号桶的span,一定还回一个span
	return NewSpan(k);
}



// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
	//大于 128page直接还给堆
	if (span->_n > NPAGES - 1)
	{
		void* ptr = (void*)(span->_pageid << PAGE_SHIFT);
		SystemFree(ptr);
		//delete span;
		_spanPool.Delete(span);
		return;
	}


	// 如何知道相邻的span是否能合并?
	// 通过自己的页号找到相邻的span看是否能合并,如果该页的span在PageCache说明该span还没有被使用,可以合并
	// 如果在CentralCache说明该span正在被使用,不能合并

	// 如何知道一个span是否被使用? 是用span中的usecount是否等于0吗? 不能!!
	// 这里有一个空档时间,当thread1线程通过TLS找到自己threadcache申请内存块,但是没有,
	// 就去找CentralCache,但是CentralCache对应桶下也没有,那就只能去找PageCache了
	// PageCache返回给CentralCache一个span,这个span的usecount初始可是0,
	// CentralCache拿到后对span这一大块内存切成一块块小内存
	// 在挂到对应桶下,但是这时候thread2,要合并这个span,那就有问题了,thread1正准备从这span拿一批量
	// 但是还没有拿到,这个span的usecount可还是0,只有拿走了usecount才会++
	// thread2把这个span和自己span合并了,那就造成线程安全的问题!!

	// 因此需要给span对象加一个isuse成员记录这个span是否被使用


	// 如何通过页号找到相邻的页? 还是得用unordered_map记录页号合span对应关系
	// 但是目前的unordered_map只记录了给CentralCache已经被使用的span的页号和span对应关系
	// 并没有记录在PageCache的span的页号和span对应关系
	// 因此需要把在PageCache的span的页号和span对应关系也要记录在unordered_map中


	//先走前面相邻的span是否能合并,能合并就一直合
	while (1)
	{
		PAGE_ID prevId = span->_pageid - 1;
		auto ret = _idSpanMap.find(prevId);
		// 前面的页号没有,不合并了(堆申请内存已经到了起始地址)
		if (ret == _idSpanMap.end())
		{
			break;
		}

		Span* prevSpan = ret->second;
		
		// 前面相邻页的span在使用,不合并了
		if (prevSpan->_isuse == true)
		{
			break;
		}

		// 合并出超过128页的span没办法挂在桶下,不合并了
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}

		// 用span合并prevSpan
		span->_pageid = prevSpan->_pageid;
		span->_n += prevSpan->_n;

		// 将pevSpan从对应的桶中删除
		_spanlists[prevSpan->_n].Erase(prevSpan);
		//delete prevSpan;
		_spanPool.Delete(prevSpan);

		// 这里可能有疑问,那遗留unordered_map中被合并的对应页和prevSpan之间一对一的关系难道不删除吗?
		// 因为prevSpan已经被删除了,在去通过已有页去找span那就是野指针了! 但其实并不用删除.
		// 首先被合并的页已经被span管理起来了,合并结束之后会被挂在对应桶下,并且记录该span首页和尾页与span的对应关系.
		// 当CentralCache要的时候,在把span切分成两个span,返回给CentralCache的Kspan每页都和Kspan重新进行映射
		// 留在PageCache的Ispan的首页和尾页也会和Ispan重新映射
		// 这样的话,以前被合并,遗留下来的页又和新得span建立了映射关系,就不会有通过页找span会有野指针的问题

	}

	//找后面相邻的span合并
	while (1)
	{
		PAGE_ID nextId = span->_pageid + span->_n;
		auto ret = _idSpanMap.find(nextId);
		// 后面的页号没有,不合并了(堆申请内存已经到了结尾地址)
		if (ret == _idSpanMap.end())
		{
			break;
		}

		Span* nextSpan = ret->second;
		// 后面相邻页的span在使用,不合并了
		if (nextSpan->_isuse == true)
		{
			break;
		}

		// 合并出超过128页的span没办法挂在桶下,不合并了
		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}


		span->_n += nextSpan->_n;

		_spanlists[nextSpan->_n].Erase(nextSpan);
		//delete nextSpan;
		_spanPool.Delete(nextSpan);
	}

	//合并好的span挂在对应桶下
	_spanlists[span->_n].PushFront(span);
	span->_isuse = false;
	//重新映射在PageCace的Span的首页和尾页与Span映射关系
	_idSpanMap[span->_pageid] = span;
	_idSpanMap[span->_pageid + span->_n - 1] = span;

}
//ConcurrentAlloc.h

static void* ConcurrentAlloc(size_t size)
{

	//大于256KB/8KB=13page 小于等于128page 找PageCache要
	//大于128page找堆要

	if (size > MAX_BYTES)
	{
		//找PageCache或者堆要都是以页为单位对齐
		size_t alignnum = SizeClass::RoundUp(size);
		//在PageCache那个桶
		size_t kpage = alignnum >> PAGE_SHIFT;

		//在PageCache内部处理找是否找堆要
		PageCache::GetInstance()->_pageMtx.lock();
		Span* span = PageCache::GetInstance()->NewSpan(kpage);
		//这里也不需要记录这个span已经被使用,因为还得时候合并的这个span相邻页的span看是否在使用
		PageCache::GetInstance()->_pageMtx.unlock();

		//得到地址返回
		void* ptr = (void*)(span->_pageid << PAGE_SHIFT);
		return ptr;

	}
	else//小于256KB通过三层缓存要
	{
		//通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
		if (pTLSThreadCache == nullptr)
		{
			//pTLSThreadCache = new ThreadCache;
			static ObjectPool<ThreadCache> tcPool;//静态对象只存在一份,所有线程共享
			pTLSThreadCache = tcPool.New();
		}

		//cout << std::this_thread::get_id() << " : " << pTLSThreadCache << endl;
		return pTLSThreadCache->Allocate(size);
	}
}

1.2 细节2

调用释放内存函数的时候,我们不仅传了地址,还传了一个size表示释放内存块有多大。不想传这个大小,但是下面还得用这个大小,用来判断释放的内存是否小于256KB通过ThreadCache去还,还是大于32Page小于等于128Page还给PageCache,还是大于128Page直接还给堆,怎么办?

其实我们可以在给Span一个成员_objsize,记录当前Span内切好的内存块对象有多大,如果一个Span中做了切分所有小内存块大小都是固定大小。还有直接找PageCache和找堆要的,虽然Span内不做切分,但是我们也给记着。这样我们就知道通过释放内存块的地址,转变成页号,在找到对应的Span,拿到_objsize就知道这个内存块大小了。

//Common.hpp

//Span管理一个跨度的大块内存

//管理以页为单位的大快内存
struct Span
{
	PAGE_ID _pageid = 0;//管理一大块连续内存块的起始页的页号
	size_t _n = 0;//管理几页

	Span* _prev = nullptr;//带头双向循环链表前指针
	Span* _next = nullptr;//带头双向循环链表后指针

	size_t _objsize = 0;//记录切分成一个个小内存块的大小

	size_t _usecount = 0;//记录Span对象中自由链表挂的内存块对象使用数量
	void* _freelist = nullptr;//自由链表挂的是Span对象一块大连续内存块按照桶位置大小切分的一块块小的内存块

	bool _isuse = false;//当前Span是否被使用
};

当CentralCache从PageCache拿到一个Span,然后要做切分,这个时候我们需要处理一下对应Span的_objsize。

//CentralCache.hpp

// 获取一个非空的span
Span* CentralCache::GetOneSpan(Spanlist& list, size_t size)
{
	//遍历对应桶中是否有span或者span中是否有内存对象
	Span* it = list.Begin();
	while (it != list.End())
	{
		if (it->_freelist)
		{
			return it;
		}
		else
		{
			it = it->_next;
		}
	}

	//当Central Cache对应桶下面没有span,那就往下一层Page Cache找,但是首先要先把对应桶锁释放
	//如果再来一个线程是申请内存但是没有内存锁住也没问题,但是如果这个线程是来还内存的呢?
	//锁住那不就还不了了,因此往下走之前先把桶锁释放
	list._mtx.unlock();

	//走到这里说明该桶中并没有span或者span中没有内存对象,Central Cache那就找到下一层Page Cache要一个span
	//Page Cache 是一个按桶下标映射的哈希桶,第i号桶表示这个桶下面的span管理的都是i页page
	//central找page要,关注的是要的span是管理k页的span,然后就去k号桶去要
	//PageCache的锁是一把整锁,虽然也可以是桶锁但是消耗性能
	//当PageCache对应桶没有span就往下面桶继续找,就涉及多个桶加锁解锁,
	//因此我们在最外面直接把PageCache锁住,只加锁解锁一次即可

	PageCache::GetInstance()->_pageMtx.lock();
	Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
	//在CentralCache中的span是已经被使用的span
	span->_isuse = true;//span被使用
	span->_objsize = size;//切分成一个个小内存块的大小
	PageCache::GetInstance()->_pageMtx.unlock();



	//从PageCache拿上来的span是每一个线程独享的,因此切片时不需要加锁

	//把从PageCache拿回的span切分成size大小的一个个内存块挂在自由链表,最后再把span挂在对应的桶中

	//根据页号找到span管理的一大块内存的起始地址
	//地址用char* 方便下面地址++
	char* start = (char*)(span->_pageid << PAGE_SHIFT);
	//计算这块内存有多大
	size_t bytes = span->_n << PAGE_SHIFT;
	//结尾地址
	char* end = start + bytes;

	//使用尾插法将大内存变成大小为size的一快快小内存挂在自由链表上
    span->_freelist = start;
	start += size;
	void* tail = span->_freelist;
	while (start < end)
	{
		NextObj(tail) = start;
		tail = start;
		start += size;
	}
	//自由链表最后为nullptr
	NextObj(tail) = nullptr;

	//切好span以后,需要把span挂到桶里面去的时候,再加锁
	list._mtx.lock();

	//再把span挂在对应桶
	list.PushFront(span);

	return span;
}

直接从PageCache和堆要的我们拿到之后也要记录对应Span的_objsize

static void* ConcurrentAlloc(size_t size)
{

	//大于256KB/8KB=13page 小于等于128page 找PageCache要
	//大于128page找堆要

	if (size > MAX_BYTES)
	{
		//找PageCache或者堆要都是以页为单位对齐
		size_t alignnum = SizeClass::RoundUp(size);
		//在PageCache那个桶
		size_t kpage = alignnum >> PAGE_SHIFT;

		//在PageCache内部处理找是否找堆要
		PageCache::GetInstance()->_pageMtx.lock();
		Span* span = PageCache::GetInstance()->NewSpan(kpage);
		//这里也不需要记录这个span已经被使用,因为还得时候合并的这个span相邻页的span看是否在使用
		//虽然这里Span不切分成一个个小内存,但是为了方便释放内存的时候不传大小,也需要把内存大小记录
		span->_objsize = alignnum;
		PageCache::GetInstance()->_pageMtx.unlock();

		//得到地址返回
		void* ptr = (void*)(span->_pageid << PAGE_SHIFT);
		return ptr;

	}
	else//小于256KB通过三层缓存要
	{
		//通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
		if (pTLSThreadCache == nullptr)
		{
			//pTLSThreadCache = new ThreadCache;
			static ObjectPool<ThreadCache> tcPool;//静态对象,每个线程进来都用的是自己内部第一次创建的那一个
			pTLSThreadCache = tcPool.New();
		}

		//cout << std::this_thread::get_id() << " : " << pTLSThreadCache << endl;
		return pTLSThreadCache->Allocate(size);
	}
}

这样还得时候,直接通过Span就知道这个内存块的大小了。

//static void ConcurrentFree(void* ptr,size_t size)
static void ConcurrentFree(void* ptr)
{

	//释放内存的时候我们除了传地址还需要传内存块大小是多少,
	//但是现在我们不想传这个大小了,但是下面我们还必须要用这大小,如何解决?
	//申请的内存都是从Span拿的,我们可以给Span增加一个记录切分小内存块大小的成员
	//这样通过地址,拿到Span,就知道还的内存块大小是多少

	//根据地址找转换为对应的页号,通过页号找到对应的span,在找到对应内存大小
	Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
	size_t size = span->_objsize;

	//大于32page小于等于128page直接还给PageCache,
	//大于128page直接还给堆
	if (size > MAX_BYTES)
	{
		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(pTLSThreadCache);
		pTLSThreadCache->Deallocate(ptr, size);
	}
}

1.3 细节3

STL容器是线程不安全的,多线程在并发用unordered_map的时候会有线程不安全的问题,在PageCache中对unordered_map写之前我们加锁了,但是在读unordered_map我们并没有加锁。比如从CentralCache还内存块给对应的Span,我们先通过内存块的地址找到页号,在通过页号找到对应Span。还有在释放内存的时候我们页需要知道这个内存块的大小,也是通过内存的地址找到页号在找到对应Span。都调用了下面的函数。

// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
	auto ret = _idSpanMap.find(id);
	if (ret != _idSpanMap.end())
	{
		return ret->second;
	}
	else
	{
		assert(false);
		return nullptr;
	}
}

因此在读之前也要加锁,如果在调用这个函数之前加锁,在调用解锁之后解锁,那就需要在多个地方写加锁解锁。因此我们直接在这个函数里面加锁,这里我们用一个RAII风格的锁,构造时加锁,析构时解锁。

// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
	//STL容器是线程不安全的,多个地方会访问unordered_map
	std::unique_lock<std::mutex> lock(_pageMtx);
	auto ret = _idSpanMap.find(id);
	if (ret != _idSpanMap.end())
	{
		return ret->second;
	}
	else
	{
		assert(false);
		return nullptr;
	}
}

2.复杂情况下的调试技巧

代码多,可能有不小心写错的地方,这个时候在按照以往F11一步一步去走就太慢了。这里我们有几种方法。

  1. 测试验证 + 条件断点

比如下面一段代码,向ThreadCache一次挂一批内存块。如果这里有不对的地方。
就写个测试验证然后在写一个if条件判断,if里面随便写一句就行,然后在这里打个断点。如果一次没有挂这里多,就会在if里面停下来。

//一次挂一批
void PushRange(void* start,void* end,size_t n)
{
	NextObj(end) = _freelist;
	_freelist = start;
	_size += n;

	//void* cur = start;
	//size_t j = 0;
	//while (cur)
	//{
	//	cur = NextObj(cur);
	//	++j;
	//}

	//if (j != n)
	//{
	//	int x = 0;
	//}
}
  1. 在调试哪里把调用栈帧打开(看是谁调的这个函数)
  2. 然后把监视打开
  3. 如果程序疑似死循环,可以点击中断程序,程序会正在运行的地方停下来

3. 性能分析

你怎么知道当前项目性能怎么样,是哪里造成性能不好的原因?VS点击下面就行性能分析,你会发现当前项目甚至不如malloc和free快。慢的原因就是由于在读写unordered_map的时候一直在加锁解锁导致性能低下。

在这里插入图片描述

在这里插入图片描述

可以用下面这段代码测一测。

//Benchmark.hpp

#include"ConcurrentAlloc.h"

// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					//v.push_back(malloc(16));
					v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
			});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime.load());

	printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime.load());

	printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
		nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}


// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					//v.push_back(ConcurrentAlloc(16));
					v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
			});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime.load());

	printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime.load());

	printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
		nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}

int main()
{
	size_t n = 10000;
	cout << "==========================================================" << endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;

	BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" << endl;

	return 0;
}

即使你把unordered_map换成map也不行。它们读写都必须加锁。比如一个线程在读的时候,另一个线程去写了。读还是老位置,如果是对map写可能会旋转,对unordered_map写可能扩容,都会导致原有位置改变。但是读还是读的老位置,就有了问题,因此必须加锁。

这里我们参考Google开源tcmalloc换个数据结构,使用 基数树(Radix Tree),它是一种整数到整数(地址) 的映射多叉搜索树。每层节点其实都是一个指针数组。每层指针数组大小按照有几个比特位的取值范围划分看一层数组有多大。最后一层叶子节点存对应的指针。

在这里插入图片描述
由于空间是提前开好的,读写都不会去动这个结构。写是在PageCache是加过锁的,读的时候就是在释放的时候去读,在读这个Span的时候,这个Span早在拿它的时候就写过了。读写分离了,所有就减少了读写并发的问题,也就少了很多加锁解锁。性能自然就提高了。

tcmalloc写了三种基数树,一层基数树,二层基数树,三层基数树,其中一、二层基数树适用于32位下,三层基数树适用于64位下。

在这里插入图片描述

//PageMap.h

#pragma once
#include"Common.h"
#include"ObjectPool.h"

// 性能优化
// 不管是用unordered_map还是用map保存页号与Span指针的映射关系,效率都太低了
// 原因在于,对unordered_map还是map进行读写的时候都要进行加锁解锁导致效率低
// 但是读写是必须要进行加锁的,当在对map进行写的时候可能会进行旋转,
// 对unordered_map进行写的时候可能会有扩容
// 考虑这样一个问题,当一个线程在读的时候,它读的时候位置结构可没变(位置没变)
// 但是另一个线程去写了导致这个结构旋转/扩容,导致位置改变,进而导致原先读的位置改变了,所以造成了问题
// 因此不管对这个结构读还是写都必须加锁,所以效率会低

// 如何解决?

// 这里可以使用基数树,基数树其实也是一个整数到整数(指针)的映射关系的多叉搜索树结构
// 每层其实都是一个指针数组,每层数组下标代表对应分层比特位取值的范围大小,只有最后一层叶子节点才会存对应的指针

// 假设key值等于0x840FF, 其二进制按照6bit一簇可以写成,000010 -000100 -000011 -111111,
// 从左到右的index值分别为2, 4,3, 63。那么根据key值0x840FF找到value的过程就只需要4步:
// 第一步,在最上层的节点A中找到index为2的slot,其slot[2]指针指向第二层节点中的节点B。
// 第二步,在节点B中找到index为4的slot,其slot[4]指针指向第三层节点中的节点C。
// 第三步,在节点C中找到index为3的slot,其slot[3]指针指向第三层节点中的节点D。
// 第四步,在节点D中找到index为63的slot,其slot[63]指针指向叶子节点item E。

// 由于这个结构是计算开辟好的,并且增删查改都会不改变这个结构
// 写的时候,PageCache是整体加锁的没什么问题
// 有两个地方会读,都是释放内存块的时候会去读,将地址转成页号在转成对应的Span*
// 要注意的时候,读的时候是释放,而写的时候是Span切分的时候,在读之前就已经写过了
// 因此读写是分离的!! 所以读根本不用加锁,因此使用基数树这个结构保存页号与Span*映射关系更好!


// 一页8kb
// 2^32/2^8 = 2^19
// 2^64/2^8 = 2^51

// Single-level array,一层基数树
// 一层基数树结构适用于32位,
// BITS 表示存储页号需要多少位
template <int BITS>
class TCMalloc_PageMap1 {
private:
	static const int LENGTH = 1 << BITS;// 2^19位,数组里面存的是Span指针,所以指针数组大小 2^19 * 4 = 2 ^ 21 = 2M
	void** array_;

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap1() {
		//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
		size_t size = sizeof(void*) << BITS;//数组大小
		size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);//按页(1页8KB)对齐
		array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);//SystemAlloc按照页申请
		memset(array_, 0, sizeof(void*) << BITS);
	}

	// Return the current value for KEY.  Returns NULL if not yet set,
	// or if k is out of range.
	void* get(Number k) const { //取
		if ((k >> BITS) > 0) {
			return NULL;
		}
		return array_[k];
	}

	// REQUIRES "k" is in range "[0,2^BITS-1]".
	// REQUIRES "k" has been ensured before.
	//
	// Sets the value 'v' for key 'k'.
	void set(Number k, void* v) { //放
		array_[k] = v;
	}
};

// Two-level radix tree,两层基数树
// 也适合32位
template <int BITS> //2^31/2^8=2^19,BITS=19
class TCMalloc_PageMap2 {
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	//第一层
	static const int ROOT_BITS = 5;//19位的前5个比特位
	static const int ROOT_LENGTH = 1 << ROOT_BITS;//第一层数组指针大小 2^5=32

	//第二层
	static const int LEAF_BITS = BITS - ROOT_BITS;//19位剩下的14个比特位
	static const int LEAF_LENGTH = 1 << LEAF_BITS;//第二层数组指针大小 2^14=16384

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Leaf* root_[ROOT_LENGTH];             // Pointers to 32 child nodes
	void* (*allocator_)(size_t);          // Memory allocator

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap2() {
		//allocator_ = allocator;
		memset(root_, 0, sizeof(root_));

		PreallocateMoreMemory();
	}

	void* get(Number k) const {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 || root_[i1] == NULL) {
			return NULL;
		}
		return root_[i1]->values[i2];
	}

	void set(Number k, void* v) {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		ASSERT(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> LEAF_BITS;

			// Check for overflow
			if (i1 >= ROOT_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_[i1] == NULL) {
				//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				//if (leaf == NULL) return false;
				static ObjectPool<Leaf>	leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();

				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};

// Three-level radix tree,3层基数树
// 适合64位
template <int BITS>
class TCMalloc_PageMap3 {
private:
	// How many bits should we consume at each interior level
	static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;

	// How many bits should we consume at leaf level
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Interior node
	struct Node {
		Node* ptrs[INTERIOR_LENGTH];
	};

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Node* root_;                          // Root of radix tree
	void* (*allocator_)(size_t);          // Memory allocator

	Node* NewNode() {
		Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
		if (result != NULL) {
			memset(result, 0, sizeof(*result));
		}
		return result;
	}

public:
	typedef uintptr_t Number;

	explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
		allocator_ = allocator;
		root_ = NewNode();
	}

	void* get(Number k) const {
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 ||
			root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
			return NULL;
		}
		return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
	}

	void set(Number k, void* v) {
		ASSERT(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);

			// Check for overflow
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_->ptrs[i1] == NULL) {
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}

			// Make leaf node if necessary
			if (root_->ptrs[i1]->ptrs[i2] == NULL) {
				Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
	}
};

然后我们把使用unordered_map的地方换成基数树。

//PageCache.h

#include"Common.h"
#include"ObjectPool.h"
#include"PageMap.h"


//所有线程共享一个PageCache
class PageCache
{
public:
	static PageCache* GetInstance()
	{
		return &_sInst;
	}

	//从PageCache第k号桶中获取一个span对象
	Span* NewSpan(size_t k);


	// 获取从对象到span的映射
	Span* MapObjectToSpan(void* obj);

	// 释放空闲span回到Pagecache,并合并相邻的span
	void ReleaseSpanToPageCache(Span* span);

private:
	PageCache()
	{}

	PageCache(const PageCache&) = delete;
	PageCache& operator=(const PageCache&) = delete;

	static PageCache _sInst;
private:
	Spanlist _spanlists[NPAGES];
	//std::unordered_map<PAGE_ID, Span*> _idSpanMap;//记录页号和span一一对应关系
	// 
	//性能优化,使用一层的基数树
	TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
	ObjectPool<Span> _spanPool; //申请Span的地方都用定长内存池申请,不用new
public:
	std::mutex _pageMtx; //整个锁
};
//PageCache.cpp

#include"PageCache.h"

PageCache PageCache::_sInst;


//从PageCache第k号桶中获取一个span对象
Span* PageCache::NewSpan(size_t k)
{
	//assert(k > 0 && k < NPAGES);

	assert(k > 0);

	//大于 128page找堆要
	if (k > NPAGES - 1)
	{
		void* ptr = SystemAlloc(k);
		//申请一个span对象管理这块内存,释放内存的时候要对应的span
		//Span* span = new Span;
		Span* span = _spanPool.New();
		span->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = k;
		//_idSpanMap[span->_pageid] = span;
		_idSpanMap.set(span->_pageid, span);
		return span;
	}

	//这里加锁递归会有死锁问题,除非用递归互斥锁,还有在外面调用这个函数之前加锁

	// 先检查第k个桶里面有没有span
	if (!_spanlists[k].Empty())
	{
		Span* Kspan = _spanlists[k].PopFront();


		//将Kspan给CentralCache之前,先将页号和span对应关系记录
		for (size_t i = 0; i < Kspan->_n; ++i)
		{
			//_idSpanMap[Kspan->_pageid + i] = Kspan;
			_idSpanMap.set(Kspan->_pageid + i, Kspan);
		}

		//将管理k页的span给Central Cache
		return Kspan;
	}

	//走到这里第k个桶没有span,那就继续往下面的桶找
	for (size_t i = k + 1; i < NPAGES; ++i)
	{
		if (!_spanlists[i].Empty())
		{
			//将一个管理更多页的span,变成两个span,一个管理k页,一个管理i-k页
			Span* Ispan = _spanlists[i].PopFront();
			//Span* Kspan = new Span;
			Span* Kspan = _spanPool.New();

			Kspan->_pageid = Ispan->_pageid;
			Kspan->_n = k;

			Ispan->_pageid += k;
			Ispan->_n -= k;

			//将Ispan挂在对应大小的桶
			_spanlists[Ispan->_n].PushFront(Ispan);
			//记录挂在PageCache中还未被使用的Ispan的页号和span对应关系
			//这里仅需记录这个span的首页和尾页与Ispan的对应关系即可,
			//不像返回给CentralCache的Kspan的需要把这个Kspan管理的每一页都和Kspan建立映射关系
			//因为合并仅需知道每个Ispan的首页和尾页就可以找到Ispan,而返回给CentralCache的Kspan,
			//首先需要将Kspan切成一块块小内存才行才能再给ThreadCache用,
			//当小内存回来/8kb可能是Kspan管理的其中某一页,才能知道该页对应span
			//_idSpanMap[Ispan->_pageid] = Ispan;
			//_idSpanMap[Ispan->_pageid + Ispan->_n - 1] = Ispan;
			_idSpanMap.set(Ispan->_pageid, Ispan);
			_idSpanMap.set(Ispan->_pageid + Ispan->_n - 1, Ispan);


			//将Kspan给CentralCache之前,先将页号和span对应关系记录
			for (size_t i = 0; i < Kspan->_n; ++i)
			{
				//_idSpanMap[Kspan->_pageid + i] = Kspan;
				_idSpanMap.set(Kspan->_pageid + i, Kspan);
			}

			//将管理k页的span给Central Cache
			return Kspan;
		}
	}

	//走到这里说明,后面的位置都没有大页的span
	//那就去找堆申请一个128page的span
	//Span* bigSpan = new Span;
	Span* bigSpan = _spanPool.New();
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;
	//span挂在对应桶下
	_spanlists[bigSpan->_n].PushFront(bigSpan);

	//重复上述过程寻找k号桶的span,一定还回一个span
	return NewSpan(k);
}


// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
	//STL容器是线程不安全的,多个地方会访问unordered_map
	//std::unique_lock<std::mutex> lock(_pageMtx);
	//auto ret = _idSpanMap.find(id);
	//if (ret != _idSpanMap.end())
	//{
	//	return ret->second;
	//}
	//else
	//{
	//	assert(false);
	//	return nullptr;
	//}
	Span* span = (Span*)_idSpanMap.get(id);
	assert(span);
	return span;
}


// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
	//大于 128page直接还给堆
	if (span->_n > NPAGES - 1)
	{
		void* ptr = (void*)(span->_pageid << PAGE_SHIFT);
		SystemFree(ptr);
		//delete span;
		_spanPool.Delete(span);
		return;
	}


	// 如何知道相邻的span是否能合并?
	// 通过自己的页号找到相邻的span看是否能合并,如果该页的span在PageCache说明该span还没有被使用,可以合并
	// 如果在CentralCache说明该span正在被使用,不能合并

	// 如何知道一个span是否被使用? 是用span中的usecount是否等于0吗? 不能!!
	// 这里有一个空档时间,当thread1线程通过TLS找到自己threadcache申请内存块,但是没有,
	// 就去找CentralCache,但是CentralCache对应桶下也没有,那就只能去找PageCache了
	// PageCache返回给CentralCache一个span,这个span的usecount初始可是0,
	// CentralCache拿到后对span这一大块内存切成一块块小内存
	// 在挂到对应桶下,但是这时候thread2,要合并这个span,那就有问题了,thread1正准备从这span拿一批量
	// 但是还没有拿到,这个span的usecount可还是0,只有拿走了usecount才会++
	// thread2把这个span和自己span合并了,那就造成线程安全的问题!!

	// 因此需要给span对象加一个isuse成员记录这个span是否被使用


	// 如何通过页号找到相邻的页? 还是得用unordered_map记录页号合span对应关系
	// 但是目前的unordered_map只记录了给CentralCache已经被使用的span的页号和span对应关系
	// 并没有记录在PageCache的span的页号和span对应关系
	// 因此需要把在PageCache的span的页号和span对应关系也要记录在unordered_map中


	//先走前面相邻的span是否能合并,能合并就一直合
	while (1)
	{
		PAGE_ID prevId = span->_pageid - 1;
		//auto ret = _idSpanMap.find(prevId);
		 前面的页号没有,不合并了(堆申请内存已经到了起始地址)
		//if (ret == _idSpanMap.end())
		//{
		//	break;
		//}

		auto ret = (Span*)_idSpanMap.get(prevId);
		if (ret == nullptr)
		{
			break;
		}

		//Span* prevSpan = ret->second;
		
		Span* prevSpan = ret;
		// 前面相邻页的span在使用,不合并了
		if (prevSpan->_isuse == true)
		{
			break;
		}

		// 合并出超过128页的span没办法挂在桶下,不合并了
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}

		// 用span合并prevSpan
		span->_pageid = prevSpan->_pageid;
		span->_n += prevSpan->_n;

		// 将pevSpan从对应的桶中删除
		_spanlists[prevSpan->_n].Erase(prevSpan);
		//delete prevSpan;
		_spanPool.Delete(prevSpan);

		// 这里可能有疑问,那遗留unordered_map中被合并的对应页和prevSpan之间一对一的关系难道不删除吗?
		// 因为prevSpan已经被删除了,在去通过已有页去找span那就是野指针了! 但其实并不用删除.
		// 首先被合并的页已经被span管理起来了,合并结束之后会被挂在对应桶下,并且记录该span首页和尾页与span的对应关系.
		// 当CentralCache要的时候,在把span切分成两个span,返回给CentralCache的Kspan每页都和Kspan重新进行映射
		// 留在PageCache的Ispan的首页和尾页也会和Ispan重新映射
		// 这样的话,以前被合并,遗留下来的页又和新得span建立了映射关系,就不会有通过页找span会有野指针的问题

	}

	//找后面相邻的span合并
	while (1)
	{
		PAGE_ID nextId = span->_pageid + span->_n;
		//auto ret = _idSpanMap.find(nextId);
		 后面的页号没有,不合并了(堆申请内存已经到了结尾地址)
		//if (ret == _idSpanMap.end())
		//{
		//	break;
		//}

		auto ret = (Span*)_idSpanMap.get(nextId);
		if (ret == nullptr)
		{
			break;
		}

		//Span* nextSpan = ret->second;
		Span* nextSpan = ret;
		// 后面相邻页的span在使用,不合并了
		if (nextSpan->_isuse == true)
		{
			break;
		}

		// 合并出超过128页的span没办法挂在桶下,不合并了
		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}


		span->_n += nextSpan->_n;

		_spanlists[nextSpan->_n].Erase(nextSpan);
		//delete nextSpan;
		_spanPool.Delete(nextSpan);
	}

	//合并好的span挂在对应桶下
	_spanlists[span->_n].PushFront(span);
	span->_isuse = false;
	//重新映射在PageCace的Span的首页和尾页与Span映射关系
	/*_idSpanMap[span->_pageid] = span;
	_idSpanMap[span->_pageid + span->_n - 1] = span;*/
	_idSpanMap.set(span->_pageid, span);
	_idSpanMap.set(span->_pageid + span->_n - 1, span);
}

至此关于高并发内存池所有东西都结束了。细节很多需要好好琢磨,这个项目知识把tcmalloc核心的东西拿出来了,还有很多没有实现,我们主要就是学习它的核心。对这个项目更多知识感兴趣的可以去读tcmalloc源码

4. 扩展学习及当前项目实现的不足

实际中我们测试了,当前实现的并发内存池比malloc/free是更加⾼效的,那么我们能否替换到系统调用malloc呢?实际上是可以的。

  • 不同平台替换方式不同。 基于unix的系统上的glibc,使用了weak alias的方式替换。具体来说是因为这些入口函数都被定义成了weak symbols,再加上gcc支持 alias attribute,所以替换就变成了这种通用形式:

void* malloc(size_t size) THROW attribute__ ((alias (tc_malloc)))
因此所有malloc的调用都跳转到了tc_malloc的实现

具体参考这里:GCC attribute 之weak,alias属性

有些平台不支持这样的东西,需要使⽤hook的钩子技术来做。
关于hook请看这里:hook