7 Star 53 Fork 26

ryanduan / wsPool

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
priorityQueue.go 3.12 KB
一键复制 编辑 原始数据 按行查看 历史
ryanduan 提交于 2020-11-18 14:45 . 修复bug 和并发性能提升
// bill 2018.1.8
//优先级队列[同级别先进先出]权重值越大越优先
package queue
import (
"container/list"
"log"
"sync"
"time"
)
type Item struct {
Data interface{} //数据
Priority int32 //优先级
AddTime time.Time //插入队列的时间
Expiration int64 //过期时间值 以秒为单位
}
type PriorityQueue struct {
Data *list.List
PriorityMap map[int32]*pqmap
}
type pqmap struct {
element *list.Element
totle int
}
var lock sync.RWMutex
func NewPriorityQueue() *PriorityQueue {
pq:= &PriorityQueue{
Data: list.New(),
PriorityMap: make(map[int32]*pqmap),
}
return pq
}
func (pq *PriorityQueue) Len() int {
defer lock.RUnlock()
lock.RLock()
return pq.Data.Len()
}
func (pq *PriorityQueue) Push(v *Item) {
defer lock.Unlock()
lock.Lock()
newElement := pq.Data.PushFront(v)
if _, ok := pq.PriorityMap[v.Priority]; !ok {
pq.PriorityMap[v.Priority] = &pqmap{
element: newElement,
totle: 1,
}
} else {
pq.PriorityMap[v.Priority].totle = pq.PriorityMap[v.Priority].totle + 1
}
//找出小于自己的最大值权重值
var maxKey int32 = 1
for p, _ := range pq.PriorityMap {
if p < v.Priority && p >= maxKey {
maxKey = p
}
}
//pq.Dump()
if v.Priority != maxKey {
if _, ok := pq.PriorityMap[maxKey]; ok {
pq.Data.MoveAfter(newElement, pq.PriorityMap[maxKey].element)
}
}
//log.Println("挺入队列的消息:",v,"消息权重值:",v.Priority)
}
func (pq *PriorityQueue) Pop() *Item {
defer lock.Unlock()
lock.Lock()
iter := pq.Data.Back()
if iter==nil||iter.Value==nil{
return nil
}
v := iter.Value.(*Item)
pq.Data.Remove(iter)
if pq.PriorityMap[v.Priority].totle > 1 {
pq.PriorityMap[v.Priority].totle = pq.PriorityMap[v.Priority].totle - 1
} else {
delete(pq.PriorityMap, v.Priority)
}
//log.Println("取出队列的消息:",v,"消息权重值:",v.Priority)
return v
}
func (pq *PriorityQueue) Dump() {
for iter := pq.Data.Back(); iter != nil; iter = iter.Prev() {
log.Println("队列信息:", iter.Value.(*Item))
}
}
//清除队列
func (pq *PriorityQueue) Clear() {
defer lock.RUnlock()
lock.RLock()
for iter := pq.Data.Back(); iter != nil; iter = iter.Prev() {
//fmt.Println("item:", iter.Value.(*Item))
v := iter.Value.(*Item)
pq.Data.Remove(iter)
if pq.PriorityMap[v.Priority].totle > 1 {
pq.PriorityMap[v.Priority].totle = pq.PriorityMap[v.Priority].totle - 1
} else {
delete(pq.PriorityMap, v.Priority)
}
}
}
/*检测超时任务*/
func (pq *PriorityQueue) Expirations(expriCallback func(item *Item)) {
defer lock.RUnlock()
lock.RLock()
for iter := pq.Data.Back(); iter != nil; iter = iter.Prev() {
//fmt.Println("item:", iter.Value.(*Item))
v := iter.Value.(*Item)
if v.Expiration==0 {
continue
}
isExpri:=v.AddTime.Add(time.Duration(v.Expiration)*time.Second).Before(time.Now())
if isExpri {
pq.Data.Remove(iter)
if pq.PriorityMap[v.Priority].totle > 1 {
pq.PriorityMap[v.Priority].totle = pq.PriorityMap[v.Priority].totle - 1
} else {
delete(pq.PriorityMap, v.Priority)
}
expriCallback(v)
}else{
//没过期说明越往前越新
//break
}
}
}
Go
1
https://gitee.com/rczweb/wsPool.git
git@gitee.com:rczweb/wsPool.git
rczweb
wsPool
wsPool
v1.4.5

搜索帮助