1 Star 3 Fork 0

maole/高并发内存池

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
MIT

项目介绍

该项目是实现一个高并发的内存池

用到的知识

C/C++,数据结构(链表、哈希桶),操作系统内存管理,单例模式,多线程,互斥锁等。

内存池

内存池是指程序预先从操作系统中申请一大块内存,然后自己管理这部分内存。
当程序需要申请内存的时候,不是直接向操作系统申请,而是向内存池中申请内存;
释放的时候,也不是把内存给操作系统,而是放入内存池中。

内存池主要解决的问题

  1. 效率问题
  2. 内存碎片化问题

定长内存池的实现

方案

我们先申请一个一定大小的内存空间,该空间用来承担小型池子的作用。
每次程序申请空间的时候,就从该小池子申请空间。
当程序释放资源空间的时候,就返回到“池子”中。
但是真的可以回到池子中吗?——不能
因为我们这个池子是连续的一段空间,每个程序来的时候,都向池子申请资源,但还资源的时候,我们无法判断,它是不是按照申请的顺序返回的,所以我们不能进入池子。
那我们进入哪里呢?我们可以用一个指针接收返回的空间,每个空间都有它的地址,我们把这些空间一个一个连接起来(用单链表的形式)。当再有其他程序来申请的空间的时候,直接从该链表资源中取该资源。

实现

该定长内存池,我们给它起名为ObjectPool——类
根据我们刚才的思路,至少需要2个成员。

  1. _memory——定长内存池指针
  2. _freeList——释放资源的单链表指针

还需要一个成员记录剩余空间的大小freeSpaceSize

该类要有申请空间和释放空间的能力,那么就需要这2个能力。

  1. New()——申请空间
  2. Delete()——释放空间

细节问题

成员

_memory的是什么类型的指针呢?——char*类型的

为什么是char类型的呢? 当申请内存空间的时候,就要算新空间的首地址,那么char类型的指针,加一就是加一个字节,非常方便计算。其他的类型就不是很方便。

_freeList是类型的指针呢?——void*,不需要知道该指针是什么类型的,设成void*就可以。
_freeList指向释放资源的地址,它们是怎么连接的呢?
每个释放的空间相当于一个结点,前4/8字节存储下一个结点的地址,那么最后一个空间存储的就是nullptr

char* _memory=nullptr;//定长内存空间的地址
void* _freeList=nullptr;//释放资源的链表指针
int _freeSpaceSize=0;//剩余空间的大小

方法

New方法

返回对象的地址即可——满足申请空间返回该空间的地址。 如果资源链表中有资源,直接从资源链表中拿资源即可——拿的方法为头删。

if (_freeList)
{
    //头删
    obj = (T*)_freeList;
    _freeList = *((void**)_freeList);
}

上面有一个*((void**)_freeList),为什么要这样写呢?
因为我们也不知道平台是32位的平台,还是64位平台的。所以就先把_freeList类型转成void**类型的。
void**类型的大小肯定是4或者8,解引用就是拿到该内存的前4,8个字节。当然用int**``char**也是可以的,可不可以用void*的呢?——不能,void*类型不能解引用。

如果资源链表中没有资源,那么我们就要从池子里面去拿资源。 要拿池子中的资源,就有两者情况:

  1. 池子中有资源,直接拿
  2. 池子中没有资源,重新向内存中进行申请

判断条件为剩余空间小于要申请的大小

有资源之后就是要分配资源,分配资源的时候要注意:申请的资源是否大于地址的大小 为什么要注意这个呢?

  • 因为该资源释放的时候,会进入释放资源链表,该资源要存储下一个资源的地址,如果空间小就存储不下该地址。

所以,当小于地址空间的时候,就要给它一个地址空间的大小。

把空间分配之后,要对剩余空间的地址和大小进行更新

因为模板可能是自定义类型,那么我们就要用定位new进行初始化资源。

T* New()
{
    T* obj = nullptr;
    //资源链表中有资源
    if (_freeList)
    {
        //头删
        obj = (T*)_freeList;
        _freeList = *((void**)_freeList);//保证32,64平台都是可以的
    }
    else
    {
        //剩余空间不够,重新申请
        if (_freeSpaceSize < sizeof T)
        {
            _freeSpaceSize = 128 * 1024;
            _memory = (char*)malloc(_freeSpaceSize);
            if (_memory==nullptr)
                throw std::bad_alloc();
        }
        //分配资源
        obj = (T*)_memory;
        int objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);//空间是否可以存储一个地址的大小
        _memory += objSize;//更新空间地址
        _freeSpaceSize -= objSize;//更新所剩空间的大小
    }
    //定位new,解决初始化问题
    new(obj)T;

    return obj;
}

