1 Star 3 Fork 0

gm/mingo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
delay.go 1.88 KB
一键复制 编辑 原始数据 按行查看 历史
Leon 提交于 5个月前 . 0.7.0
package delay
import (
"context"
"errors"
"strconv"
"strings"
"time"
"github.com/redis/go-redis/v9"
)
// CallBack is the handler signature for a due task. It is called with the
// task id first and the name of the queue the task belongs to second.
type CallBack func(id, queue string)
// Delay is a delayed-task queue stored in a Redis sorted set. Members are
// "queue/id" strings and scores are expiry timestamps in milliseconds.
type Delay struct {
	// rdb is the Redis client every command is issued through.
	rdb *redis.Client
	// vhost is the key of the sorted set that holds the pending tasks.
	vhost string
	// cbs maps a queue name to the callbacks registered for it via On.
	cbs map[string][]CallBack
}
// On registers cb as a callback for queue. Callbacks registered for the same
// queue accumulate in registration order; Scan starts each of them for every
// due task of that queue.
func (t *Delay) On(queue string, cb CallBack) {
	// append copes with a missing (nil) entry, so the slice needs no
	// explicit initialization before the first registration.
	t.cbs[queue] = append(t.cbs[queue], cb)
}
// Set schedules the task id on queue to become due at timestamp, expressed
// as a Unix time in milliseconds. The task is stored as member "queue/id" of
// the sorted set at t.vhost with timestamp as its score.
//
// It returns an error if queue or id contains "/", or if the Redis command
// fails.
func (t *Delay) Set(queue string, id string, timestamp int) error {
	// "/" separates queue from id inside the stored member (see getKey and
	// the split in Scan), so neither part may contain it.
	if strings.Contains(queue, "/") {
		return errors.New("queue cannot use /")
	}
	if strings.Contains(id, "/") {
		return errors.New("id cannot use /")
	}
	member := redis.Z{Score: float64(timestamp), Member: getKey(queue, id)}
	return t.rdb.ZAdd(context.Background(), t.vhost, member).Err()
}
// SetTimeout schedules the task id on queue to become due once duration has
// elapsed from now. It converts the deadline to a millisecond Unix timestamp
// and delegates to Set, returning whatever Set returns.
func (t *Delay) SetTimeout(queue string, id string, duration time.Duration) error {
	deadline := time.Now().Add(duration)
	expiresAtMs := int(deadline.UnixMilli())
	return t.Set(queue, id, expiresAtMs)
}
// Remove deletes the task id of queue from the sorted set at t.vhost.
// The outcome of the Redis command is not inspected, so a failed removal is
// not reported to the caller.
func (t *Delay) Remove(queue string, id string) {
	member := getKey(queue, id)
	t.rdb.ZRem(context.Background(), t.vhost, member)
}
// Scan polls for due tasks: every member of the sorted set at t.vhost whose
// score lies between 0 and the current Unix time in milliseconds. For each
// well-formed "queue/id" member it starts every callback registered for that
// queue in its own goroutine, then removes all fetched members from the set.
//
// It returns the raw members that were fetched. A non-nil error means either
// the range query or the removal failed; in the latter case the callbacks
// have already been started and the members may be returned again by a later
// Scan.
func (t *Delay) Scan() ([]string, error) {
	ctx := context.Background()
	// FormatInt keeps the full int64 timestamp; converting through int would
	// truncate it on platforms where int is 32 bits wide.
	now := strconv.FormatInt(time.Now().UnixMilli(), 10)
	values, err := t.rdb.ZRangeByScore(ctx, t.vhost, &redis.ZRangeBy{Min: "0", Max: now}).Result()
	if err != nil {
		return values, err
	}
	// Nothing is due: skip the removal, which would otherwise send a ZREM
	// without any member.
	if len(values) == 0 {
		return values, nil
	}
	for _, value := range values {
		parts := strings.Split(value, "/")
		if len(parts) != 2 {
			// Not a "queue/id" member; no callback is run, but it is still
			// removed below together with the rest.
			continue
		}
		queue, id := parts[0], parts[1]
		for _, cb := range t.cbs[queue] {
			go cb(id, queue)
		}
	}
	if err := t.rdb.ZRem(ctx, t.vhost, values).Err(); err != nil {
		return values, err
	}
	return values, nil
}
// getKey builds the sorted-set member for a task by joining the queue name
// and the task id with a "/" separator.
func getKey(queue string, id string) string {
	const sep = "/"
	key := queue + sep
	key += id
	return key
}
// NewDelay creates a Redis client from rdsOption and returns a Delay that
// keeps its tasks in the sorted set at key vhost and starts with an empty
// callback registry.
func NewDelay(rdsOption *redis.Options, vhost string) Delay {
	return Delay{
		rdb:   redis.NewClient(rdsOption),
		vhost: vhost,
		cbs:   make(map[string][]CallBack),
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/liangguoming/mingo.git
git@gitee.com:liangguoming/mingo.git
liangguoming
mingo
mingo
v0.9.3

搜索帮助

371d5123 14472233 46e8bd33 14472233