C++ 项目 -- 高并发内存池

发布于:2025-05-08 ⋅ 阅读:(19) ⋅ 点赞:(0)

目录

项目介绍

内存池概念

池化技术

内存池

内存池主要解决的问题

malloc

定长内存池

申请内存

释放内存

整体框架设计

thread cache        

申请内存

释放内存

central cache

申请内存

释放内存

 page cache

申请内存

释放内存

大块内存申请实现

定长内存池配合脱离使用new

性能测试

tcmalloc源码实现基数树优化


项目介绍

        该项目实现的时一个高并发的内存池,原型是google的开源项目tcmalloc(Thread-Caching Malloc),即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关函数(malloc、free)。在此仅把tcmalloc最核心的框架简化出来模拟实现一个自己的高并发内存池。

        相关知识概念:C/C++、链表、哈希桶、操作系统内存管理、单例模式、多线程、互斥锁

内存池概念

        我们可以通过一个现实的例子更好地理解内存池:假如有一座山,山脚处才有水流经过,而山上的居民想要打水喝,每次都需要反复地上山和下山打水,效率很低下,费时又费力。但如果在山上建一个蓄水池,通过蓄水池将山脚下的水资源转移上来,是不是就省事很多?内存池也是一样的。

池化技术

        所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高效率。在计算机中有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池。对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接受到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。

内存池

        内存池是指程序预先从操作系统申请一块足够大的内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正地将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间内),内存池才将之前申请的内存真正释放。

内存池主要解决的问题

        通过刚才那个例子我们就能知道内存池主要解决的是效率问题。其次作为系统的内存分配器,还需要解决一下内存碎片的问题。

内存碎片主要分为外碎片和内碎片:

外碎片:空闲的连续内存空间太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。如图:

        vector对象和list对象销毁后释放空间,现有256+128=284bytes的内存空间,但是此时如果要申请大于256bytes的空间却申请不出来,因为这两块空间是碎片化的,不连续。

内碎片在后续内容再作解释。

malloc

        C/C++ 中我们要动态申请内存都是通过malloc去申请内存,但实际上我们不是直接去堆获取内存的。malloc就是一个内存池。malloc()相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序又大量的内存需求时,再根据实际需求向操作系统“进货”。malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套,Linux gcc用的glibc中的ptmalloc。

但是malloc什么场景下都可以使用,这就意味着没有针对性,什么场景下都不会有太高的性能。

介绍完相关概念,我们可以开始着手实现项目内容了。在实现之前,我们先通过一个简单的定长内存池程序来熟悉内存申请和释放的过程。同时,这个定长内存池也会作为项目的一个基础组件。

定长内存池

我们首先建立一个头文件:Common.h,这个头文件中的内容其他文件都可能会用到。 

在windows下,是通过Virtual Alloc函数直接向堆申请页为单位的大块内存的:VirtualAlloc函数

而在Linux下是通过brk和mmap去向堆申请页为单位的大块内存的.

我们通过VirtualAlloc/brk/mmap来实现内存分配:

inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	// Linux下使用brk、mmap实现
#endif
	if (ptr == nullptr)
		throw std::bad_alloc();
	return ptr;
}

此处使用了条件编译,根据自己的平台选择不同的实现。

下面创建头文件ObjectPool.h实现定长内存池,也就是ObjectPool类:

1.定长的概念

实现定长有两种途径:

(1)通过非类型模板参数实现:template <size_t N>

(2)通过类模板参数实现定长,即根据传入的对象类型大小决定内存大小

此处采用第二种途径。

template<class T>
class ObjectPool
{};

2.定长内存池需要哪些成员变量?

_memory

首先我们肯定需要一个指针,即定长内存池的起始地址。

_left

其次我们在对内存池进行内存申请和释放的操作时,需要一个变量来存储内存池的剩余空间。

_freeList

此外,我们要对释放的内存对象进行管理。我们将释放的对象“链接”到自由链表上,因为我们是根据传入的对象类型决定申请内存大小的,这必定意味着我们每一次申请的对象大小都是固定的该类型的长度。我们将释放的对象“挂”在自由链表上,下次申请内存的时候就可以优先使用自由链表上的对象。

但是我们这里有必要真的用链表来实现_freeList吗?答案是否定的,我们只需要让释放的对象能链接起来就可以了,换句话说,就是能根据当前对象找到下一个对象。所以我们可以用当前对象的前4/8个字节来存储下一个对象的地址。

这样我们通过读取当前对象的前4/8个字节内容就可以找到下一个对象的地址了。

此处的4/8个字节大小指的就是32位/64位下指针的大小(存储地址所需要的空间大小),那么问题来了,当我们寻找下一个对象时,怎么知道到底是读取4个字节还是8个字节的内容呢?这一点也需要指针的运算来解决——指针在解引用时,读取多少字节的内容取决于指针的类型:如果是对int*类型的指针进行解引用操作,我们读取的就是前四个字节(即int大小)的内容。而如果我们对void**类型的指针进行解引用操作,我们读取的就是void*大小的内容,void*的大小在32位下是4字节,在64位下是8字节,可以很好地解决这个问题。

(tips:这里只要使用二级指针进行操作都是可以的,如int**/char**)

申请内存

申请内存实际上就是返回申请的内存的起始地址,所以返回值即是T*类型的指针。

执行的操作:

①优先查看_freeList中有没有可以直接使用的对象:

        有的话直接将其中第一个对象“取”出来(头删)

②当_freeList中没有对象时就需要向内存池申请新的内存空间:

        检查内存池剩余空间大小是否够要申请的对象大小,不够的话需要重新向操作系统申请大块内存空间;

        这里要注意由于我们在_freeList中是使用每个对象的前4/8个字节来存储下一个对象的位置,必须要保证对象大小大于4/8字节。

③使用定位new对申请到的内存空间进行初始化

T* New()
{
	T* obj = nullptr;
	// 优先使用释放的内存,循环利用
	if (_freeList)
	{
		void* next = *((void**)_freeList);
		obj = (T*)_freeList;
		_freeList = next;
	}
	else
	{
		if (_left < sizeof(T)) // 大块内存的剩余字节数不够对象要申请的字节数
		{
			//重新申请大块内存
			_left = 128 * 1024; // 128KB
			_memory = (char*)SystemAlloc(_left >> PAGE_SHIFT);
            // PAGE_SHIFT作为全局变量定义在Common.h中,一页为8KB的情况下PAGE_SHIFT=13
			if (_memory == nullptr)
			{
				throw std::bad_alloc(); // 申请失败抛出异常
			}
		}

		obj = (T*)_memory;
		size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
		_memory += objSize;
		_left -= objSize;
	}

	//自定义类型
	//定位new,显式调用T的构造函数初始化
	new(obj)T;
	return obj;
}

