代码拉取完成,页面将自动刷新
package xnsq
import (
"encoding/json"
"errors"
"runtime"
"time"
"gitee.com/xiaoyutab/xgotool/individual/xlog"
"gitee.com/xiaoyutab/xgotool/xstring"
"github.com/nsqio/go-nsq"
)
type myMessageHandler struct{}
// 队列处理函数
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
// 没有消息体时此处直接返回完成
return nil
}
// 重试次数限制(第6次请求直接返回nil,以免下次继续重试)
if m.Attempts > _default.MaxAttempts {
xlog.Info("NSQ失败次数过多", errors.New(string(m.Body)))
return nil
}
// 解析JSON字符串
router := NSQ{}
err := json.Unmarshal(m.Body, &router)
if err != nil {
// JSON解析错误
return err
}
if router.RunTime != "" {
// 判断当前时间差
h := time.Until(xstring.ToTime(router.RunTime))
// 再次延时
return SetDef(router.Func, router.Param, time.Second*time.Duration(h.Seconds()))
}
nsq_lock.RLock()
defer nsq_lock.RUnlock()
if tmps, ok := nsq_runtime[router.Func]; ok {
// 判断是否存在该任务配置
// 如果存在配置的话
if tmps.runZeroTime.IsZero() {
tmps.runZeroTime = time.Now()
}
// 如果存在指定时间范围的话
if tmps.TimeSlot > 0 && tmps.RunMax > 0 {
if time.Since(tmps.runZeroTime) > tmps.TimeSlot {
tmps.runNum = 0
tmps.runZeroTime = time.Now()
}
if tmps.runNum >= tmps.RunMax {
// 如果次数超出了的话,就直接销毁该任务
return nil
}
tmps.runNum++
}
tmps.PreviousTime = time.Now().Format(time.DateTime)
// 队列任务执行
defer func() {
if err := recover(); err != nil {
// 捕获到panic错误
_, file, line, _ := runtime.Caller(2)
xlog.SaveAny('P', file, uint(line), "NSQ失败请求", errors.New(router.Func+": "+err.(string)))
// fmt.Println("NSQ失败请求[panic报错]:", file, line, router.Func, err)
if tmps != nil {
// 失败次数自增
tmps.RunErrorNum++
tmps.PreviousError = err.(string)
nsq_runtime[router.Func] = tmps
}
}
}()
// 存在路由的话
param, _ := json.Marshal(router.Param)
err = tmps.Func(param)
if err != nil {
// 失败次数自增
tmps.RunErrorNum++
tmps.PreviousError = err.Error()
xlog.Error("NSQ失败请求", err)
} else {
// 成功次数自增
tmps.RunSuccessNum++
}
nsq_runtime[router.Func] = tmps
return err
}
return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。