该项目是实现一个高并发的内存池
C/C++,数据结构(链表、哈希桶),操作系统内存管理,单例模式,多线程,互斥锁等。
内存池是指程序预先从操作系统中申请一大块内存,然后自己管理这部分内存。
当程序需要申请内存的时候,不是直接向操作系统申请,而是向内存池中申请内存;
释放的时候,也不是把内存给操作系统,而是放入内存池中。
我们先申请一个一定大小的内存空间,该空间用来承担小型池子的作用。
每次程序申请空间的时候,就从该小池子申请空间。
当程序释放资源空间的时候,就返回到“池子”中。
但是真的可以回到池子中吗?——不能
因为我们这个池子是连续的一段空间,每个程序来的时候,都向池子申请资源,但还资源的时候,我们无法判断,它是不是按照申请的顺序返回的,所以我们不能进入池子。
那我们进入哪里呢?我们可以用一个指针接收返回的空间,每个空间都有它的地址,我们把这些空间一个一个连接起来(用单链表的形式)。当再有其他程序来申请的空间的时候,直接从该链表资源中取该资源。
该定长内存池,我们给它起名为ObjectPool
——类
根据我们刚才的思路,至少需要2个成员。
_memory
——定长内存池指针_freeList
——释放资源的单链表指针还需要一个成员记录剩余空间的大小freeSpaceSize
该类要有申请空间和释放空间的能力,那么就需要这2个能力。
New()
——申请空间Delete()
——释放空间_memory
的是什么类型的指针呢?——char*
类型的
为什么是char类型的呢? 当申请内存空间的时候,就要算新空间的首地址,那么
char
类型的指针,加一就是加一个字节,非常方便计算。其他的类型就不是很方便。
_freeList
是类型的指针呢?——void*
,不需要知道该指针是什么类型的,设成void*就可以。
该_freeList
指向释放资源的地址,它们是怎么连接的呢?
每个释放的空间相当于一个结点,前4/8字节存储下一个结点的地址,那么最后一个空间存储的就是nullptr
。
char* _memory=nullptr;//定长内存空间的地址
void* _freeList=nullptr;//释放资源的链表指针
int _freeSpaceSize=0;//剩余空间的大小
New
方法
返回对象的地址即可——满足申请空间返回该空间的地址。 如果资源链表中有资源,直接从资源链表中拿资源即可——拿的方法为头删。
if (_freeList)
{
//头删
obj = (T*)_freeList;
_freeList = *((void**)_freeList);
}
上面有一个*((void**)_freeList)
,为什么要这样写呢?
因为我们也不知道平台是32位的平台,还是64位平台的。所以就先把_freeList类型转成void**
类型的。void**
类型的大小肯定是4或者8,解引用就是拿到该内存的前4,8个字节。当然用int**``char**
也是可以的,可不可以用void*
的呢?——不能,void*
类型不能解引用。
如果资源链表中没有资源,那么我们就要从池子里面去拿资源。 要拿池子中的资源,就有两者情况:
- 池子中有资源,直接拿
- 池子中没有资源,重新向内存中进行申请
判断条件为剩余空间小于要申请的大小
有资源之后就是要分配资源,分配资源的时候要注意:申请的资源是否大于地址的大小 为什么要注意这个呢?
- 因为该资源释放的时候,会进入释放资源链表,该资源要存储下一个资源的地址,如果空间小就存储不下该地址。
所以,当小于地址空间的时候,就要给它一个地址空间的大小。
把空间分配之后,要对剩余空间的地址和大小进行更新
因为模板可能是自定义类型,那么我们就要用定位new进行初始化资源。
T* New()
{
T* obj = nullptr;
//资源链表中有资源
if (_freeList)
{
//头删
obj = (T*)_freeList;
_freeList = *((void**)_freeList);//保证32,64平台都是可以的
}
else
{
//剩余空间不够,重新申请
if (_freeSpaceSize < sizeof T)
{
_freeSpaceSize = 128 * 1024;
_memory = (char*)malloc(_freeSpaceSize);
if (_memory==nullptr)
throw std::bad_alloc();
}
//分配资源
obj = (T*)_memory;
int objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);//空间是否可以存储一个地址的大小
_memory += objSize;//更新空间地址
_freeSpaceSize -= objSize;//更新所剩空间的大小
}
//定位new,解决初始化问题
new(obj)T;
return obj;
}
Delete
方法
对申请资源进行释放,释放就是进入释放链表——头插(资源空间存储头结点的地址,
_freeList
指向新的资源地址) 释放前要调用对象的析构函数,释放空间不需要改变剩余空间的大小,因为申请空间时,优先从链表资源中取资源。
void Delete(T* obj)
{
//显式调用析构函数
obj->~T();
//头插
*((void**)obj) = _freeList;
_freeList = obj;
}
concurrent memory pool
(高并发内存池)主要有3个结构:
thread cache
:线程缓存是每个线程独有的,用于小于256KB的内存分配,线程从这里申请内存时是不需要加锁的,每个线程独享一个cache
。——如何独享?下面有介绍central cache
:中心缓存是所以线程共享的,当thread cache
中无内存可用的时候,thread cache
就要从central cache
中申请内存,同时central cache
也会在合适的时候从thread
擦车中回收内存,避免一个线程占太多的内存,导致其他线程无法使用。从上面的描述可以看出,central cache
是需要竞争的资源,所以从这里取内存对象的时候是需要进行加锁的。`page cache
:页缓存是在central cache1
缓存上面的缓存,存储的内容是以页我得单位存储及分配的,central cache
没有内存时,从page cache
中分配一段数量的page
,并切割定长大小的小块内存,然后分配给central cache
。在一定的时候,多个页的对象会page cache
进行合并回收,组成一个更大的页,来缓解内存碎片的问题。thread cache是一个哈希桶的结构,每个桶挂着一个自由链表。每个线程都有一个thread cache对象,这样每个线程从这里获取对象和释放对象时是无锁的。
怎么保证每个线程独享一个thread cache呢?——Thread Local Storage(线程局部存储)TLS
Windows上面这样设置就可以保证:
static __declspec(thread) ThreadCache* ptls_threadcache = nullptr;
这里的ptls_threadcache
就是指向thread cache
。
static void* concurrentalloc(size_t size)
{
if (ptls_threadcache == nullptr)
ptls_threadcache = new ThreadCache;
return ptls_threadcache->Allocate(size);
}
如同malloc
一样,它也需要知道需要多大的内存空间,同时要返回该内存空间的地址。
所以该函数为void* Allocte(size_t size)
。
如果内存的申请小于等于256KB,我们要先通过申请内存的大小判断是那个桶,找到这个桶之后,要判断该桶有没有空间,如果没有就去central cache中去申请,如果有直接头删返回就行。
细节——如何使内存对齐,如果找到桶:
- 内存对齐
内存对齐,我们用下面的方法:
[1,128] | 8byte对齐 | freelist[0,16) |
---|---|---|
[128+1,1024] | 16byte对齐 | freelist[16,72) |
[1024+1,8*1024] | 128byte对齐 | freelist[72,128) |
[81024+1,641024] | 1024byte对齐 | freelist[128,184) |
[641024+1,2561024] | 8*1024byte对齐 | freelist[184,208) |
为什么要向上面那样内存对齐呢?是为了保证总体的内存碎片率大概处于10%左右。
从上面也就确定了桶的个数208个桶。
对齐之后的内存是多少?——如果需要的内存空间刚好是对齐数的整数倍,那么就不需要进行处理;如果不是整数倍,只需向上调整到整数倍即可size - size % alignNum + alignNum;
。
当然可以用位运算来更高效的处理这个问题,因为对齐是频繁调用的,cpu对位操作处理的非常快,同时把函数搞成内联函数加快处理速度。
static inline size_t _alignSize(size_t size, size_t alignNum)
{
if (size % alignNum == 0)
{
return size;
}
else
return size - size % alignNum + alignNum;
}
static inline size_t alignSize(size_t size)
{
assert(size <= MAX_BYTE);
if (size <= 128)
{
return _alignSize(size, 8);
}
else if (size <= 1024)
{
return _alignSize(size, 16);
}
else if (size <= 8 * 1024)
{
return _alignSize(size, 128);
}
else if (size <= 64 * 1024)
{
return _alignSize(size, 1024);
}
else if (size <= 256 * 1024)
{
return _alignSize(size, 8 * 1024);
}
else
{
assert(false);
return -1;
}
}
- 找桶
如何找是哪个桶呢?
因为每个区间的字节对齐数不一样,所以我们首先要去除前一个区间的最大字节。然后用剩下的字节去计算桶数再加上上一个区间的桶数即可。
static inline size_t _findBucket(size_t size,size_t align)
{
if (size % align == 0)
{
return size / align - 1;
}
return size / align;
}
static inline size_t findBucket(size_t size)
{
//每个区间的最大桶数
static size_t sectionBucketNum[] = { 16,72,128,184 };
if (size <= 128)
{
return _findBucket(size, 8);
}
else if (size <= 1024)
{
return _findBucket(size - 128, 16) + sectionBucketNum[0];
}
else if (size <= 8 * 1024)
{
return _findBucket(size - 1024, 128) + sectionBucketNum[1];
}
else if (size <= 64 * 1024)
{
return _findBucket(size - 8 * 1024, 1024) + sectionBucketNum[2];
}
else if (size <= 256 * 1024)
{
return _findBucket(size - 64 * 1024, 8 * 1024) + sectionBucketNum[3];
}
else
{
assert(false);
return -1;
}
}
桶中没有空间,向central cache申请空间——void* FetchFromCentralCache(size_t index, size_t size);
这里的index
是对于桶的下标,size
是已经对齐之后的空间的大小。
在向central cache申请内存时,就要考虑如何给它内存,因为我们知道它每次申请的都是一块空间(有小空间,如8字节,有大空间256K),那么我们如何分配呢?——慢开始算法
我们用最大的空间
MAX_BYTE
除以需求空间的大小,得到一个数字。 如果需求的空间小,那么这个数字很大;反之,这个数很小。 所以我们对得到的这个数进行处理:
- 小于2:说明请求的空间大,那么我们就给它2个
- 大于512:说请求的空间少,那么就很512
- 其他的情况,就按照所求的哪个数分配
static size_t SpaceNeedSize(size_t size)
{
assert(size > 0);
size_t num = MAX_BYTE / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
上面是分配多少个内存的问题,内部是怎么分配的我们教给central cache去处理。
(保证一定是申请成功的)有可能span不够我们申请的个数,那么申请的空间就会比我们申请的少。
如果就申请了一个(说明span中就只有1个,自由链表就一个对象,就不需要对自由链表后进行修复),就直接返回申请的地址。
如果不是1个,那么我们就要对自由链表进行修复处理——保持链表的正常连接。
void* ThreadCache:: FetchFromCentralCache(size_t index, size_t size)
{
//慢开始算法
size_t spacenum = min(_hashBucket[index].GetNeedNum(), SpaceNeedSize(size));
//申请相同空间时,慢慢增长
if (_hashBucket[index].GetNeedNum() == spacenum)
_hashBucket[index].GetNeedNum() += 2;
void* start = nullptr;
void* end = nullptr;
size_t actualnum = CentralCache::GetInstance()->FetchRangeObj(start, end, spacenum, size);
//断言,申请成功一定会满足下面的条件
assert(actualnum >= 1);
assert(start);
assert(end);
if (actualnum == 1)
{
assert(start == end);
return start;
}
else
{
//第一个参数是第二个内存,第一块对象是被申请走了,只需要把剩下的连接就可以
_hashBucket[index].PushRange(NextObj(start) , end);
return start;
}
}
保持链表的正常连接
void PushRange(void* start,void* end)
{
NextObj(end) = _freeList;
_freeList = start;
}
释放的时候要说明一下释放空间的地址和该空间的大小——参数要注意的
当释放内存小于256KB时,将内存释放回thread cache
对应的桶中(先找到对应的桶)——头插。
当链表过长的时候,则回收一部分内存对象到central cache
中。
如何把内存释放到central cache中呢?
我们可以设置一个成员变量记录每个桶中链表结点的个数,当结果个数大于每次向central cache
中申请的个数的时候,就释放申请个数个结点到central cache
中。记得要保证链表的连接哦。
所以,我们先在
_freeList
中添加一个成员_nodeNum
用来记录thread cache
桶中链表的长度。
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTE);
size_t index = AlignFindbucket::findBucket(size);
_hashBucket[index].push(ptr);
if (_hashBucket[index].NodeNum() >= _hashBucket[index].GetNeedNum())
{
ListTooLong(_hashBucket[index], size);
}
}
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
//从链表中删除
list.PopRange(start, end, list.GetNeedNum());
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
需要什么成员呢?当然是需要一个哈希桶
那么我们进行封装一下,怎么封装呢?
只需要把自由链表封装一下就可以了。
成员就是链表的头指针,
该结构要有头插、头删、判空的能力。
相当于回收资源 要传入资源地址——
void push(void* ptr)
相当于申请资源 要返回删除结点的地址——
void* pop()
就是判断链表是否为空
central cache
的结构也是一个哈希桶的结构,和thread cache
的映射关系是一样的。
不同的是central cache
的每个桶的对象是一个span
(多个页的一个跨度),每个span
相连成一个链表,每个span
又独自挂一个自由链表,该自由链表是对span
申请空间的一个分割。thread cache
是每个线程独有的,其他线程拿不到该资源,所以就需要进行上锁。
但是central cache
是进程享有的,每次进程有多个线程,那么我们就需要对central cache
进行加锁——因为是多个线程所看到的公共资源。
为了高效,我们不是对整个central cache
进行加锁,而是对每个桶进行加锁。
为什么要这样加锁呢?
结构、需求决定着我们要这样进行操作: 每个桶用于分配不同的内存空间,当各个线程需要的内存空间不在同一个桶的时候,就不需要竞争资源,就可以更高效。
既然每个进程有一个central cache
,那么就没有用单例模式,这里我们可以用饿汉模式。
该结构的成员就是哈希桶,每个桶存储的是一个带头双向链表结构。
其次还要有CentralCache
对象,用来实现单例模式,
带头双向循环链表的每个结点的元素就是一个span
——多个页的一个跨度
那么span
的有什么属性呢?
起始的页号:
_pagaId
。
- 关于这个起始页号,32位和64位下页的总数是不一样的。我们可以通过调节编译的方法来解决这个问题。
页的数量:_pageNum
还要具有双向链表的结构:
_next
_prev
挂自由链表的结构:_freeList
分配给thread cache的个数:
_useCount
span封装完成之后(相当于完成了元素结点),下面完成带头双向循环链表SpanList
对于带头的双向链表,最基本的功能就是满足插入和删除的工作。
SpanList是每个桶所管理的资源,根据上述描述,要进行加锁操作。
所以,有两个成员:
Span* _head;
//每个桶都需要加锁操作
std::mutex _lock;
初始化:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
插入:
//在当前结点之前插
void Insert(Span* pos,Span* newspan)
{
assert(pos);
assert(newspan);
Span* prev = pos->_prev;
newspan->_next = pos;
pos->_prev = newspan;
prev->_next = newspan;
newspan->_prev = prev;
}
删除:
//删除的当前结点
void Erase(Span* pos)
{
assert(pos);
//不能删除头结点
assert(_head);
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
因为是单例模式,所以构造函数要私有,拷贝构造和赋值构造要禁掉。
class CentralCache
{
public:
private:
//单例模式,构造私有,拷贝、赋值禁掉
CentralCache() {}
CentralCache(const CentralCache& obj) = delete;
CentralCache& operator=(CentralCache& obj) = delete;
SpanList _spanlist[BUCKET_NUM];
//单例模式——饿汉模式
static CentralCache _centralCache;
};
既然是单例模式,那么我们就需要获得这个实例
static CentralCache* GetInstance()
{
return &_centralCache;
}
size_t FetchRangeObj(void*& start, void*& end, size_t n, size_t byte_size);
对于申请内存一定要上锁。
我们通过需要空间大小,确定是哪个桶,然后去找span
,如果span
没有内存,就需要向怕个 cache
去申请空间;如果有内存,那么就可以给thread cache
,但是如果span
里面的内存个数小于申请的个数,那么span
有多少就给它多少。
给了之后,要重新把自由链表进行连接上,同时要记录分配给thread cache
多少空间。
上面的操作,我们就是直接遍历自由链表即可,同时要注意当链表为空时,就停止,记录申请的个即可。
注意:
申请完成资源之后,要保证申请走资源的最后一个资源指向的下一个资源为空。
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t n, size_t byte_size)
{
size_t index = AlignFindbucket::findBucket(byte_size);
//加锁
_spanlist[index]._lock.lock();
Span* span = GetOneSpan(_spanlist[index], byte_size);
assert(span);
assert(span->_freeList);
size_t actualNum = 1;//实际申请到的个数
size_t i = 1;
//start就是申请到资源的地址
start = span->_freeList;
end = start;
//找到最后一个资源的地址
while (i<n&&NextObj(end)!=nullptr)
{
end = NextObj(end);
++i;
++actualNum;
}
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
//解锁
_spanlist[index]._lock.unlock();
return actualNum;
}
下面完成GetOneSpan
GetOneSpan
是用来获取到我们需要的一个span
空间,
用以下步骤获得:
central cache
中进行找,如果给有空间,就直接返回即可,如果没有,进入步骤2page cache
中去找步骤1:遍历这个双向循环链表即可,只要_freeList
不为空的,就是我们需要的。这里需要遍历我们可以在SpanList
中添加获取链表头指针和尾指针的方法。
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head->_prev;
}
步骤2:去pagecache
中去申请。首先我们是确定需要多少个这些内存,然后算出总的内存大小,最后通过总的内存大小计算出需要几页。
static size_t SpaceNeedSize(size_t size)
{
assert(size > 0);
size_t num = MAX_BYTE / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
//需要页的个数
static size_t PageNeedSize(size_t size)
{
size_t num = SpaceNeedSize(size);
size_t npage = num * size;
//页的个数
npage >>= PAGE_SIZE;
if (npage == 0)
npage = 1;
return npage;
}
从page cache
获得的span
之后,要进行切分——因为page cache
的span
是整块内存。
要切分就要知道改块内存的起始地址,还有总的自己个数,还有末尾地址。切的时候就是把每一块所需大小(size)的空间挂到_freeList上面。为了保证在物理上也是连续的地址,我们用尾插的方法。
分割代码:
//central cache桶中没有,向page cache中去申请
Span* newspan = PageCache::GetInstance()->NewSpan(PageNeedSize(byte_size));
//对从page cache获得的span进行切分
//char*的方便计算
char* start = (char*)(newspan->_pageId << PAGE_SIZE);//起始地址
size_t num = newspan->_pageNum << PAGE_SIZE;//有多少字节
char* end = start + num;//结尾地址
newspan->_freeList = start;
void* tail = start;
size_t i = 1;//分割的个数
while (start < end)
{
start += byte_size;
NextObj(tail) = start;
++i;
tail = NextObj(tail);
}
//分割完成,插入到central cache中
list.PushFront(newspan);
下面我们思考一下锁的逻辑:
当我们进去
GetOneSpan
的时候,是带桶锁进入的。如果是不同的进程进入同一个桶,是安全的;如果不是同一个桶,那么肯定申请空间就需要竞争,因为central cache是桶锁,那么我们就要加上page cache锁。 我们思考一下,在调用NewSpan
去page cache申请内存之前,要不要解锁呢?——需要。如果不解锁的话,在其他线程到该桶归还资源的时候,无法进行资源的归还,而且申请内存也是需要一段时间的。
让我们继续思考:
上面我们已经把桶锁解开了,那么在进行span分割的时候,需不需要进行加桶锁呢? 因为我们申请到的这块内存空间,只有该线程可见(就算其他线程进入page cache去申请,堆区的这个内存也已经被分配出去了),其他线程看不到。所以就不需要加桶锁。
最后一个思考:
分割完成之后,插入到对应桶的链表的时候,需要加锁吗? 需要的,因为这时,多个线程可以看到同一个桶,插入的时候,必须加锁。
整个的逻辑如下:
Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size)
{
//先遍历Central cache
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
return it;
it = it->_next;
}
//开锁
list._lock.unlock();
//central cache桶中没有,向page cache中去申请,需要加锁
PageCache::GetInstance()->_pageMtx.lock();
Span* newspan = PageCache::GetInstance()->NewSpan(PageNeedSize(byte_size));
PageCache::GetInstance()->_pageMtx.unlock();
//对从page cache获得的span进行切分
//char*的方便计算
char* start = (char*)(newspan->_pageId << PAGE_SIZE);//起始地址
size_t num = newspan->_pageNum << PAGE_SIZE;//有多少字节
char* end = start + num;//结尾地址
newspan->_freeList = start;
void* tail = start;
size_t i = 1;//分割的个数
while (start < end)
{
start += byte_size;
NextObj(tail) = start;
++i;
tail = NextObj(tail);
}
//分割完成,插入到central cache中,加锁
list._lock.lock();
list.PushFront(newspan);
return newspan;
}
central cache
要对来自thread cache
中释放的内存连接到central cache
对应桶的span
上的_freeList
的链表上。当span
上挂的结点都回来的时候,我们再把span
的空间释放给page cache
。注意锁的问题
span
上面首先
thread cache
上面的结点的地址都是随机的,所有我们要变量返回的链表,一个结点一个结点的进行判断是哪个span
,找到span
之后,我们就把结点头插上去
如果span上面的结点都回来的时候,我们就要把该span释放到page cache上面
那么我们现在要结点的问题就是如何通过结点的地址找到span。
这个我们用unordered_map
,建立页号和span直接的对应关系。
这种映射关系应在分配内存的时候,就应该进行映射,所有之前的代码要修改一下。
//对分割出来的kspan的每一个页号都和kspan对应
for (int i = 0; i < kspan->_pageNum; i++)
{
_idSpanMap[kspan->_pageId + i] = kspan;
}
同时添加一个接口,可以通过id获得span。
Span* PageCache::MapObjectToSpan(void* obj)
{
assert(obj);
PAGE_ID page_id = (PAGE_ID)obj >> PAGE_SIZE;
auto ret = _idSpanMap.find(page_id);
if (ret != _idSpanMap.end())
return ret->second;
else
{
assert(false);
return nullptr;
}
}
在释放到page cache的时候(解开桶锁),先把span从central cache中删除,然后进入page cache的回收逻辑,同时把span中的部分信息清空。
void CentralCache::ReleaseListToSpans(void* start, size_t byte_size)
{
assert(start);
size_t index = AlignFindbucket::findBucket(byte_size);
_spanlist[index]._lock.lock();
while (start)
{
void* next = NextObj(start);
//查找到对应的span
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--;
if (span->_useCount == 0)//给thread cache的内存都回来了,释放给page cache
{
_spanlist[index].Erase(span);
span->_freeList = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
_spanlist[index]._lock.unlock();
//回收到page cache
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
_spanlist[index]._lock.lock();
}
start = next;
}
_spanlist[index]._lock.unlock();
}
page cache
的存储结构也是哈希桶的结构,不过,它是直接进行映射的。
结构如下:和central cache不一样的是,page cache的每个桶挂着的是带头的双向循环链表,对于span,都没有再挂自由链表,这里的span都是一个大的内存。
同样,pageCache也是单例的,因为每个central cache只有一个pagecache。
同时,pagecache也是需要上锁的——因为每个线程在centralcache的桶中没有找到资源的时候,就会向pagecache申请,就会导致竞争同一个资源,那么就要上锁。
哈希桶,每个桶挂着一个带头的双向循环链表span。
桶的个数为128个,我们用的是直接地址映射——1号桶挂1个页,2号桶挂2页……123号桶挂128页。那么对于我们需求的最大内存256KB来说,128号桶的一个span为1024KB,够分成4份了。(0号桶不算的话,就需要129个)
其次还需要一个锁。
class PageCache
{
public:
//获得实例
static PageCache* GetInstance()
{
return &_pageCache;
}
private:
PageCache(){}
PageCache(const PageCache& obj) = delete;
PageCache& operator=(PageCache& obj) = delete;
private:
SpanList _pageList[PAGE_NUM];
std::mutex _pageMtx;
//单例
static PageCache _pageCache;
};
当central cache
向page cache
申请空间的时候,page cache
会先在自己的空间上进行查找,
查找过程:
Span* PageCache::NewSpan(size_t k)
{
//申请的页数要正常
assert(k > 0 && k < PAGE_NUM);
//当前页有,直接分配
if (!_pageList[k].Empty())
return _pageList[k].PopFront();
//当前页没有,去找其他页
for (size_t i = k + 1; i < PAGE_NUM; i++)
{
if (!_pageList[i].Empty())
{
//切割这个页
Span* spans = _pageList[i].PopFront();
Span* kspan = new Span;
kspan->_pageId = spans->_pageId;
kspan->_pageNum = k;
spans->_pageId += k;
spans->_pageNum -= k;
_pageList[spans->_pageNum].PushFront(spans);
return kspan;
}
}
//当前页没有,向堆区进行申请
//一次申请很大的一个页
Span* newspan = new Span;
void* ptr= SystemAlloc(PAGE_NUM -1);
//更新页的属性
newspan->_pageId = (PAGE_ID)ptr >> PAGE_SIZE;
newspan->_pageNum = PAGE_NUM - 1;
_pageList[PAGE_NUM - 1].PushFront(newspan);
//回调自己
return NewSpan(k);
}
从central cache
中释放的span
空间到了page cache
中,那么我们就要对span
挂到对应的桶上面。
在挂之前,我们要先对span
进行合并(前后页的span
进行合并)。
在page cache
中,我们还是用unordered_map(PAGE_ID,span*)
,不过和central cache不同的是(需要切割span,切成一个一个的小内存,所有每个页号都需要进行映射),page cache不需要每个页每个页的进行映射,只需要首页好、尾页号和span进行映射即可,因为page cache中的大块内存不需要再分割成一个一个的小内存。——具体的映射关系阅读申请的逻辑代码。
//为了page cache合并大空间进行映射准备
_idSpanMap[spans->_pageId] = spans;
_idSpanMap[spans->_pageId + spans->_pageNum - 1] = spans;
因为central cache
(使用中的)和page cache
(没有使用的)都有span
,所以我们要在span
类中添加一个属性,用来标志该span
有没有在使用中。
合并我们要进行前后合并,无论是向前合并还是往后合并,都要注意的当合并时span
在被使用的时候,不能继续进行合并;当合并的页的个数大于128时,也不能继续进行合并,因为这样在page cache
中就无法进行映射了;最后还有就是找不到的时候也要跳出,当然我们要持续这个过程,使之满足以上三个条件。
span
的尾页号),修改新合并页的大小、起始页号,把合并的span
从链表中删除,释放旧的span
span
的首页号),修改新合并页的大小,把合并的span
从链表中删除,释放旧的span
最后合并完成之后,再头插到对应的page cache
桶中,同时进行把新的span和页号进行映射,还有把span
设成没有被使用
void PageCache::ReleaseSpanToPageCache(Span* span)
{
//往前合并
while (true)
{
auto prev = _idSpanMap.find(span->_pageId - 1);
if (prev == _idSpanMap.end())
break;
Span* prevSpan = prev->second;
if (prevSpan->_useSpan)//被使用
break;
if (prevSpan->_pageNum + span->_pageNum > PAGE_NUM - 1)
break;
//进行合并
span->_pageId = prevSpan->_pageId;
span->_pageNum += prevSpan->_pageNum;
_pageList[prevSpan->_pageNum].Erase(prevSpan);
////测试一下
//_idSpanMap.erase(prevSpan->_pageId);
////
delete prevSpan;
}
//往后合并
while (true)
{
auto next = _idSpanMap.find(span->_pageId +span->_pageNum);
if (next == _idSpanMap.end())
break;
Span* nextSpan = next->second;
if (nextSpan->_useSpan)//被使用
break;
if (nextSpan->_pageNum + span->_pageNum > PAGE_NUM - 1)
break;
//进行合并
span->_pageNum += nextSpan->_pageNum;
_pageList[nextSpan->_pageNum].Erase(nextSpan);
////测试一下
//_idSpanMap.erase(nextSpan->_pageId);
////
delete nextSpan;
}
_pageList[span->_pageNum].PushFront(span);
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_pageNum - 1] = span;
span->_useSpan = false;
}
我们写的这个内存池要脱离malloc
,直接对系统分配的内存进行管理,还有就是把释放内存的接口和free一样,不要让用户自动释放多少内存,只传入内存指针即可。
要想只传内存的地址就直接释放内存,我们要对地址和内存的大小进行映射。地址可以通过页号计算出,内存的大小可以存储到span中,这样我们只需要在span中添加一个属性记录大小即可,然后通过页号和span的映射找到span,就找到了内存的大小了,就可以释放了。
malloc,new
用我们之前写的定长内存池进行替换我们内存池中的new
当申请大块内存的时候要怎么进行处理呢?
我们的内存池最大的页数是128页,每次申请的最大内存为256KB,所以我们的大块内存分为:
上面两个我们都可以在page cache中进行判断。
因为直接通过page cache进行内存的申请,那么我们先要对内存进行对齐操作(小块内存,在thread cache中就进行对齐了),同时我们要修改一下原来的对齐函数
同时释放内存的部分也要修改一下,大于12*8KB的直接释放给系统。
最后我们还要优化一下,在外面的测试中,我们写的这个内存池还是比malloc不是太高效,主要是因为我们页号和span用的是unordered_map
,这种结构在查找的时候需要进行锁操作 ,且查找不够快速。所以我们用基数树进行优化,直接参照tcmalloc的实现直接对我们的内存池进行优化。
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = (PAGE_ID)obj >> PAGE_SIZE;
auto ret = (Span*)_idSpanMap.get(id);
assert(ret);
return ret;
}
在修改MapObjectToSpan
的时候没有像之前那样有加锁操作。因为对于基数树来说,结构开辟好之后是不会改变的,而红黑树,在插入的时候会有旋转的情况,会导致结构不够稳定。基数树这种结构,多个线程访问读写的时候不会同时访问同一个位置,就不存在竞争资源的问题。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。