释放内存

执行的操作:

将释放的对象头插到_freeList中

void Delete(T* obj)
{
	// 显式调用析构函数
	obj->~T();

	// 将释放的对象头插到_freeList中
	*(void**)obj = _freeList;
	_freeList = obj;
}

定长内存池的完整代码:

#include "Common.h"

template<class T>
class ObjectPool
{
public:
	T* New()
	{
		T* obj = nullptr;
		// 优先使用释放的内存,循环利用
		if (_freeList)
		{
			void* next = *((void**)_freeList);
			obj = (T*)_freeList;
			_freeList = next;
		}
		else
		{
			if (_left < sizeof(T)) // 大块内存的剩余字节数不够对象要申请的字节数
			{
				//重新申请大块内存
				_left = 128 * 1024;
				_memory = (char*)SystemAlloc(_left >> PAGE_SHIFT); // 128KB
				if (_memory == nullptr)
				{
					throw std::bad_alloc(); // 申请失败抛出异常
				}
			}

			obj = (T*)_memory;
			size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			_memory += objSize;
			_left -= objSize;
		}
		
		//自定义类型
		//定位new,显式调用T的构造函数初始化
		new(obj)T;
		return obj;
	}

	void Delete(T* obj)
	{
		// 显式调用析构函数
		obj->~T();

		// 将释放的对象头插到_freeList中
		*(void**)obj = _freeList;
		_freeList = obj;
	}
private:
	char* _memory = nullptr; // 指向申请的大块内存的指针
	void* _freeList = nullptr; // 指向还回来的内存
	size_t _left = 0; // 大块内存的剩余字节数
};

整体框架设计

        现代很多的开发环境都是多核多线程,在申请内存的情况下,必然存在激烈的锁竞争问题。malloc本身已经非常优秀了,我们项目的原型tcmalloc是在多线程高并发的环境下更胜一筹。所以这次我们实现的内存池需要考虑以下几方面的问题。

  1. 性能问题
  2. 多线程环境下,锁竞争问题
  3. 内存碎片问题

Concurrent Memory Pool主要由以下三个部分组成:

  1. thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
  2. central cache:中心缓存是所有线程所共享的,thread cache是按需从central cache中获取的对象。central cache在合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取对象时需要加锁的,这里使用的是桶锁并且只有thread cache没有内存对象时才会找central cache,所以这里竞争不会很激烈
  3. page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。

以上部分下面会详细分层讲解。 

thread cache        

        thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。

        在定长内存池中我们已经了解到自由链表中挂载的是按照对象大小切分好的小块内存。所以在向thread cache申请内存空间时,也应该根据申请的内存大小找到映射的哈希桶,在对应的哈希桶中申请内存。

申请内存:

1.当内存申请size <= 256KB时,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i.

2.如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。

3.如果_freeLists[i]中没有对象,则批量从central cache中获取一定数量的对象,插入到自由链表,并且返回一个对象。

        但是我们不可能为所有的内存大小都设置一个哈希桶(自由链表),这样的话我们就需要256*1024= 262144个自由链表,数量太多了。为了减少哈希桶的数量,我们会在空间利用上做出一定的牺牲,这就是所谓的内存对齐。比如:如果全部按照8bytes进行内存对齐,当申请9bytes的内存空间时,我们为其分配16bytes的内存空间,这就导致有7bytes的空间浪费。这就是内碎片——由于对齐的需求导致分配出去的空间中一些内存无法被利用。

        按照如图所示8bytes的大小进行内存对齐,我们仍然需要256KB/8=32768个哈希桶(自由链表)。于是我们向进一步减少哈希桶的数量,就需要指定一个合适的内存对齐规则,实现空间利用和哈希桶数量的平衡。这里我们整体控制在10%左右的内碎片浪费:

内存浪费率=(对齐后的内存大小 - 申请的内存大小)/对齐后的内存大小

内存大小 对齐大小 哈希桶
[1,128] 8bytes _freeLists[0,16)
[128+1,1024] 16bytes _freeLists[16,72)
[1024+1,8*1024] 128bytes _freeLists[72,128)
[8*1024+1,64*1024] 1024bytes _freeLists[128,184)
[64*1024+1,256*1024] 8*1024bytes _freeLists[184,208)

[129,1024]区间内存浪费率最大的为129bytes:15/144 ≈ 10.42%

[1025,8*1024]区间内存浪费率最大的为1025bytes:127/(1024+128) ≈ 11.02%

[8*1024+1,64*1024]区间内存浪费率最大的为8*1024+1bytes:1023/(8*1024+1024) ≈ 11.1%

[64*1024+1,256*1024]区间内存浪费率最大的为64*1024+1bytes:(8*1024-1)/(64*1024+8*1024) ≈ 11.11%

内存对齐方面的代码我们写在Common.h的SizeClass类中:

1.RoundUp,计算对齐后的内存大小

由于不同大小区间是按照不同的对齐数对齐的,我们在RoundUp中根据申请的内存大小将其所在区间的对齐数传给子函数_RoundUp,在_RoundUp中计算对齐后的内存大小。

(我们提到过Thread Cache仅仅支持小于等于256KB大小的内存申请,目前还不支持大于256KB的大小申请,所以这里先不考虑这种情况,直接assert(false))

class SizeClass
{
public:
    static inline size_t RoundUp(size_t size)
    {
        if(size <= 128)
            return _RoundUp(size, 8);
        else if(size <= 1024)
            return _RoundUp(size, 16);
        else if(size <= 8 * 1024)
            return _RoundUp(size, 128);
        else if(size <= 64 * 1024)
            return _RoundUp(size, 1024);
        else if(size <= 256 * 1024)
            return _RoundUp(size, 8 * 1024);
        else
        {
            assert(false);
            return -1;
        }
    }
}

我们初步设置的_RoundUp计算对齐后的内存如下:

size_t _RoundUp(size_t bytes, size_t alignNum)
{
    size_t alignSize = 0;
    if(bytes % alignNum == 0)
    {
        alignSize = bytes;
    }
    else
    {
        alignSize = (bytes/alignNum + 1) * alignNum;
    }
    return alignSize;
}

 但是这个函数一定会被频繁调用,采用位运算效率能够更高一些:

