Ai
1 Star 1 Fork 0

orbit/meteor

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
delayqueue.go 4.12 KB
一键复制 编辑 原始数据 按行查看 历史
orbit 提交于 2024-09-10 00:45 +08:00 . update: timingWheel
package delayqueue
import (
"container/heap"
"sync"
"sync/atomic"
"time"
)
// The start of PriorityQueue implementation.
// Borrowed from https://github.com/nsqio/nsq/blob/master/internal/pqueue/pqueue.go
type item struct {
Value interface{}
Priority int64
Index int
}
// this is a priority queue as implemented by a min heap
// ie. the 0th element is the *lowest* value
type priorityQueue []*item
func newPriorityQueue(capacity int) priorityQueue {
return make(priorityQueue, 0, capacity)
}
func (pq priorityQueue) Len() int {
return len(pq)
}
func (pq priorityQueue) Less(i, j int) bool {
return pq[i].Priority < pq[j].Priority
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].Index = i
pq[j].Index = j
}
func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
c := cap(*pq)
if n+1 > c {
npq := make(priorityQueue, n, c*2)
copy(npq, *pq)
*pq = npq
}
*pq = (*pq)[0 : n+1]
item := x.(*item)
item.Index = n
(*pq)[n] = item
}
func (pq *priorityQueue) Pop() interface{} {
n := len(*pq)
c := cap(*pq)
if n < (c/2) && c > 25 {
npq := make(priorityQueue, n, c/2)
copy(npq, *pq)
*pq = npq
}
item := (*pq)[n-1]
item.Index = -1
*pq = (*pq)[0 : n-1]
return item
}
func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) {
if pq.Len() == 0 {
return nil, 0
}
item := (*pq)[0]
if item.Priority > max {
return nil, item.Priority - max
}
heap.Remove(pq, 0)
return item, 0
}
// The end of PriorityQueue implementation.
// DelayQueue is an unbounded blocking queue of *Delayed* elements, in which
// an element can only be taken when its delay has expired. The head of the
// queue is the *Delayed* element whose delay expired furthest in the past.
type DelayQueue struct {
C chan interface{}
mu sync.Mutex
pq priorityQueue
// Similar to the sleeping state of runtime.timers.
sleeping int32
wakeupC chan struct{}
}
// New creates an instance of delayQueue with the specified size.
func New(size int) *DelayQueue {
return &DelayQueue{
C: make(chan interface{}),
pq: newPriorityQueue(size),
wakeupC: make(chan struct{}),
}
}
// Offer inserts the element into the current queue.
func (dq *DelayQueue) Offer(elem interface{}, expiration int64) {
item := &item{Value: elem, Priority: expiration}
dq.mu.Lock()
heap.Push(&dq.pq, item)
index := item.Index
dq.mu.Unlock()
if index == 0 {
// A new item with the earliest expiration is added.
if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {
dq.wakeupC <- struct{}{}
}
}
}
// Poll starts an infinite loop, in which it continually waits for an element
// to expire and then send the expired element to the channel C.
func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) {
for {
now := nowF()
dq.mu.Lock()
item, delta := dq.pq.PeekAndShift(now)
if item == nil {
// No items left or at least one item is pending.
// We must ensure the atomicity of the whole operation, which is
// composed of the above PeekAndShift and the following StoreInt32,
// to avoid possible race conditions between Offer and Poll.
atomic.StoreInt32(&dq.sleeping, 1)
}
dq.mu.Unlock()
if item == nil {
if delta == 0 {
// No items left.
select {
case <-dq.wakeupC:
// Wait until a new item is added.
continue
case <-exitC:
goto exit
}
} else if delta > 0 {
// At least one item is pending.
select {
case <-dq.wakeupC:
// A new item with an "earlier" expiration than the current "earliest" one is added.
continue
case <-time.After(time.Duration(delta) * time.Millisecond):
// The current "earliest" item expires.
// Reset the sleeping state since there's no need to receive from wakeupC.
if atomic.SwapInt32(&dq.sleeping, 0) == 0 {
// A caller of Offer() is being blocked on sending to wakeupC,
// drain wakeupC to unblock the caller.
<-dq.wakeupC
}
continue
case <-exitC:
goto exit
}
}
}
select {
case dq.C <- item.Value:
// The expired element has been sent out successfully.
case <-exitC:
goto exit
}
}
exit:
// Reset the states
atomic.StoreInt32(&dq.sleeping, 0)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/orbit-w/meteor.git
git@gitee.com:orbit-w/meteor.git
orbit-w
meteor
meteor
8cff0bb952fe

搜索帮助