1 Star 0 Fork 0

coolboy199/concurrentMap

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
segment.go 3.52 KB
一键复制 编辑 原始数据 按行查看 历史
kanghonghong 提交于 2020-04-22 15:50 +08:00 . v1.01.01
package camp
import (
"sync"
"sync/atomic"
)
// BucketStatus classifies a bucket's load as reported by
// segment.CheckBucketStatus. The zero value means the bucket is neither
// overweight nor empty and needs no action.
type BucketStatus uint8

// Bucket load states. Numbering starts at 1 so that the zero value stays
// reserved for "no action needed".
const (
	// BUCKET_STATUS_UNDERWEIGHT is reported for a bucket that holds no pairs.
	BUCKET_STATUS_UNDERWEIGHT BucketStatus = iota + 1
	// BUCKET_STATUS_OVERWEIGHT is reported for a bucket whose size exceeds
	// the segment's hard maximum or its current threshold.
	BUCKET_STATUS_OVERWEIGHT
)
// Segment is one independently locked shard of the concurrent map. It stores
// key/value pairs in a set of buckets whose number changes as the load does.
type Segment interface {
	// Put stores pair in the bucket selected by the pair's hash and returns
	// whatever that bucket's Put reports.
	Put(pair Pair) (bool, error)
	// Get returns the pair the selected bucket holds for key.
	Get(key string) Pair
	// Delete removes key and reports whether the bucket deleted anything.
	Delete(key string) bool

	// Total returns the number of pairs counted in the segment.
	Total() uint64
	// BucketNum returns the current number of buckets.
	BucketNum() uint32
}
// segment is the Segment implementation: a slice of buckets guarded by one
// RWMutex, plus the counters that drive resizing.
//
// total and threshold are read and written with the 64-bit sync/atomic
// functions. On 32-bit platforms (386, ARM, 32-bit MIPS) only the first word
// of an allocated struct is guaranteed to be 64-bit aligned, so these fields
// must stay at the front of the struct; placing them after the slice header
// would misalign them there and make the atomic calls panic.
type segment struct {
	total     uint64 // pairs stored; +1 on successful Put, -1 on successful Delete (atomic)
	threshold uint64 // per-bucket size limit recomputed by updateThreshold (atomic)
	bucketMax uint64 // hard per-bucket size limit checked in CheckBucketStatus

	buckets []Bucket // bucket table, indexed by hash % len(buckets)
	factor  float64  // multiplier applied to the average bucket load to get threshold

	bucketNum uint32 // published bucket count, read by BucketNum (atomic)
	over      uint32 // overweight observations since the last resize (atomic)
	empty     uint32 // empty-bucket observations since the last resize (atomic)

	lock sync.RWMutex // write-locked by Put/Delete, read-locked by Get
}
// Delete removes key from the bucket its hash selects, holding the write
// lock for the whole operation. When the bucket reports a removal, the pair
// count is decremented and the segment gets a chance to resize. It returns
// the bucket's result.
func (s *segment) Delete(key string) bool {
	s.lock.Lock()
	defer s.lock.Unlock()

	target := s.getWithHash(hash(key))
	removed := target.Delete(key)
	if !removed {
		return removed
	}

	// Adding the all-ones value is the sync/atomic idiom for subtracting 1
	// from an unsigned counter.
	atomic.AddUint64(&s.total, ^uint64(0))
	s.redistribute(target)
	return removed
}
// BucketNum returns the published bucket count. It is read atomically and
// does not take the segment lock.
func (s *segment) BucketNum() uint32 {
	count := atomic.LoadUint32(&s.bucketNum)
	return count
}
// Put stores pair in the bucket selected by pair.Hash(), holding the write
// lock for the whole operation. When the bucket reports success, the pair
// count is incremented and the segment gets a chance to resize. The bucket's
// own results are returned unchanged.
func (s *segment) Put(pair Pair) (bool, error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	target := s.getWithHash(pair.Hash())
	added, err := target.Put(pair)
	if !added {
		return added, err
	}

	atomic.AddUint64(&s.total, 1)
	s.redistribute(target)
	return added, err
}
// Get looks key up in the bucket its hash selects, under the read lock, and
// returns whatever that bucket returns.
func (s *segment) Get(key string) Pair {
	s.lock.RLock()
	defer s.lock.RUnlock()

	return s.getWithHash(hash(key)).Get(key)
}
// Total returns the segment's pair counter. It is read atomically and does
// not take the segment lock.
func (s *segment) Total() uint64 {
	count := atomic.LoadUint64(&s.total)
	return count
}
// redistribute runs after a successful Put or Delete on bucket. It refreshes
// the size threshold from the current totals, classifies the touched bucket,
// and lets Redistribe decide whether to resize the table.
func (s *segment) redistribute(bucket Bucket) {
	// Capture the size before the threshold is refreshed, as the status
	// check is made against this bucket's size.
	size := bucket.Size()
	s.updateThreshold(s.Total(), len(s.buckets))
	s.Redistribe(s.CheckBucketStatus(size))
}
// getWithHash returns the bucket at index hash modulo the current number of
// buckets.
func (s *segment) getWithHash(hash uint64) Bucket {
	return s.buckets[hash%uint64(len(s.buckets))]
}
// updateThreshold recomputes the per-bucket size threshold as the average
// number of pairs per bucket, floored at 100, multiplied by the segment's
// factor, and stores it atomically.
func (s *segment) updateThreshold(total uint64, bucketNum int) {
	const minAverage = 100

	average := total / uint64(bucketNum)
	if average < minAverage {
		average = minAverage
	}

	limit := uint64(float64(average) * s.factor)
	atomic.StoreUint64(&s.threshold, limit)
}
// CheckBucketStatus classifies a bucket by its size and records the
// observation in the matching counter:
//
//   - larger than bucketMax or the current threshold: the over counter is
//     incremented and BUCKET_STATUS_OVERWEIGHT is returned;
//   - zero: the empty counter is incremented and BUCKET_STATUS_UNDERWEIGHT
//     is returned;
//   - otherwise: nothing is recorded and the zero BucketStatus is returned.
func (s *segment) CheckBucketStatus(bucketSize uint64) BucketStatus {
	switch {
	case bucketSize > s.bucketMax || bucketSize > atomic.LoadUint64(&s.threshold):
		atomic.AddUint32(&s.over, 1)
		return BUCKET_STATUS_OVERWEIGHT
	case bucketSize == 0:
		atomic.AddUint32(&s.empty, 1)
		return BUCKET_STATUS_UNDERWEIGHT
	default:
		return 0
	}
}
// Redistribe resizes the bucket table when enough buckets have been observed
// in the given status, then rehashes every stored pair into the new table.
//
//   - BUCKET_STATUS_OVERWEIGHT: the table doubles once the over counter
//     reaches at least a quarter of the bucket count.
//   - BUCKET_STATUS_UNDERWEIGHT: the table halves (never below 2) once the
//     table has at least 100 buckets and the empty counter reaches at least a
//     quarter of the bucket count.
//   - any other status: nothing happens.
//
// After a resize the over and empty counters are reset and the published
// bucket count is updated. The method takes no lock itself; in this file it
// is reached from Put and Delete, which hold the write lock.
func (s *segment) Redistribe(status BucketStatus) {
	oldCount := len(s.buckets)

	var newCount int
	switch status {
	case BUCKET_STATUS_OVERWEIGHT:
		if atomic.LoadUint32(&s.over)*4 < uint32(oldCount) {
			return
		}
		newCount = oldCount * 2
	case BUCKET_STATUS_UNDERWEIGHT:
		if oldCount < 100 || atomic.LoadUint32(&s.empty)*4 < uint32(oldCount) {
			return
		}
		newCount = oldCount / 2
		if newCount < 2 {
			newCount = 2
		}
	default:
		return
	}

	// Collect every pair and detach it from its chain. The link out of the
	// previous pair is cut only after the walk has moved on to the current
	// one, so the traversal never loses its place.
	var collected []Pair
	for _, bucket := range s.buckets {
		var previous Pair
		for current := bucket.GetFirstPair(); current != nil; current = current.Next() {
			collected = append(collected, current)
			if previous != nil {
				previous.SetNext(nil)
			}
			previous = current
		}
	}

	if newCount > oldCount {
		// Growing: empty the existing buckets in place and append new ones.
		for i := range s.buckets[:oldCount] {
			s.buckets[i].Clear()
		}
		for len(s.buckets) < newCount {
			s.buckets = append(s.buckets, NewBucket())
		}
	} else {
		// Shrinking: start over with a fresh, smaller table.
		fresh := make([]Bucket, newCount)
		for i := range fresh {
			fresh[i] = NewBucket()
		}
		s.buckets = fresh
	}

	// Rehash every collected pair into the resized table.
	for _, pair := range collected {
		slot := pair.Hash() % uint64(newCount)
		s.buckets[slot].Put(pair)
	}

	atomic.StoreUint32(&s.empty, 0)
	atomic.StoreUint32(&s.over, 0)
	atomic.StoreUint32(&s.bucketNum, uint32(newCount))
}
// NewSegment returns a Segment that starts with a single bucket.
//
// factor scales the average bucket load to produce the per-bucket size
// threshold; bucketMax is a hard per-bucket size limit. A bucket exceeding
// either limit is counted as overweight.
func NewSegment(factor float64, bucketMax uint64) Segment {
	return &segment{
		buckets:   []Bucket{NewBucket()},
		factor:    factor,
		bucketMax: bucketMax,
		bucketNum: 1,
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/bubu/concurrentMap.git
git@gitee.com:bubu/concurrentMap.git
bubu
concurrentMap
concurrentMap
b5e5c3c5db66

搜索帮助