先让申请的内存大小bytes加上(对齐数的大小alignNum - 1),这样每个区间的最小内存加上该数后刚好等于该区间的最小对齐内存,如:

1 + (8  - 1) = 8,8是[1,128]中内存对齐后的最小数;

129 + (16 - 1) =144,144是[129, 1024]中内存对齐后的最小数

而每个区间的最大数加上对齐数减一后的大小又不超过下一个区间的最小对齐数,如:

128 + (8 - 1) = 135 < 144

1024 + (16 - 1) = 1039 < 1152

然后我们再将计算结果与上(对齐数减一的相反数)就能得到对齐后的内存大小了

static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
    return ((bytes + alignNum - 1) & ~(alignNum - 1));
}

Thread Cache在申请内存时是根据申请的内存大小到对应映射的哈希桶中去申请的,所以我们还需要写一个函数来完成申请的内存大小到第几个哈希桶之间的映射:

MAX_BYTES定义如下,作为全局变量:

static const size_t MAX_BYTES = 256 * 1024;
class SizeClass
{
public:
    static inline size_t _Index(size_t bytes, size_t align_shift)
    {
        return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1; // 下标从0开始
    }

    static inline size_t Index(size_t bytes)
	{
		assert(bytes < MAX_BYTES);
		// 每个区间有多少个桶
		static int group_array[4] = { 16,56,56,56 };
		if (bytes <= 128)
		{
			return _Index(bytes, 3);
		}
		else if (bytes <= 1024)
		{
			return _Index(bytes - 128, 4) + group_array[0];
		}
		else if (bytes <= 8 * 1024)
		{
			return _Index(bytes - 1024, 7) + group_array[0] + group_array[1];
		}
		else if (bytes <= 64 * 1024)
		{
			return _Index(bytes - 8 * 1024, 10) + group_array[0] + group_array[1] + group_array[2];
		}
		else if (bytes <= 256 * 1024)
		{
			return _Index(bytes - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
		}
		else
		{
			assert(false);
			return -1;
		}
	}
};

 还有一项准备工作,我们需要将自由链表结构定义出来,自由链表中存放的是切分好的小对象,ThreadCache申请内存时可能会去自由链表中取出一个对象,释放内存时会将释放的对象挂到对应的自由链表中,因此我们需要Pop()和Push()函数。

之前已经讲解了自由链表实际上是使用每个对象的前4/8个字节内容去存储下一个对象地址的,所以这里我们在Common.h中定义一个计算下个对象地址的函数:

static void*& NextObj(void* obj)
{
    return *((void**)obj);
};
class FreeList
{
public:
    void Push(void* obj)
    {
        assert(obj);
        NextObj(obj) = _freeList;
        _freeList = obj;
    }

    void* Pop()
    {
        assert(_freeList);
        void* obj = _freeList;
        _freeList = NextObj(obj);
        return obj;
    }

    bool Empty()
    {
        return _freeList == nullptr;
    }
private:
    void* _freeList;
};

ThreadCache.h:

NFREELIST即是哈希桶的总数量,也就是自由链表的数量,共208个(根据内存对齐规则计算):

128/8 + (1024-128)/16 + (8*1024-1024)/128 + (64*1024-8*1024)/1024 + (256*1024-64*1024)/(8*1024) = 208

static const size_t NFREELIST = 208;
#include "Common.h"

class ThreadCache
{
public:
    void* Allocate(size_t size); //申请内存对象
    void Deallocate(void* ptr, size_t size);
    
    //ThreadCache中内存不够向上一层CentralCache申请
    void* FetchFromCentralCache(size_i index, size_t size);
    
    //当一个自由链表桶的长度过长,回收内存给CentralCache
    //实现内存的均衡调度
    void ListTooLong(Free& list, size_t size);
private:
    FreeList _freeLists[NFREELIST];
};

//通过TLS每个线程无锁地获取自己专属的ThreadCache对象
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

申请内存

ThreadCache.cpp:

void* ThreadCache::Allocate(size_t size)
{
    assert(size <= MAX_BYTES);
    size_t alignSize = SizeClass::RoundUp(size);
    size_t index = SizeClass::Index(size);
    if(!_freeLists[index].Empty())
    {
        return _freeLists[index].Pop();
    }
    else
    {
        return FetchFromCentralCache(index, alignSize);
    }
}

释放内存

1.当释放内存小于256KB时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。

2.当链表的长度过长,则回收一部分内存对象到central cache(内存的均衡调度)。

此处我们想要实现判断一个自由链表桶的长度是否需要过长,需要回收内存给central cache,就需要给FreeList新增一个成员变量size来储存自由链表中的对象个数。

那到底怎样才算是链表长度过长,需要回收内存给central cache呢?

我们还需要在FreeList中定义一个成员变量_maxSize,代表链表的最大长度,即当链表长度大于等于_maxSize时就需要回收内存了。但是有关_maxSize值的相关操作我们放在central cache中进行讲解,此处先不做解释 

class FreeList
{
public:
    // ...
    void Push(void* obj)
    {
       // ...
        ++_size; // 自由链表长度加一
    };
    
    void* Pop()
    {
        // ...
        --_size; // 自由链表长度减一
        return obj; 
    }
    
    size_t& MaxSize()
    {
        return _maxSize;
    }

    size_t Size()
    {
        return _size;
    }
private:
    void* _freeList;
    size_t size = 0; //自由链表的长度
    size_t _maxSize = 1;
};
void ThreadCache::Deallocate(void* ptr, size_t size)
{
    assert(ptr);
    size_t index = SizeClass::Index(size);
    _freeLists[index].Push(ptr);
    if(_freeLists[index].Size() >= _freeLists[index].MaxSize())
    {
        ListTooLong(_freeLists[index], size);
    }
}

ListTooLong()函数进行的操作就是将自由链表_freeLists[index]中的内存对象回收给central cache。所以我们还需要写一个函数PopRange去处理对应的自由链表,将其中的所有对象pop出来回收给central cache:

ThreadCache.cpp:

void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
    void* start = nullptr;
    void* end = nullptr;
    list.PopRange(start, end, list.MaxSize());
    
    // 将内存回收给central cache
    // CentralCache::GetInstance()->ReleaseListToSpan(start, size); // 后面详解
}

Common.h:

