1 Star 1 Fork 0

Gousing/task

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
task_queue.go 5.61 KB
一键复制 编辑 原始数据 按行查看 历史
package task
import (
"sync"
"time"
)
// taskQueue 线程安全的最小堆任务队列数据结构, 参照 container/heap 定制实现任务添加/查询/消费
type taskQueue struct {
heap *taskHeap
maps map[string]*Task
mu sync.RWMutex
}
type taskHeap []*Task
func newTaskQueue() *taskQueue {
return &taskQueue{
heap: newTaskHeap(),
maps: make(map[string]*Task),
}
}
// LoadOrPush 依据任务 key 读取或添加任务
// - 已存在, 返回 旧任务 和 True
// - 不存在, 添加任务后返回 新任务 和 False
func (q *taskQueue) LoadOrPush(task *Task) (val *Task, loaded bool) {
if task == nil {
// 忽略 nil 任务
return nil, false
}
q.mu.Lock()
defer q.mu.Unlock()
val, loaded = q.maps[task.key]
if loaded {
return
}
q.heap.push(task)
q.heap.up(q.heap.len() - 1)
q.maps[task.key] = task
return task, false
}
// LoadAndRemove 依据任务 key 读取并删除任务
func (q *taskQueue) LoadAndRemove(key string) (*Task, bool) {
q.mu.Lock()
defer q.mu.Unlock()
task, ok := q.maps[key]
if ok {
delete(q.maps, key)
q.heap.remove(task.heapIndex)
return task, true
}
return nil, false
}
// Push 添加任务, 依据唯一任务Key, 已存在则先删除再添加
func (q *taskQueue) Push(task *Task) bool {
if task == nil {
return false
}
q.mu.Lock()
defer q.mu.Unlock()
if old, ok := q.maps[task.key]; ok {
// 已存在则先删除
q.heap.remove(old.heapIndex)
}
q.heap.push(task)
q.heap.up(q.heap.len() - 1)
q.maps[task.key] = task
return true
}
// FindWithIndex 依据 heap.Index 索引查找指定任务
func (q *taskQueue) FindWithIndex(index int) *Task {
q.mu.RLock()
defer q.mu.RUnlock()
return q.heap.findWithIndex(index)
}
// FindWithKey 依据任务 Key 查找指定任务
func (q *taskQueue) FindWithKey(key string) *Task {
q.mu.RLock()
defer q.mu.RUnlock()
task, ok := q.maps[key]
if !ok {
return nil
}
return task
}
// PopAndPeekNext 依据一个到期时间对象,尝试消费一个已到期的任务
// - 返回一个已到期的任务(任务执行时间小于等于条件时间), 不存在则返回 nil
// - 返回队列内的下一个任务(到期时间不限制, 供队列消费调度器设置定时任务), 不存在则返回 nil
func (q *taskQueue) PopAndPeekNext(expireAt time.Time) (popTask *Task, nextTask *Task) {
if q.IsEmpty() {
return nil, nil
}
q.mu.Lock()
defer q.mu.Unlock()
popTask = q.heap.peek()
// popTask.executeAt <= expireAt
if popTask != nil && !popTask.executeAt.After(expireAt) {
delete(q.maps, popTask.key)
q.heap.remove(popTask.heapIndex)
return popTask, q.heap.peek()
}
return nil, popTask
}
// Peek 读取堆顶最早到期的1个任务
func (q *taskQueue) Peek() *Task {
q.mu.RLock()
defer q.mu.RUnlock()
n := q.heap.len()
if n == 0 {
return nil
}
return q.heap.peek()
}
// Pop 弹出任务
func (q *taskQueue) Pop() *Task {
q.mu.Lock()
defer q.mu.Unlock()
n := q.heap.len() - 1
if n == -1 {
return nil
}
q.heap.swap(0, n)
q.heap.down(0, n)
task := q.heap.pop()
if task == nil {
return nil
}
delete(q.maps, task.key)
return task
}
// Remove 删除任务
func (q *taskQueue) Remove(key string) (*Task, bool) {
q.mu.Lock()
defer q.mu.Unlock()
task, ok := q.maps[key]
if ok {
delete(q.maps, key)
q.heap.remove(task.heapIndex)
return task, true
}
return nil, false
}
func (q *taskQueue) Reset() {
q.mu.Lock()
defer q.mu.Unlock()
q.maps = nil
q.maps = make(map[string]*Task)
q.heap = nil
q.heap = newTaskHeap()
}
func (q *taskQueue) Has(key string) bool {
q.mu.RLock()
defer q.mu.RUnlock()
_, ok := q.maps[key]
return ok
}
// Size 获取任务队列长度
func (q *taskQueue) Size() int {
q.mu.RLock()
defer q.mu.RUnlock()
return q.heap.len()
}
func (q *taskQueue) IsEmpty() bool {
q.mu.RLock()
defer q.mu.RUnlock()
return q.heap.len() == 0
}
func (q *taskQueue) Data() []*Task {
q.mu.RLock()
defer q.mu.RUnlock()
return q.heap.data()
}
func newTaskHeap() *taskHeap {
return new(taskHeap)
}
func (h taskHeap) len() int {
return len(h)
}
func (h taskHeap) less(i, j int) bool {
return h[i].executeAt.Before(h[j].executeAt)
}
func (h taskHeap) swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].heapIndex, h[j].heapIndex = i, j
}
func (h taskHeap) up(j int) {
for {
i := (j - 1) / 2 // parent
if i == j || !h.less(j, i) {
break
}
h.swap(i, j)
j = i
}
}
func (h taskHeap) down(i0, n int) bool {
i := i0
for {
j1 := 2*i + 1
if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
break
}
j := j1 // left child
if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
j = j2 // = 2*i + 2 // right child
}
if !h.less(j, i) {
break
}
h.swap(i, j)
i = j
}
return i > i0
}
/*
// fix 更新元素属性时, 不需要重新调整堆结构
// - 未使用, 注释
func (h taskHeap) fix(i int) {
n := h.len()
if i < 0 || i > n-1 {
return
}
if !h.down(i, n) {
h.up(i)
}
}
*/
func (h *taskHeap) push(x *Task) {
*h = append(*h, x)
// 初始化 heapIndex
x.heapIndex = h.len() - 1
}
func (h *taskHeap) remove(i int) *Task {
n := h.len() - 1
if i < 0 || i > n {
return nil
}
if n != i {
h.swap(i, n)
if !h.down(i, n) {
h.up(i)
}
}
return h.pop()
}
func (h *taskHeap) pop() *Task {
old := *h
n := len(old)
if n == 0 {
return nil
}
x := old[n-1]
*h = old[0 : n-1]
old[n-1] = nil // 避免内存泄漏
return x
}
func (h *taskHeap) peek() *Task {
old := *h
n := len(old)
if n == 0 {
return nil
}
x := old[0]
return x
}
func (h *taskHeap) findWithIndex(index int) *Task {
old := *h
n := len(old)
if n == 0 || index < 0 || index > n-1 {
return nil
}
return old[index]
}
func (h *taskHeap) data() []*Task {
old := *h
return old
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/gousing/task.git
git@gitee.com:gousing/task.git
gousing
task
task
v1.0.0

搜索帮助