1 Star 1 Fork 1

xiaoyutab / xgotool

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
nsq_handle_message.go 2.36 KB
一键复制 编辑 原始数据 按行查看 历史
xiaoyutab 提交于 2024-04-30 10:16 . 调整目录结构和依赖位置
package xnsq
import (
"encoding/json"
"errors"
"runtime"
"time"
"gitee.com/xiaoyutab/xgotool/individual/xlog"
"gitee.com/xiaoyutab/xgotool/xstring"
"github.com/nsqio/go-nsq"
)
type myMessageHandler struct{}
// 队列处理函数
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
// 没有消息体时此处直接返回完成
return nil
}
// 重试次数限制(第6次请求直接返回nil,以免下次继续重试)
if m.Attempts > _default.MaxAttempts {
xlog.Info("NSQ失败次数过多", errors.New(string(m.Body)))
return nil
}
// 解析JSON字符串
router := NSQ{}
err := json.Unmarshal(m.Body, &router)
if err != nil {
// JSON解析错误
return err
}
if router.RunTime != "" {
// 判断当前时间差
h := time.Until(xstring.ToTime(router.RunTime))
// 再次延时
return SetDef(router.Func, router.Param, time.Second*time.Duration(h.Seconds()))
}
nsq_lock.RLock()
defer nsq_lock.RUnlock()
if tmps, ok := nsq_runtime[router.Func]; ok {
// 判断是否存在该任务配置
// 如果存在配置的话
if tmps.runZeroTime.IsZero() {
tmps.runZeroTime = time.Now()
}
// 如果存在指定时间范围的话
if tmps.TimeSlot > 0 && tmps.RunMax > 0 {
if time.Since(tmps.runZeroTime) > tmps.TimeSlot {
tmps.runNum = 0
tmps.runZeroTime = time.Now()
}
if tmps.runNum >= tmps.RunMax {
// 如果次数超出了的话,就直接销毁该任务
return nil
}
tmps.runNum++
}
tmps.PreviousTime = time.Now().Format(time.DateTime)
// 队列任务执行
defer func() {
if err := recover(); err != nil {
// 捕获到panic错误
_, file, line, _ := runtime.Caller(2)
xlog.SaveAny('P', file, uint(line), "NSQ失败请求", errors.New(router.Func+": "+err.(string)))
// fmt.Println("NSQ失败请求[panic报错]:", file, line, router.Func, err)
if tmps != nil {
// 失败次数自增
tmps.RunErrorNum++
tmps.PreviousError = err.(string)
nsq_runtime[router.Func] = tmps
}
}
}()
// 存在路由的话
param, _ := json.Marshal(router.Param)
err = tmps.Func(param)
if err != nil {
// 失败次数自增
tmps.RunErrorNum++
tmps.PreviousError = err.Error()
xlog.Error("NSQ失败请求", err)
} else {
// 成功次数自增
tmps.RunSuccessNum++
}
nsq_runtime[router.Func] = tmps
return err
}
return nil
}
Go
1
https://gitee.com/xiaoyutab/xgotool.git
git@gitee.com:xiaoyutab/xgotool.git
xiaoyutab
xgotool
xgotool
v0.3.9

搜索帮助