class FreeList
{
    // ...
    void PopRange(void*& start, void*& end, size_t n)
    {
        assert(n <= _size);
        start = _freeList;
        end = start;
        // end指向最后一个对象的起始地址
        for(size_t i = 0; i < n - 1; i++)
        {
            end = NextObj(end);
        }
        _freeList = NextObj(end);
        NextObj(end) = nullptr; // 因为我们需要将pop出来的这一段自由链表还给central cache
        // 所以此处要将NextObj(end)置空
        _size -= n;
    }
};

central cache

        central cache也是一个哈希桶结构,它的哈希桶的映射关系跟thread cache是一样的。不同的是它的每个哈希桶位置挂的时SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。

由于Span在合适的时机也会回收给page cache,所以涉及到将某一个span从一个哈希桶中取出来的操作,如下图:

将指向的span从central cache中取出回收给page cache

很明显这里spanlist采用双向带头结构操作更加方便。 

Common.h: 

Span管理一个跨度的大块内存,以页为单位,PAGE_ID代表页号,在32位和64位平台下不同,采用条件编译:

#ifdef _WIN64
    typedef unsigned long long PAGE_ID;
#elif _WIN32
    typedef size_t PAGE_ID;
#else
    //linux
#endif
struct Span
{
    void* _freeList = nullptr; // 挂载的自由链表
    size_t _objSize = 0; // 切分的小块内存对象大小
    PAGE_ID _pageId = 0; // 起始页号
    size_t _n = 0; // 页数
    size_t _useCount = 0; // 分配出去的对象的数量
    
    Span* _prev = nullptr;
    Span* _next = nullptr;  
};

class SpanList
{
public:
    SpanList()
    {
        _head = new Span;
        _head->_prev = nullptr;
        _head->_next = nullptr;
    }

    void Insert(Span* pos, Span* newSpan)
    {
        assert(pos);
        assert(newSpan);
        Span* prev = pos->_prev;
        prev->_next = newSpan;
        newSpan->_prev = pos;
        newSpan->_next = pos;
        pos->_prev = newSpan;
    }

    void Erase(Span* pos)
    {
        assert(pos);
        assert(pos != _head);
        Span* prev = pos->_prev;
        Span* next = pos->_next;
        prev->_next = next;
        next->_prev = pos;
        // 不删除pos
        // pos是回收给page cache的
    }

    bool Empty()
    {
        return _head->_next == _head;
    }

    Span* Begin()
    {
        return _head->_next;
    }

    Span* End()
    {
        return _head;
    }

    void PushFront(Span* span)
    {
        Insert(Begin(), span);
    }

    Span* PopFront()
    {
        Span* front = _head->_next;
        Erase(front);
        return front;
    }
private:
    Span* _head;
public:
    std::mutex _mtx; // 桶锁
};

CentralCache.h:

central cache和thread cache不同,thread cache可能有多个,但central cache一定只存在一个,所以我们采用单例模式来设计。

并且由于central cache的哈希桶映射规则与thread cache相同,所以同样拥有208个哈希桶即SpanList。

#include "Common.h"

class CentralCache
{
public:
    static CentralCache* GetInstance()
    {
        return &_sInst;
    }
    
    // 获取一个非空的Span
    Span* GetOneSpan(SpanList& list, size_t size);
    
    // thread cache自由链表过长回收内存给central cache
    void ReleaseListToSpan(void* start, size_t size);
    
    // 分配内存给thread cache
    void* FetchRangeObj(void*& start, void*& end, size_t size);
    
private:
    CentralCache(){};
    CentralCache(const CentralCache&) = delete;
    static CentralCache _sInst; // 唯一实例
    SpanList _spanLists[NFREELIST];
};

thread cache向central cache申请内存:

申请内存

1.当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的(不同线程之间会产生竞争),不过这里使用的是一个桶锁(申请不同区间内存大小会去不同的哈希桶中取内存,不同的哈希桶互不影响),尽可能提高效率。

2.central cache映射的spanlist中所有span都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。

3.central cache中挂的span中use_count记录分配了多少个对象出去,分配一个对象给thread cache,就++use_count

当thread cache没有内存而向central cache申请内存时,central cache不可能一次只分配一个对象给thread cache,而应该多分配一些给thread cache,批量地获取对象,提高效率。我们在Common.h中定义一个函数来初步计算分配给thread cache的对象个数:

class SizeClass
{
public:
    static size_t NumMoveSize(size_t size)
    {
        assert(size > 0);
        sizt_t num = size / MAX_BYTES;
        if(num < 2)
            num = 2;
        if(num >= 512)
            num = 512;
        return num;
    }
};

但是我们并不是就按这个计算分配对象的个数,而是采用慢开始反馈调节算法。在FreeList中定义成员变量_maxSize,初始值设为1.MaxSize()函数返回该值。

分配对象的个数batchNum从_maxSize开始,逐渐增长,直到NumMoveSize(size)。

thread cache向central cache申请内存时,二者哈希桶和内存大小的映射规则相同,是到映射的哈希桶即SpanList中找到一个非空的Span,由这个非空的Span为其分配内存,所以我们需要一个获取Span的函数GetOneSpan。但是获取到的Span中的内存不一定够batchNum个。所以实际分配的对象个数还要考虑这一点。

我们定义函数FetchRangeObj从central cache中获取内存:其中需要两个输出型接口start和end来标记获取的内存的起始和结束地址,同时返回实际能够获取到的内存对象个数

Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
    Span* it = list.Begin();
    while(it != list.End())
    {
        if(it->_freeList != nullptr)
        {
            return it;
        }
        else
        {
            it = it->_next;
        }
    }
    // 走到这里说明central cache没有内存,要向page cache申请
    // ...

}

size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t size)
{
    size_t index = SizeClass::Index(size);

    _spanList[index]._mtx.lock(); //上桶锁
    Span* span = CentralCache::GetInstance()->GetOneSpan(_spanLists[index], size);
    assert(span);
    assert(span->_freeList);
    
    start = span->_freeList;
    end = start;
    size_t batchNum = SizeClass::NumMoveSize(size);
    size_t actualNum = 1;
    size_t i = 0;
    while(i < batchNum - 1 && NextObj(end) != nullptr)
    {
        end = NextObj(end);
        ++actualNum;
        ++i;
    }
    
    span->_freeList = NextObj(end);
    NextObj(end) = nullptr;
    span->_useCount += actualNum;
    _spanLists[index]._mtx.unlock();
    return actualNum;
}

现在我们就可以完善thread cache向central cache申请内存的函数了: 

