代码拉取完成,页面将自动刷新
package mq
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"github.com/gomodule/redigo/redis"
)
// Job 进入队列的消息结构
type Job struct {
ID string `redis:"id"`
Topic string `redis:"topic"`
Delay int `redis:"delay"`
TTR int `redis:"TTR"` // time-to-run
Body string `redis:"body"`
Status int `redis:"status"`
ConsumeNum int `redis:"consume_num"`
}
const (
// JobStatusDefault 默认状态
JobStatusDefault = iota
// JobStatusDelay delay:不可执行状态,等待时钟周期
JobStatusDelay
// JobStatusReady ready:可执行状态,等待消费
JobStatusReady
// JobStatusReserved reserved: 已被消费者读取,但还未得到消费者的响应(delete、finish)
JobStatusReserved
)
var (
// ErrJobIDEmpty job.id is empty
ErrJobIDEmpty = errors.New("job.id is empty")
// ErrJobTopicEmpty job.topic is empty
ErrJobTopicEmpty = errors.New("job.topic is empty")
)
// CheckJobData 检测job的数据结构
func (j *Job) CheckJobData() error {
if len(j.ID) == 0 {
return ErrJobIDEmpty
}
if len(j.Topic) == 0 {
return ErrJobTopicEmpty
}
return nil
}
func (j *Job) String() string {
s, _ := Encode(j)
return s
}
// JobCard JobCard
type JobCard struct {
id string
delay int
topic string
}
// Card Card
func (j *Job) Card() *JobCard {
return &JobCard{
id: j.ID,
delay: j.Delay,
topic: j.Topic,
}
}
// Key 获取job的id
func (j *Job) Key() string {
return GetJobKeyByID(j.ID)
}
// Encode job转成json字符串
func Encode(j *Job) (string, error) {
nbyte, err := json.Marshal(j)
if err != nil {
return "", err
}
return string(nbyte), nil
}
// Decode json字符串转成job对象
func Decode(j string) (*Job, error) {
job := &Job{}
err := json.Unmarshal([]byte(j), job)
if err != nil {
return nil, err
}
return job, nil
}
// Pop 根据topic消费队列
func Pop(topics ...string) (map[string]string, error) {
if len(topics) == 0 {
return nil, errors.New("topics is empty")
}
var ts []interface{}
for _, t := range topics {
ts = append(ts, GetJobQueueByTopic(t))
}
ts = append(ts, 2)
// 每次只会消费一个job,多个consumer消费时,redis会轮询分配给各个consumer
// consumer订阅多个topic时,会按照topic顺序读取,即先消费完第一个topic所有job,才会进行下一个topic
records, err := Redis.Strings("BRPOP", ts...)
if err != nil {
if err == redis.ErrNil {
return nil, errors.New("empty")
}
return nil, err
}
jobID := records[1]
if err := SetJobStatus(jobID, JobStatusReserved); err != nil {
return nil, err
}
detail, err := GetJobDetailByID(jobID)
if err != nil {
return nil, err
}
// TTR表示job执行超时时间(即消费者读取到job到确认删除这段时间)
// TTR>0时,若执行时间超过TTR,将重新添加到ready_queue,然后再次被消费
// TTR<=0时,消费者读取到job时,即会删除任务池中的job单元
TTR, err := strconv.Atoi(detail["TTR"])
if err != nil {
return nil, err
}
if TTR > 0 {
mq.dispatcher.addToTTRBucket <- &JobCard{
id: detail["id"],
delay: TTR + 3,
topic: detail["topic"],
}
// 计数被消费次数
IncrJobConsumeNum(jobID)
} else {
Ack(detail["id"])
}
return detail, nil
}
// Ack 根据id消费队列
func Ack(jobID string) (bool, error) {
job, err := GetJobStuctByID(jobID)
if err != nil {
return false, err
}
// TTR=0,被消费
if job.Status == JobStatusReserved {
return Redis.Bool("DEL", GetJobKeyByID(jobID))
}
// TTR>0,被消费后重新加到bucket,如果到期未确认删除会重新加到readyQueue再次被消费
if job.Status == JobStatusDelay && job.ConsumeNum > 0 {
return Redis.Bool("DEL", GetJobKeyByID(jobID))
}
// 可能正在被再次消费或者未被消费
if job.Status == JobStatusReady {
if job.ConsumeNum > 0 {
return false, errors.New("TTR has expired and will be consumed again")
}
return false, errors.New("Job is not be reserved")
}
return false, errors.New("Unknown error")
}
// Push json字符串类型消息 入队列
func Push(j string) error {
job, err := Decode(j)
if err != nil {
return err
}
return AddToJobPool(job)
}
// AddToJobPool 消息入队列
func AddToJobPool(j *Job) error {
isExist, err := Redis.Bool("EXISTS", j.Key())
if err != nil {
return err
}
if isExist {
return fmt.Errorf(fmt.Sprintf("jobKey:%v,error:has exist", j.Key()))
}
_, err = Redis.Do("HMSET", redis.Args{}.Add(j.Key()).AddFlat(j)...)
return err
}
// AddToReadyQueue 消息入队列
func AddToReadyQueue(jobID string) error {
conn := Redis.Pool.Get()
defer conn.Close()
key := GetJobKeyByID(jobID)
job, err := GetJobStuctByID(jobID)
if err != nil {
return err
}
if job.Status != JobStatusDelay && job.Delay > 0 {
return fmt.Errorf("job key%v,error:job.status is error", key)
}
queue := GetJobQueueByTopic(job.Topic)
script := `
local c = redis.call('llen', KEYS[1])
local r = redis.call('lpush', KEYS[1], ARGV[1])
if c + 1 == r then
redis.call('hset', KEYS[2], 'status', ARGV[2])
return 1
end
return 0
`
var ns = redis.NewScript(2, script)
_, err = redis.Bool(ns.Do(conn, queue, key, jobID, JobStatusReady))
return err
}
// GetTopicByJobID 获取消息的topic
func GetTopicByJobID(jobID string) (string, error) {
key := GetJobKeyByID(jobID)
return Redis.String("HGET", key, "topic")
}
// GetJobDetailByID 获取消息的detail
func GetJobDetailByID(jobID string) (map[string]string, error) {
key := GetJobKeyByID(jobID)
return Redis.StringMap("HGETALL", key)
}
// GetJobStuctByID 获取消息
func GetJobStuctByID(jobID string) (*Job, error) {
detail, err := GetJobDetailByID(jobID)
if err != nil {
return nil, err
}
delay, err := strconv.Atoi(detail["delay"])
if err != nil {
return nil, err
}
TTR, err := strconv.Atoi(detail["TTR"])
if err != nil {
return nil, err
}
status, err := strconv.Atoi(detail["status"])
if err != nil {
return nil, err
}
consumeNum, err := strconv.Atoi(detail["consume_num"])
if err != nil {
return nil, err
}
return &Job{
ID: detail["id"],
Topic: detail["topic"],
Delay: delay,
TTR: TTR,
Body: detail["body"],
Status: status,
ConsumeNum: consumeNum,
}, nil
}
// SetJobStatus 设置消息的状态
func SetJobStatus(jobID string, status int) error {
key := GetJobKeyByID(jobID)
_, err := Redis.Do("HSET", key, "status", status)
return err
}
// GetJobStatus 获取消息的状态
func GetJobStatus(jobID string) (int, error) {
key := GetJobKeyByID(jobID)
return Redis.Int("HGET", key, "status")
}
// IncrJobConsumeNum 消息被消费的次数+1
func IncrJobConsumeNum(jobID string) (bool, error) {
key := GetJobKeyByID(jobID)
return Redis.Bool("HINCRBY", key, "consume_num", 1)
}
// GetJobConsumeNum 获取消息被消费次数
func GetJobConsumeNum(jobID string) (int, error) {
key := GetJobKeyByID(jobID)
return Redis.Int("HGET", key, "consume_num")
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。