Delete方法

对申请资源进行释放,释放就是进入释放链表——头插(资源空间存储头结点的地址,_freeList指向新的资源地址) 释放前要调用对象的析构函数,释放空间不需要改变剩余空间的大小,因为申请空间时,优先从链表资源中取资源。

void Delete(T* obj)
{
    //显式调用析构函数
    obj->~T();
    //头插
    *((void**)obj) = _freeList;
    _freeList = obj;
}

高并发内存池的整体架构

image.png
concurrent memory pool(高并发内存池)主要有3个结构:

  1. thread cache:线程缓存是每个线程独有的,用于小于256KB的内存分配,线程从这里申请内存时是不需要加锁的,每个线程独享一个cache。——如何独享?下面有介绍
  2. central cache:中心缓存是所以线程共享的,当thread cache中无内存可用的时候,thread cache就要从central cache中申请内存,同时central cache也会在合适的时候从thread 擦车中回收内存,避免一个线程占太多的内存,导致其他线程无法使用。从上面的描述可以看出,central cache是需要竞争的资源,所以从这里取内存对象的时候是需要进行加锁的。`
  3. page cache:页缓存是在central cache1缓存上面的缓存,存储的内容是以页我得单位存储及分配的,central cache没有内存时,从page cache中分配一段数量的page,并切割定长大小的小块内存,然后分配给central cache。在一定的时候,多个页的对象会page cache进行合并回收,组成一个更大的页,来缓解内存碎片的问题。

thread cache的实现

thread cache是一个哈希桶的结构,每个桶挂着一个自由链表。每个线程都有一个thread cache对象,这样每个线程从这里获取对象和释放对象时是无锁的。
image.png
怎么保证每个线程独享一个thread cache呢?——Thread Local Storage(线程局部存储)TLS

Windows上面这样设置就可以保证:
static __declspec(thread) ThreadCache* ptls_threadcache = nullptr;
这里的ptls_threadcache就是指向thread cache

static void* concurrentalloc(size_t size)
{
	if (ptls_threadcache == nullptr)
		ptls_threadcache = new ThreadCache;
	return ptls_threadcache->Allocate(size);
}

方法

线程缓存至少要完成2个功能:申请资源,释放资源。

申请——Allocate

如同malloc一样,它也需要知道需要多大的内存空间,同时要返回该内存空间的地址。
所以该函数为void* Allocte(size_t size)
如果内存的申请小于等于256KB,我们要先通过申请内存的大小判断是那个桶,找到这个桶之后,要判断该桶有没有空间,如果没有就去central cache中去申请,如果有直接头删返回就行。
细节——如何使内存对齐,如果找到桶:

  • 内存对齐

内存对齐,我们用下面的方法:

[1,128] 8byte对齐 freelist[0,16)
[128+1,1024] 16byte对齐 freelist[16,72)
[1024+1,8*1024] 128byte对齐 freelist[72,128)
[81024+1,641024] 1024byte对齐 freelist[128,184)
[641024+1,2561024] 8*1024byte对齐 freelist[184,208)

为什么要向上面那样内存对齐呢?是为了保证总体的内存碎片率大概处于10%左右。
从上面也就确定了桶的个数208个桶。
对齐之后的内存是多少?——如果需要的内存空间刚好是对齐数的整数倍,那么就不需要进行处理;如果不是整数倍,只需向上调整到整数倍即可size - size % alignNum + alignNum;
当然可以用位运算来更高效的处理这个问题,因为对齐是频繁调用的,cpu对位操作处理的非常快,同时把函数搞成内联函数加快处理速度。

static inline size_t _alignSize(size_t size, size_t alignNum)
{
    if (size % alignNum == 0)
    {
        return size;
    }
    else
        return size - size % alignNum + alignNum;
}
static inline size_t alignSize(size_t size)
{
    assert(size <= MAX_BYTE);
    if (size <= 128)
    {
        return _alignSize(size, 8);
    }
    else if (size <= 1024)
    {
        return _alignSize(size, 16);

    }
    else if (size <= 8 * 1024)
    {
        return _alignSize(size, 128);

    }
    else if (size <= 64 * 1024)
    {
        return _alignSize(size, 1024);

    }
    else if (size <= 256 * 1024)
    {
        return _alignSize(size, 8 * 1024);

    }
    else
    {
        assert(false);
        return -1;
    }
}
  • 找桶

如何找是哪个桶呢?

因为每个区间的字节对齐数不一样,所以我们首先要去除前一个区间的最大字节。然后用剩下的字节去计算桶数再加上上一个区间的桶数即可。

static inline size_t _findBucket(size_t size,size_t align)
{
    if (size % align == 0)
    {
        return size / align - 1;
    }
    return size / align;
}
static inline size_t findBucket(size_t size)
{
    //每个区间的最大桶数
    static size_t sectionBucketNum[] = { 16,72,128,184 };

    if (size <= 128)
    {
        return _findBucket(size, 8);
    }
    else if (size <= 1024)
    {
        return _findBucket(size - 128, 16) + sectionBucketNum[0];

    }
    else if (size <= 8 * 1024)
    {
        return _findBucket(size - 1024, 128) + sectionBucketNum[1];

    }
    else if (size <= 64 * 1024)
    {
        return _findBucket(size - 8 * 1024, 1024) + sectionBucketNum[2];

    }
    else if (size <= 256 * 1024)
    {
        return _findBucket(size - 64 * 1024, 8 * 1024) + sectionBucketNum[3];

    }
    else
    {
        assert(false);
        return -1;
    }
}

桶中没有空间,向central cache申请空间——void* FetchFromCentralCache(size_t index, size_t size);
这里的index是对于桶的下标,size是已经对齐之后的空间的大小。
在向central cache申请内存时,就要考虑如何给它内存,因为我们知道它每次申请的都是一块空间(有小空间,如8字节,有大空间256K),那么我们如何分配呢?——慢开始算法

我们用最大的空间MAX_BYTE除以需求空间的大小,得到一个数字。 如果需求的空间小,那么这个数字很大;反之,这个数很小。 所以我们对得到的这个数进行处理:

  • 小于2:说明请求的空间大,那么我们就给它2个
  • 大于512:说请求的空间少,那么就很512
  • 其他的情况,就按照所求的哪个数分配
static size_t SpaceNeedSize(size_t size)
{
    assert(size > 0);

    size_t num = MAX_BYTE / size;
    if (num < 2)
        num = 2;

    if (num > 512)
        num = 512;

    return num;
}

上面是分配多少个内存的问题,内部是怎么分配的我们教给central cache去处理。
(保证一定是申请成功的)有可能span不够我们申请的个数,那么申请的空间就会比我们申请的少。
如果就申请了一个(说明span中就只有1个,自由链表就一个对象,就不需要对自由链表后进行修复),就直接返回申请的地址。
如果不是1个,那么我们就要对自由链表进行修复处理——保持链表的正常连接。

void* ThreadCache:: FetchFromCentralCache(size_t index, size_t size)
{
	//慢开始算法
	size_t spacenum = min(_hashBucket[index].GetNeedNum(), SpaceNeedSize(size));

	//申请相同空间时,慢慢增长
	if (_hashBucket[index].GetNeedNum() == spacenum)
		_hashBucket[index].GetNeedNum() += 2;

	void* start = nullptr;
	void* end = nullptr;

	size_t actualnum = CentralCache::GetInstance()->FetchRangeObj(start, end, spacenum, size);
	
	//断言,申请成功一定会满足下面的条件
	assert(actualnum >= 1);
	assert(start);
	assert(end);

	if (actualnum == 1)
	{
		assert(start == end);
		return start;
	}
	else
	{
		//第一个参数是第二个内存,第一块对象是被申请走了,只需要把剩下的连接就可以
		_hashBucket[index].PushRange(NextObj(start) , end);
        return start;
	}
}

保持链表的正常连接

void PushRange(void* start,void* end)
{
    NextObj(end) = _freeList;
    _freeList = start;
}

释放——Deallocate

释放的时候要说明一下释放空间的地址和该空间的大小——参数要注意的
当释放内存小于256KB时,将内存释放回thread cache对应的桶中(先找到对应的桶)——头插。
当链表过长的时候,则回收一部分内存对象到central cache中。
如何把内存释放到central cache中呢?
我们可以设置一个成员变量记录每个桶中链表结点的个数,当结果个数大于每次向central cache中申请的个数的时候,就释放申请个数个结点到central cache中。记得要保证链表的连接哦。

所以,我们先在_freeList中添加一个成员_nodeNum用来记录thread cache桶中链表的长度。

void ThreadCache::Deallocate(void* ptr, size_t size)
{
	assert(ptr);
	assert(size <= MAX_BYTE);
	
	size_t index = AlignFindbucket::findBucket(size);
	_hashBucket[index].push(ptr);

	if (_hashBucket[index].NodeNum() >= _hashBucket[index].GetNeedNum())
	{
		ListTooLong(_hashBucket[index], size);
	}

}
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
	void* start = nullptr;
	void* end = nullptr;
	//从链表中删除
	list.PopRange(start, end, list.GetNeedNum());

	CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

成员

需要什么成员呢?当然是需要一个哈希桶
那么我们进行封装一下,怎么封装呢?
只需要把自由链表封装一下就可以了。

自由链表

成员就是链表的头指针,
该结构要有头插、头删、判空的能力。

  • 头插——push

相当于回收资源 要传入资源地址——void push(void* ptr)

  • 头删——pop

相当于申请资源 要返回删除结点的地址——void* pop()

  • 判空——empty

就是判断链表是否为空

central cache的实现

central cache的结构也是一个哈希桶的结构,和thread cache的映射关系是一样的。
不同的是central cache的每个桶的对象是一个span(多个页的一个跨度),每个span相连成一个链表,每个span又独自挂一个自由链表,该自由链表是对span申请空间的一个分割。
thread cache是每个线程独有的,其他线程拿不到该资源,所以就需要进行上锁。
但是central cache是进程享有的,每次进程有多个线程,那么我们就需要对central cache进行加锁——因为是多个线程所看到的公共资源。
为了高效,我们不是对整个central cache进行加锁,而是对每个桶进行加锁。
为什么要这样加锁呢?

结构、需求决定着我们要这样进行操作: 每个桶用于分配不同的内存空间,当各个线程需要的内存空间不在同一个桶的时候,就不需要竞争资源,就可以更高效。

既然每个进程有一个central cache,那么就没有用单例模式,这里我们可以用饿汉模式
image.png

成员

该结构的成员就是哈希桶,每个桶存储的是一个带头双向链表结构
其次还要有CentralCache对象,用来实现单例模式,

Span

带头双向循环链表的每个结点的元素就是一个span——多个页的一个跨度

那么span的有什么属性呢?

起始的页号:_pagaId

  • 关于这个起始页号,32位和64位下页的总数是不一样的。我们可以通过调节编译的方法来解决这个问题。

页的数量:_pageNum

还要具有双向链表的结构:

  • _next
  • _prev

挂自由链表的结构:_freeList

分配给thread cache的个数:_useCount

SpanList

span封装完成之后(相当于完成了元素结点),下面完成带头双向循环链表SpanList
对于带头的双向链表,最基本的功能就是满足插入和删除的工作。
SpanList是每个桶所管理的资源,根据上述描述,要进行加锁操作。
所以,有两个成员:

Span* _head;
//每个桶都需要加锁操作
std::mutex _lock;

初始化:

SpanList()
{
    _head = new Span;
    _head->_next = _head;
    _head->_prev = _head;
}

插入:

//在当前结点之前插
void Insert(Span* pos,Span* newspan)
{
    assert(pos);
    assert(newspan);

    Span* prev = pos->_prev;
    newspan->_next = pos;
    pos->_prev = newspan;

    prev->_next = newspan;
    newspan->_prev = prev;

}

删除:

//删除的当前结点
void Erase(Span* pos)
{
    assert(pos);
    //不能删除头结点
    assert(_head);

    Span* prev = pos->_prev;
    Span* next = pos->_next;

    prev->_next = next;
    next->_prev = prev;
}

方法

因为是单例模式,所以构造函数要私有,拷贝构造和赋值构造要禁掉。

class CentralCache
{
public:

private:
	//单例模式,构造私有,拷贝、赋值禁掉
	CentralCache() {}
	CentralCache(const CentralCache& obj) = delete;
	CentralCache& operator=(CentralCache& obj) = delete;

	SpanList _spanlist[BUCKET_NUM];
	//单例模式——饿汉模式
	static CentralCache _centralCache;
};

既然是单例模式,那么我们就需要获得这个实例

static CentralCache* GetInstance()
{
    return &_centralCache;
}

申请内存

size_t FetchRangeObj(void*& start, void*& end, size_t n, size_t byte_size);
对于申请内存一定要上锁。
我们通过需要空间大小,确定是哪个桶,然后去找span,如果span没有内存,就需要向怕个 cache去申请空间;如果有内存,那么就可以给thread cache,但是如果span里面的内存个数小于申请的个数,那么span有多少就给它多少。
给了之后,要重新把自由链表进行连接上,同时要记录分配给thread cache多少空间。
上面的操作,我们就是直接遍历自由链表即可,同时要注意当链表为空时,就停止,记录申请的个即可。
注意:
申请完成资源之后,要保证申请走资源的最后一个资源指向的下一个资源为空。

size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t n, size_t byte_size)
{
	size_t index = AlignFindbucket::findBucket(byte_size);
	//加锁
	_spanlist[index]._lock.lock();

	Span* span = GetOneSpan(_spanlist[index], byte_size);

	assert(span);
	assert(span->_freeList);

	size_t actualNum = 1;//实际申请到的个数
	size_t i = 1;
	
	//start就是申请到资源的地址
	start = span->_freeList;
	end = start;
	//找到最后一个资源的地址
	while (i<n&&NextObj(end)!=nullptr)
	{
		end = NextObj(end);
		++i;
		++actualNum;
	}
	span->_freeList = NextObj(end);
	NextObj(end) = nullptr;

	//解锁
	_spanlist[index]._lock.unlock();
	return actualNum;
}

下面完成GetOneSpan
GetOneSpan是用来获取到我们需要的一个span空间,
用以下步骤获得:

  1. 先从central cache中进行找,如果给有空间,就直接返回即可,如果没有,进入步骤2
  2. 再去page cache中去找

步骤1:遍历这个双向循环链表即可,只要_freeList不为空的,就是我们需要的。这里需要遍历我们可以在SpanList中添加获取链表头指针和尾指针的方法。

Span* Begin()
{
    return _head->_next;
}
Span* End()
{
    return _head->_prev;
}

步骤2:去pagecache中去申请。首先我们是确定需要多少个这些内存,然后算出总的内存大小,最后通过总的内存大小计算出需要几页。

static size_t SpaceNeedSize(size_t size)
{
	assert(size > 0);

	size_t num = MAX_BYTE / size;
	if (num < 2)
		num = 2;

	if (num > 512)
		num = 512;

	return num;
}
//需要页的个数
static size_t PageNeedSize(size_t size)
{
	size_t num = SpaceNeedSize(size);
	size_t npage = num * size;

	//页的个数
	npage >>= PAGE_SIZE;
	if (npage == 0)
		npage = 1;

	return npage;
}

page cache获得的span之后,要进行切分——因为page cachespan是整块内存。
要切分就要知道改块内存的起始地址,还有总的自己个数,还有末尾地址。切的时候就是把每一块所需大小(size)的空间挂到_freeList上面。为了保证在物理上也是连续的地址,我们用尾插的方法。
image.png
分割代码:

//central cache桶中没有,向page cache中去申请
Span* newspan = PageCache::GetInstance()->NewSpan(PageNeedSize(byte_size));

//对从page cache获得的span进行切分
//char*的方便计算
char* start = (char*)(newspan->_pageId << PAGE_SIZE);//起始地址
size_t num = newspan->_pageNum << PAGE_SIZE;//有多少字节
char* end = start + num;//结尾地址

newspan->_freeList = start;
void* tail = start;

size_t i = 1;//分割的个数
while (start < end)
{
    start += byte_size;
    NextObj(tail) = start;
    ++i;
    tail = NextObj(tail);

}
//分割完成,插入到central cache中

list.PushFront(newspan);

下面我们思考一下锁的逻辑:

当我们进去GetOneSpan的时候,是带桶锁进入的。如果是不同的进程进入同一个桶,是安全的;如果不是同一个桶,那么肯定申请空间就需要竞争,因为central cache是桶锁,那么我们就要加上page cache锁。 我们思考一下,在调用NewSpan去page cache申请内存之前,要不要解锁呢?——需要。如果不解锁的话,在其他线程到该桶归还资源的时候,无法进行资源的归还,而且申请内存也是需要一段时间的。

让我们继续思考:

上面我们已经把桶锁解开了,那么在进行span分割的时候,需不需要进行加桶锁呢? 因为我们申请到的这块内存空间,只有该线程可见(就算其他线程进入page cache去申请,堆区的这个内存也已经被分配出去了),其他线程看不到。所以就不需要加桶锁。

最后一个思考:

分割完成之后,插入到对应桶的链表的时候,需要加锁吗? 需要的,因为这时,多个线程可以看到同一个桶,插入的时候,必须加锁。

整个的逻辑如下:

Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size)
{
	//先遍历Central cache
	Span* it = list.Begin();
	while (it != list.End())
	{
		if (it->_freeList != nullptr)
			return it;
		it = it->_next;
	}
	//开锁
	list._lock.unlock();

	//central cache桶中没有,向page cache中去申请,需要加锁
	PageCache::GetInstance()->_pageMtx.lock();
	Span* newspan = PageCache::GetInstance()->NewSpan(PageNeedSize(byte_size));
	PageCache::GetInstance()->_pageMtx.unlock();
	
	//对从page cache获得的span进行切分
	//char*的方便计算

	char* start = (char*)(newspan->_pageId << PAGE_SIZE);//起始地址
	size_t num = newspan->_pageNum << PAGE_SIZE;//有多少字节
	char* end = start + num;//结尾地址

	newspan->_freeList = start;
	void* tail = start;

	size_t i = 1;//分割的个数
	while (start < end)
	{
		start += byte_size;
		NextObj(tail) = start;
		++i;
		tail = NextObj(tail);

	}
	//分割完成,插入到central cache中,加锁
	list._lock.lock();
	list.PushFront(newspan);

	return newspan;
}

释放内存

central cache要对来自thread cache中释放的内存连接到central cache对应桶的span上的_freeList的链表上。当span上挂的结点都回来的时候,我们再把span的空间释放给page cache。注意锁的问题

  • 如何连接到span上面

首先thread cache上面的结点的地址都是随机的,所有我们要变量返回的链表,一个结点一个结点的进行判断是哪个span,找到span之后,我们就把结点头插上去

如果span上面的结点都回来的时候,我们就要把该span释放到page cache上面

那么我们现在要结点的问题就是如何通过结点的地址找到span。
这个我们用unordered_map,建立页号和span直接的对应关系。
这种映射关系应在分配内存的时候,就应该进行映射,所有之前的代码要修改一下。

//对分割出来的kspan的每一个页号都和kspan对应
for (int i = 0; i < kspan->_pageNum; i++)
{
    _idSpanMap[kspan->_pageId + i] = kspan;
}

同时添加一个接口,可以通过id获得span。

Span* PageCache::MapObjectToSpan(void* obj)
{
	assert(obj);
	PAGE_ID page_id = (PAGE_ID)obj >> PAGE_SIZE;
	auto ret = _idSpanMap.find(page_id);
	if (ret != _idSpanMap.end())
		return ret->second;
	else
	{
		assert(false);
		return nullptr;
	}
}

在释放到page cache的时候(解开桶锁),先把span从central cache中删除,然后进入page cache的回收逻辑,同时把span中的部分信息清空。

void CentralCache::ReleaseListToSpans(void* start, size_t byte_size)
{
	assert(start);
	size_t index = AlignFindbucket::findBucket(byte_size);
	_spanlist[index]._lock.lock();
	while (start)
	{
		void* next = NextObj(start);
		//查找到对应的span
		Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
		NextObj(start) = span->_freeList;
		span->_freeList = start;
		span->_useCount--;

		if (span->_useCount == 0)//给thread cache的内存都回来了,释放给page cache
		{
			_spanlist[index].Erase(span);
			span->_freeList = nullptr;
			span->_next = nullptr;
			span->_prev = nullptr;
			
			_spanlist[index]._lock.unlock();

			//回收到page cache
			PageCache::GetInstance()->_pageMtx.lock();
			PageCache::GetInstance()->ReleaseSpanToPageCache(span);
			PageCache::GetInstance()->_pageMtx.unlock();

			_spanlist[index]._lock.lock();

		}
		start = next;
		
	}

	_spanlist[index]._lock.unlock();
}

PageCache

page cache的存储结构也是哈希桶的结构,不过,它是直接进行映射的。
结构如下:和central cache不一样的是,page cache的每个桶挂着的是带头的双向循环链表,对于span,都没有再挂自由链表,这里的span都是一个大的内存。
image.png
同样,pageCache也是单例的,因为每个central cache只有一个pagecache。
同时,pagecache也是需要上锁的——因为每个线程在centralcache的桶中没有找到资源的时候,就会向pagecache申请,就会导致竞争同一个资源,那么就要上锁。

成员

哈希桶,每个桶挂着一个带头的双向循环链表span。
桶的个数为128个,我们用的是直接地址映射——1号桶挂1个页,2号桶挂2页……123号桶挂128页。那么对于我们需求的最大内存256KB来说,128号桶的一个span为1024KB,够分成4份了。(0号桶不算的话,就需要129个)
其次还需要一个锁。

class PageCache
{
public:
	//获得实例
	static PageCache* GetInstance()
	{
		return &_pageCache;
	}

private:
	PageCache(){}
	PageCache(const PageCache& obj) = delete;
	PageCache& operator=(PageCache& obj) = delete;
private:
	SpanList _pageList[PAGE_NUM];
	std::mutex _pageMtx;
	//单例
	static PageCache _pageCache;
};

申请空间

central cachepage cache申请空间的时候,page cache会先在自己的空间上进行查找,
查找过程:

  1. 如果当前页有内存,就直接分配
  2. 当前页没有内存,去下一页去找(遍历所有的页),如果有,进行页的切割,需要的页返回,剩下的页放到对应的桶中连接上去。
  3. 把所有的桶都遍历完了,都没有内存,就只能向堆区要内存。既然向堆区进行要内存了,那么我们一下子要应该128页的span,然后挂到central cache上面,然后再调用这个函数,用来分割这个申请的内存。这里虽然是用递归调用自己,但是最多才遍历128次,这个此时对于计算机来说是小意思。
Span* PageCache::NewSpan(size_t k)
{
	//申请的页数要正常
	assert(k > 0 && k < PAGE_NUM);
	//当前页有,直接分配
	if (!_pageList[k].Empty())
		return _pageList[k].PopFront();

	//当前页没有,去找其他页
	for (size_t i = k + 1; i < PAGE_NUM; i++)
	{
		if (!_pageList[i].Empty())
		{
			//切割这个页
			Span* spans = _pageList[i].PopFront();
			
			Span* kspan = new Span;
			kspan->_pageId = spans->_pageId;
			kspan->_pageNum = k;

			spans->_pageId += k;
			spans->_pageNum -= k;

			_pageList[spans->_pageNum].PushFront(spans);

			return kspan;
		}
	}
	//当前页没有,向堆区进行申请
	//一次申请很大的一个页
	Span* newspan = new Span;

	void* ptr= SystemAlloc(PAGE_NUM -1);
	//更新页的属性
	newspan->_pageId = (PAGE_ID)ptr >> PAGE_SIZE;
	newspan->_pageNum = PAGE_NUM - 1;
	_pageList[PAGE_NUM - 1].PushFront(newspan);

	//回调自己
	return NewSpan(k);
}

释放空间

central cache中释放的span空间到了page cache中,那么我们就要对span挂到对应的桶上面。
在挂之前,我们要先对span进行合并(前后页的span进行合并)。
page cache中,我们还是用unordered_map(PAGE_ID,span*),不过和central cache不同的是(需要切割span,切成一个一个的小内存,所有每个页号都需要进行映射),page cache不需要每个页每个页的进行映射,只需要首页好、尾页号和span进行映射即可,因为page cache中的大块内存不需要再分割成一个一个的小内存。——具体的映射关系阅读申请的逻辑代码。

//为了page cache合并大空间进行映射准备
_idSpanMap[spans->_pageId] = spans;
_idSpanMap[spans->_pageId + spans->_pageNum - 1] = spans;

因为central cache(使用中的)和page cache(没有使用的)都有span,所以我们要在span类中添加一个属性,用来标志该span有没有在使用中。
合并我们要进行前后合并,无论是向前合并还是往后合并,都要注意的当合并时span在被使用的时候,不能继续进行合并;当合并的页的个数大于128时,也不能继续进行合并,因为这样在page cache中就无法进行映射了;最后还有就是找不到的时候也要跳出,当然我们要持续这个过程,使之满足以上三个条件。

  • 往前合并:页号减1进行合并(前面span的尾页号),修改新合并页的大小、起始页号,把合并的span 从链表中删除,释放旧的span
  • 往后合并:页号加页的个数进行合并(后面的span的首页号),修改新合并页的大小,把合并的span从链表中删除,释放旧的span

最后合并完成之后,再头插到对应的page cache桶中,同时进行把新的span和页号进行映射,还有把span设成没有被使用

void PageCache::ReleaseSpanToPageCache(Span* span)
{
	//往前合并
	while (true)
	{
		auto prev = _idSpanMap.find(span->_pageId - 1);
		if (prev == _idSpanMap.end())
			break;
		Span* prevSpan = prev->second;
		
		if (prevSpan->_useSpan)//被使用
			break;
		if (prevSpan->_pageNum + span->_pageNum > PAGE_NUM - 1)
			break;
		//进行合并
		span->_pageId = prevSpan->_pageId;
		span->_pageNum += prevSpan->_pageNum;
		_pageList[prevSpan->_pageNum].Erase(prevSpan);
		////测试一下
		//_idSpanMap.erase(prevSpan->_pageId);
		////
		delete prevSpan;
	}
	//往后合并
	while (true)
	{
		auto next = _idSpanMap.find(span->_pageId +span->_pageNum);
		if (next == _idSpanMap.end())
			break;
		Span* nextSpan = next->second;

		if (nextSpan->_useSpan)//被使用
			break;
		if (nextSpan->_pageNum + span->_pageNum > PAGE_NUM - 1)
			break;
		//进行合并
		span->_pageNum += nextSpan->_pageNum;
		_pageList[nextSpan->_pageNum].Erase(nextSpan);
		////测试一下
		//_idSpanMap.erase(nextSpan->_pageId);
		////
		delete nextSpan;
	}
	_pageList[span->_pageNum].PushFront(span);
	_idSpanMap[span->_pageId] = span;
	_idSpanMap[span->_pageId + span->_pageNum - 1] = span;
	span->_useSpan = false;
}

系统细节完善

我们写的这个内存池要脱离malloc,直接对系统分配的内存进行管理,还有就是把释放内存的接口和free一样,不要让用户自动释放多少内存,只传入内存指针即可。

  • 释放接口的改善

要想只传内存的地址就直接释放内存,我们要对地址和内存的大小进行映射。地址可以通过页号计算出,内存的大小可以存储到span中,这样我们只需要在span中添加一个属性记录大小即可,然后通过页号和span的映射找到span,就找到了内存的大小了,就可以释放了。

  • 脱离malloc,new

用我们之前写的定长内存池进行替换我们内存池中的new

当申请大块内存的时候要怎么进行处理呢?
我们的内存池最大的页数是128页,每次申请的最大内存为256KB,所以我们的大块内存分为:

  1. 大于256KB,小于128*8KB。我们直接向page cache申请内存空间
  2. 大于128*8KB。直接向系统的堆区进行申请

上面两个我们都可以在page cache中进行判断。
因为直接通过page cache进行内存的申请,那么我们先要对内存进行对齐操作(小块内存,在thread cache中就进行对齐了),同时我们要修改一下原来的对齐函数
同时释放内存的部分也要修改一下,大于12*8KB的直接释放给系统。

最后我们还要优化一下,在外面的测试中,我们写的这个内存池还是比malloc不是太高效,主要是因为我们页号和span用的是unordered_map,这种结构在查找的时候需要进行锁操作 ,且查找不够快速。所以我们用基数树进行优化,直接参照tcmalloc的实现直接对我们的内存池进行优化。

Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SIZE;
	auto ret = (Span*)_idSpanMap.get(id);
	assert(ret);
	return ret;
}

在修改MapObjectToSpan的时候没有像之前那样有加锁操作。因为对于基数树来说,结构开辟好之后是不会改变的,而红黑树,在插入的时候会有旋转的情况,会导致结构不够稳定。基数树这种结构,多个线程访问读写的时候不会同时访问同一个位置,就不存在竞争资源的问题。

MIT License Copyright (c) 2023 maole Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

简介

C++实现高并发内存池项目 展开 收起
README
MIT
取消

发行版

暂无发行版

贡献者

全部

语言

近期动态

不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C++
1
https://gitee.com/maoleblog/high-concurrency-memory-pool.git
git@gitee.com:maoleblog/high-concurrency-memory-pool.git
maoleblog
high-concurrency-memory-pool
高并发内存池
master

搜索帮助