//Common.h
class FreeList
{
public:
    // ...
    // 将一段内存头插到自由链表中
    void PushRange(void* start, void* end, size_t n)
    {
        NextObj(end) = _freeList;
        _freeList = start;
        _size += n;
    }
}

//ThreadCache.cpp
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
    // 采用慢开始反馈调节算法
    size_t batchNum = min(SizeClass::NumMoveSize(size), _freeLists[index].MaxSize());
    if(batchNum == _freeLists[index].MaxSize())
    {
        _freeLists[index].MaxSize() += 1;
    }
    void* start = nullptr;
    void* end = nullptr;
    size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, size);
    assert(actualNum > 0);
    if(actualNum == 1)
    {
        assert(start == end);
        return start;
    }
    else
    {
        // 申请到的内存取出一个对象使用
        // 剩下的内存放进thread cache对应的自由链表中,方便下次取用
        _freeLists[index].PushRange(NextObj(start), end, actualNum - 1);
        return start;
    }  
}

释放内存

当thread cache过长或者线程销毁,则会将内存释放回central cache中,释放回来时span要--_useCount。当_useCount减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的页进行合并。

在释放内存时,我们先根据对象大小找到对应的哈希桶_spanLists[index],该哈希桶下挂的是span的链表。申请时我们是从该哈希桶下的某个span申请的,所以还回内存是也应该还给这个span。我们定义一个MapObjectToSpan函数来找到对应的span。这个函数在page cache部分再做详解。 

// ThreadCache.cpp
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
    void* start = nullptr;
    void* end = nullptr;
    list.PopRange(start, end, list.MaxSize());
    
    // 将内存回收给central cache
    CentralCache::GetInstance()->ReleaseListToSpan(start, size);
}

// CentralCache.cpp
void CentralCache::ReleaseListToSpan(void* start, size_t size)
{
    size_t index = SizeClass::Index(size);

    _spanLists[index]._mtx.lock();
    while(start)
    {
        void* next = NextObj(start);
        // 找到对应的span
        // ...
        NextObj(start) = span->_freeList;
        span->_freeList = start;
        --span->_useCount;
        if(span->_useCount == 0)
        {
            // 说明span分配出去的所有对象都还回来了
            // 此时把span回收给page cache
            // ...
        }
        start = next;
    }
    _spanLists[index]._mtx.unlock();
}

 page cache

page cache也是哈希桶结构,但不同的是它采用了直接定址法。存储的内存是以页为单位存储及分配的。central cache没有内存对象向page cache申请时,page cache分配一定数量的page并切割成定长大小的小块内存分配给central cache。

当分配出去的span的所有对象都回收后(_useCount减到0),page cache会回收该span。并将其与相邻页进行合并组成更大的页,缓解内存碎片问题。

// Common.h
static const size_t NPAGES = 129; // page cache一共有1-128页的哈希桶
static const size_t PAGE_SHIFT = 13; //一页8KB

page cache跟central cache相同,只存在一个,同样设计成单例模式。

#include "Common.h"

class PageCache
{
public:
    static PageCache* GetInstance()
    {
        return &_sInst;
    }
    Span* NewSpan(size_t k);
    Span* MapObjectToSpan(void* obj);
    void ReleaseSpanToPageCache(Span* span);
    std::mutex _pageMtx;
private:
    SpanList _spanLists[NPAGES];
    PageCache(){};
    PageCache(const PageCache&) = delete;
    static PageCache _sInst;
}

申请内存

1.当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂成一个4页page span和一个6页page span。

2.如果找到_spanLists[NPAGES - 1]都没有合适的span,则向系统使用mmap、brk、或者是VirtuallAlloc申请128页page span挂载自由链表中,再重复1的过程。

3.需要注意的是central cache和page cache的核心结构都是spanlist的哈希桶,但是它们有本质区别,central cache中哈希桶,是按照thread cache一样的大小对齐关系映射的,它的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache中的spanlist则是按下标桶号映射的,也就是说第i号桶挂的span都是i页内存。

我们知道central cache在没有内存时会向page cache申请,page cache 又是以页为存储单位的,分配给central cache几页呢,我们在Common.h中定义函数NumMovePage。

根据申请的内存对象大小,我们有NumMoveSize计算分配几个对象。所以在page cache中我们就根据NumMoveSize计算出来的值来计算需要分配多少页的内存。

class SizeClass
{
    size_t NumMovePage(size_t size)
    {
        size_t num = NumMoveSize(size);
        size_t nPage = num * size;
        nPage >>= PAGE_SHIFT; // 一共需要多少页
        if(nPage == 0)
            nPage == 1;
        return nPage;
    }
}

central cache内存不够向page cache申请span:

page cache是直接定址法,直接根据申请的页数去对应的哈希桶中获取span;

如果该哈希桶没有span,则向后遍历有没有更大页的内存,如果有将其切分。 

之前我们提到在释放内存时,thread cache将内存回收给central cache应该还给当初申请内存时分配给它内存的span。而我们仅仅能根据要回收内存的起始地址算出该内存的页号,如果要找到对应的span,就需要我们在分配span时建立每个页号和span*的映射关系。因此,我们在page cache新建一个成员变量,用unordered_map结构存储PAGE_ID和Span*的映射关系。(要注意包头文件哦)

// PageCache.h
class PageCache
{
// ...
private:
    std::unordered_map<PAGE_ID, Span*> _idSpanMap;
// ...
}
// PageCache.cpp
Span* PageCache::NewSpan(size_t k)
{
    assert(k < NPAGES);
    if(!_spanLists[k].Empty())
    {
        Span* kSpan = _spanLists[k].PopFront();
        return kSpan;
    }

    for(size_t i = k + 1; i < NPAGES; ++i)
    {
        if(!_spanLists[i].Empty())
        {
            Span* nSpan = _spanLists[i].PopFront();
            Span* kSpan = new Span;
            kSpan->_pageId = nSpan->_pageId;
            kSpan->_n = k;
            // 分配出去的内存回收时需要根据地址算出对应的页号
            // 再根据所在页号找到对应的span
            // 因此分配出去的每一页都需要建立和Span*的映射关系
            for(PAGE_ID i = 0; i < kSpan->_n; ++i)
            {
                _idSpanMap[kSpan->_pageId + i] = kSpan;
            }
            
            nSpan->_pageId += k;
            nSpan->_n -= k;
            _spanLists[nSpan->_n].PushFront(nSpan);
            // 切分剩下的nSpan没有分配内存出去,所以不需要每个页号都建立和span的映射关系
            // 因为有合并相邻页的需求,我们只需要建立起始和结束页号和Span*的映射关系即可
            _idSpanMap[nSpan->_pageId] = nSpan;
            _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

            return kSpan;
        }
    }
    // 走到这里说明page cache中也没有内存了,此时只能向堆申请128页的大内存
    Span* bigSpan = new Span;
    void* ptr = SystemAlloc(NPAGES - 1);
    bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
    bigSpan->_n = NPAGES - 1;
    _spanLists[bigSpan->_n].PushFront(bigSpan);

    // 递归回去走刚才的逻辑
    return NewSpan(k);
}

函数MapObjectToSpan计算地址的页号并通过页号和Span的映射关系返回页号对应的Span,这里需要注意加锁,因为unorder_map的底层结构是红黑树,当其他地方对_idSpanMap进行写操作时红黑树的结构会发生改变,此时不应该进行读操作:

// PageCache.cpp
Span* PageCache::MapObjectToSpan(void* obj)
{
    PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
    std::unique_lock<std::mutex> lock(_pageMtx);
    auto ret = _idSpanMap.find(id);
    if(ret != _idSpanMap.end())
    {
        return ret->second;
    }
    else
    {
        assert(false);
        return nullptr;
    }
}

释放内存

如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果可以合并就继续向前寻找,直到不能合并为止。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。

 那么我们就需要给span新增一个成员变量,来表示该span是否正在被使用(是否空闲),默认情况下span都是空闲的,只有当span分配内存出去时我们才修改_isUse为true。

struct Span
{
	// ...
	bool _isUse = false; // Span是否被使用
};

现在我们先完善一下GetOneSpan的逻辑:

Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
    Span* it = list.Begin();
    while(it != list.End())
    {
        if(it->_freeList != nullptr)
        {
            return it;
        }
        else
        {
            it = it->_next;
        }
    }
    // 走到这里说明central cache没有内存,要向page cache申请
    list._mtx.unlock(); // 可以先释放桶锁,方便释放内存

    PageCache::GetInstance()->_pageMtx.lock();
    Span* newSpan = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
    newSpan->_isUse = true;
    newSpan->_objSize = size;
    PageCache::GetInstance()->_pageMtx.unlock();
    
    // 申请到span后要将其按对象大小切分小块内存
    char* start = (char*)(newSpan->_pageId << PAGE_SHIFT);
    size_t bytes = newSapn->_n << PAGE_SHIFT;
    char* end = start + bytes;
    newSapn->_freeList = start;
    start += size;
    void* tail = newSpan->_freeList;
    while(start < end)
    {
        NextObj(tail) = start;
        tail = NextObj(tail);
        start += size;
    }
    NextObj(tail) = nullptr;
    list._mtx.lock();
    list.PushFront(newSpan);
    return newSpan;
}

释放内存

void PageCache::ReleaseSpanToPageCache(Span* span)
{
    // 向前合并
    while(1)
    {
        PAGE_ID prevId = span->_pageId - 1;
        Span* prevSpan = _idSpanMap[prevId]; // ?
        // 前面的页不存在,不合并
        if(prevSpan == nullptr)
        {
            break;
        }
        // 前面的页正在被使用,不合并
        if(prevSpan->_isUse == True)
        {
            break;
        }
        // 前面的页和现在的合并后超过128页,不合并
        if(prevSpan->_n + span->_n > NPAGES - 1)
        {
            break;
        }
        // 合并
        span->_pageId = prevSpan->_pageId;
        span->_n = prevSpan->_n + span->_n;
        _spanLists[prevSpan->_n].Erase(prevSpan);
        delete prevSpan;
    }
    // 向后合并是同样的逻辑
    while(1)
    {
        PAGE_ID nextId = span->_pageId + span->_n;
        Span* nextSpan = _idSpanMap[nextId];
        if(nextSpan == nullptr)
        {
            break;
        }
        if(nextSpan->_isUse == true)
        {
            break;
        }
        if(nextSpan->_n + span->_n > NPAGES - 1)
        {
            break;
        }
        span->_n += nextSpan->_n;
        _spanLists[nextSpan->_n].Erase(nextSpan);
        delete nextSpan;
    }
    _spanLists[span->_n].PushFront(span);
    _span->_isUse = false;
    _idSpanMap[span->_pageId] = span;
    _idSpanMap[span->_pageId + span->_n - 1] = span;
}

完善central cache的ReleaseListToSpan:

void CentralCache::ReleaseListToSpan(void* start, size_t size)
{
    size_t index = SizeClass::Index(size);

    _spanLists[index]._mtx.lock();
    while(start)
    {
        void* next = NextObj(start);
        // 找到对应的span
        // ...
        NextObj(start) = span->_freeList;
        span->_freeList = start;
        --span->_useCount;
        if(span->_useCount == 0)
        {
            // 说明span分配出去的所有对象都还回来了
            // 此时把span回收给page cache
            _spanLists[index].Erase(span);
            span->_freeLists = nullptr;
            span->_prev = nullptr;
            span->_next = nullptr;

            // 将span回收给page cache的过程中可以先把桶锁解除
            // 以免其他线程向central cache释放内存阻塞
            _spanLists[index]._mtx.unlock();
        
            PageCache::GetInstance()->_pageMtx.lock();
            ReleaseSpanToPageCache(span);
            PageCache::GetInstance()->_pageMtx.unlock();
        }
        start = next;
    }
    _spanLists[index]._mtx.unlock();
}

大块内存申请实现

线程申请内存的代码:

ConcurrentAlloc.h:

static void* ConcurrentAlloc(size_t size)
{
    if(pTLSThreadCache == nullptr)
    {
        pTLSThreadCache = new ThreadCache;
    }
    return pTLSThreadCache->Allocate(size);
}

释放内存:

static void ConcurrentFree(void* ptr)
{
    // 首先需要知道释放对象的内存大小
    // 在span结构中有一个成员变量_objSize,代表该span被切分成的小对象的大小
    // 所以我们只需要知道现在释放的对象属于哪个span,即可知道它的大小
    Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
    size_t size = span->_objSize;
    assert(pTLSThreadCache);
    pTLSThreadCache->Deallocate(ptr, size);
}

但是正如最开始所说的,thread cache只能申请<=256KB的内存。所以以上代码也只实现了<=256KB内存的申请。现在我们需要完善大块内存的申请/释放逻辑。

申请内存的第一步是进行内存对齐,我们最开始在RoundUp函数中直接忽略了>=256KB的内存,可以返回去看看,当时我们直接对大块内存的申请情况中assert(false)了。所以现在需要修改,thread cache只能申请<=256KB的内存, 但是page cache中最高却有128页的内存可以申请。所以对于大块内存,我们直接向page cache进行申请,所以在内存对齐这一步我们需要按页对齐:

class SizeClass
{
public:
    static inline size_t RoundUp(size_t size)
    {
        if(size <= 128)
            return _RoundUp(size, 8);
        else if(size <= 1024)
            return _RoundUp(size, 16);
        else if(size <= 8 * 1024)
            return _RoundUp(size, 128);
        else if(size <= 64 * 1024)
            return _RoundUp(size, 1024);
        else if(size <= 256 * 1024)
            return _RoundUp(size, 8 * 1024);
        else
        {
            // assert(false);
            // return -1;
            return _RoundUp(size, 1 << PAGE_SHIFT);
        }
    }
}

ConcurrentAlloc和ConcurrentFree也做出相应的修改:

static void* ConcurrentAllloc(size_t size)
{
    if(size > MAX_BYTES)
    {
        size_t alignSize = SizeCalss::RoundUp(size);
        size_t kPage = alignSize >> PAGE_SHIFT;
        
        PageCache::GetInstance()->_pageMtx.lock();
        Span* span = PageCache::GetInstance()->NewSpan(kPage);
        span->_objSize = size;
        PageCache::GetInstance()->_pageMtx.unlock();
        void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
        return ptr;
    }
    else
    {
        if(pTLSThreadCache == nullptr)
        {
            pTLSThreadCache = new ThreadCache;
        }
        return pTLSThreadCache->Allocate(size);
    }
}

static void ConcurrentFree(void* ptr)
{
    Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
    size_t size = span->_objSize;
    if(size > MAX_BYTES)
    {
        PageCache::GetInstance()->_pageMtx.lock();
        PageCache::GetInstance()->ReleaseSpanToPageCache(span);
        PageCache::GetInstance()->_pageMtx.unlock();
    }
    else
    {
        assert(pTLSThreadCache);
        pTLSThreadCache->Deallocate(ptr, size);
    }
}

 但是page cache中也只有<=128页的内存,如果我们申请大于128页的内存就没办法问page cache要了,只能找堆:

Sapn* PageCache::NewSpan(size_t k)
{
    assert(k > 0);
    if(k > NPAGES - 1)
    {
        void* ptr = SystemAlloc(k);
        Span* span = new Span;
        span->_pageId = ptr >> PAGE_SHIFT;
        span->_n = k;

        _idSpanMap[span->_pageId] = span;
        return span;
    }
    // ...
}

释放时内存如果大于128页我们应该还给堆,而不是给page cache,我们直接向堆申请内存时调用的是VirtualAlloc,与之对应的释放内存的是VirtualFree:

// Common.h
void SystemFree(void* ptr)
{
#ifdef _WIN32
    VirtualFree(ptr, 0, MEM_RELEASE);
#else
    // ...
#endif
}
void PageCache::ReleaseSpanToPageCache(Span* span)
{
    if(span->_n > NPAGES - 1)
    {
        void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
        SystemFree(ptr);
        delete span;
        return;
    }
    // ...
}

定长内存池配合脱离使用new

        既然我们这个项目是为了在高并发情况下实现比malloc更高效地处理内存的申请释放,那我们在实现中再用到malloc就不合适了。现在我们用定长内存池来进行替换。

项目中涉及到new和delete的操作主要是pagecache中对span进行的操作,我们在page cache中新增一个定长内存池的成员变量:

class PageCache
{
public:
	// ...
private:
	ObjectPool<Span> _spanPool;
	// ...
};

然后再将我们代码中的new和delete全部改为调用定长内存池中的New和Delete函数:

// Span* span = new Span;
Span* span = _spanPool.New();

//delete span;
_spanPool.Delete(span);		

 同时,ConcurrentAlloc中的new ThreadCache也需要替换:

static void* ConcurrentAlloc(size_t size)
{
	if (size > MAX_BYTES)
	{
		// ...
	}
	else
	{
		if (pTLSThreadCache == nullptr)
		{
			ObjectPool<ThreadCache> tcPool;
			pTLSThreadCache = tcPool.New();
			//pTLSThreadCache = new ThreadCache;
		}
		return pTLSThreadCache->Allocate(size);
	}
}

这里为了防止线程从定长内存池中申请内存产生竞争,我们以空间换时间,每个线程都有一个定长内存池tcPool。

性能测试

以下是一个简单的对比malloc和高并发内存池效率的代码:

nworks -- 线程个数

rounds -- 轮次

ntimes -- 每轮次申请和释放内存的个数

void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					//v.push_back(malloc(16));
					v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
			});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:",
		nworks, rounds, ntimes);
	cout << malloc_costtime << "ms" << endl;

	printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:",
		nworks, rounds, ntimes);
	cout << free_costtime << "ms" << endl;

	printf("%u个线程并发malloc&free %u次,总计花费:",
		nworks, nworks * rounds * ntimes);
	cout << malloc_costtime + free_costtime << "ms" << endl;
}


// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					//v.push_back(ConcurrentAlloc(16));
					v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
			});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:",
		nworks, rounds, ntimes);
	cout << malloc_costtime << "ms" << endl;

	printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:",
		nworks, rounds, ntimes);
	cout << free_costtime << "ms" << endl;

	printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:",
		nworks, nworks * rounds * ntimes);
	cout << malloc_costtime + free_costtime << "ms" << endl;
}

int main()
{
	size_t n = 10000;
	cout << "==========================================================" << endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;

	BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" << endl;

	return 0;
}

然而运行后我们发现很多时候我们的程序性能上还是比不过malloc。这个时候我们可以利用性能分析工具来查看性能瓶颈。

 可以看到page cache中的MapObjectToSpan函数和thread cache中的Deallocate函数对性能影响比较大,我们可以点进去查看具体情况:

很明显问题出在了加锁上。 

tcmalloc源码实现基数树优化

        我们已经知道问题在于MapObjectToSpan加锁导致效率较低。在上文中我们提到了MapObjectToSpan需要注意加锁是因为unordered_map底层结构是红黑树,当其他地方对_idSpanMap进行写操作时红黑树的结构会发生改变,此时不应该进行读操作。而tcmalloc中_idSpanMap的结构也并没有采用红黑树:

TCMalloc_PageMap1是采用直接定址法存储PAGE_ID和Span*映射关系的相当于一维数组的结构。里面对应页号的下标位置中存储的都是其映射的void*(Span*)。而该结构非常稳定,一开始开辟好空间后结构不会发生变化,与红黑树不同。因此在其他线程写的时候也不妨碍另一个线程读,这样MapObjectToSpan内部就不用加锁:

        其中的set函数就是建立页号与Span*的映射关系;而get函数则是通过页号找到对应的Span。

        BITS是存储页号需要的位数,比如一页为8KB的情况下在32位下总共会有2^32/2^13=2^19页,也就是说这是存储页号需要19位。

而TCMalloc_PageMap2也是基本相同的逻辑,只是采用了两层映射,先通过页号的前五位映射到第一层基数树,再通过剩下的几位映射到第二层基数树。这样我们不至于一下就把sizeof(void*)<<BITS对齐后的空间全部开辟出来,而是可以先把第一层空间开好,等到用到对应的第二层后再开辟相应的空间,一定程度上减少了内存的消耗。

然而TCMalloc_PageMap1和TCMalloc_PageMap2都只适用于32位,在32位下BITS=32-13=19,它们最终需要开辟的内存空间都是4*2^19=2MB。内存消耗不算很大。但是在64位下BITS=64-13=51。需要8*2^51=2^24GB,内存消耗太大。这时就用TCMalloc_PageMap3实现,采用三层映射,节省内存的原理与TCMalloc_PageMap2相同。

// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
	static const int LENGTH = 1 << BITS;
	void** array_;
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap1(void* (allocator)(size_t)) {
		*array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*)
			<< BITS));
		memset(array_, 0, sizeof(void*) << BITS);
	}
	// Return the current value for KEY. Returns NULL if not yet set,
	// or if k is out of range.
	void* get(Number k) const {
		if ((k >> BITS) > 0) {
			return NULL;
		}
		return array_[k];
	}
	// REQUIRES "k" is in range "[0,2^BITS-1]".
	// REQUIRES "k" has been ensured before.
	//
	// Sets the value 'v' for key 'k'.
	void set(Number k, void* v) {
		array_[k] = v;
	}
};
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	static const int ROOT_BITS = 5;
	static const int ROOT_LENGTH = 1 << ROOT_BITS;
	static const int LEAF_BITS = BITS - ROOT_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;
	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};
	Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
	void* (*allocator_)(size_t); // Memory allocator
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
		allocator_ = allocator;
		memset(root_, 0, sizeof(root_));
	}
	void* get(Number k) const {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 || root_[i1] == NULL) {
			return NULL;
		}
		return root_[i1]->values[i2];
	}
	void set(Number k, void* v) {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		ASSERT(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}
	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> LEAF_BITS;
			// Check for overflow
			if (i1 >= ROOT_LENGTH)
				return false;
			// Make 2nd level node if necessary
			if (root_[i1] == NULL) {
				Leaf* leaf = reinterpret_cast<Leaf*>
					((*allocator_)(sizeof(Leaf)));
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}
			// Advance key past whatever is covered by this leaf
			node
				key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}
	void PreallocateMoreMemory() {
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
	// How many bits should we consume at each interior level
	static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
	// How many bits should we consume at leaf level
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;
	// Interior node
	struct Node {
		Node* ptrs[INTERIOR_LENGTH];
	};
	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};
	Node* root_; // Root of radix tree
	void* (*allocator_)(size_t); // Memory allocator
	Node* NewNode() {
		Node* result = reinterpret_cast<Node*>((*allocator_)
			(sizeof(Node)));
		if (result != NULL) {
			memset(result, 0, sizeof(*result));
		}
		return result;
	}
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
		allocator_ = allocator;
		root_ = NewNode();
	}
	void* get(Number k) const {
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 ||
			root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2]
			== NULL) {
			return NULL;
		}
		return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2]) -
> values[i3];
	}
	void set(Number k, void* v) {
		ASSERT(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]
			= v;
	}
	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
			const Number i2 = (key >> LEAF_BITS) &
				(INTERIOR_LENGTH - 1);
			// Check for overflow
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
				return false;
			// Make 2nd level node if necessary
			if (root_->ptrs[i1] == NULL) {
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}
			// Make leaf node if necessary
			if (root_->ptrs[i1]->ptrs[i2] == NULL) {
				Leaf* leaf = reinterpret_cast<Leaf*>
					((*allocator_)(sizeof(Leaf)));
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(leaf));
				*root_->ptrs[i1]->ptrs[i2] =
					reinterpret_cast<Node*>(leaf);
			}
			// Advance key past whatever is covered by this leaf
			node
				key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}
	void PreallocateMoreMemory() {
	}
};

我们利用tcmalloc的源码加以修改运用到项目上:

template <int BITS>
class TCMalloc_PageMap1 {
private:
	static const int LENGTH = 1 << BITS;
	void** array_;
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap1() {
		size_t size = sizeof(void*) << BITS;
        size_t alignSize = _RoundUp(size, 1 << PAGE_SHIFT);
        array = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
		memset(array_, 0, sizeof(void*) << BITS);
	}
	void* get(Number k) const {
		if ((k >> BITS) > 0) {
			return NULL;
		}
		return array_[k];
	}
	void set(Number k, void* v) {
		array_[k] = v;
	}
};

// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	static const int ROOT_BITS = 5;
	static const int ROOT_LENGTH = 1 << ROOT_BITS;
	static const int LEAF_BITS = BITS - ROOT_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;
	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};
	Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap2() {
		memset(root_, 0, sizeof(root_));
        PreallocateMoreMemory();
	}
	void* get(Number k) const {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 || root_[i1] == NULL) {
			return NULL;
		}
		return root_[i1]->values[i2];
	}
	void set(Number k, void* v) {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		assert(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}
	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> LEAF_BITS;
			// Check for overflow
			if (i1 >= ROOT_LENGTH)
				return false;
			// Make 2nd level node if necessary
			if (root_[i1] == NULL) {
                static ObjectPool<Leaf> leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}
			// Advance key past whatever is covered by this leaf
			node
				key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}
	void PreallocateMoreMemory() {
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};

运行程序,效率提升很大: 

 项目源码:ConcurrentMemoryPool · 亮亮儿/Project - 码云 - 开源中国 (gitee.com)


网站公告

今日签到

点亮在社区的每一天
去签到