1 Star 0 Fork 0

庞飞/multiapp

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
consumer.go 3.13 KB
一键复制 编辑 原始数据 按行查看 历史
庞飞 提交于 2023-10-03 17:10 . 合并主框架
package producerconsumer
import (
"errors"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"gitee.com/pangxianfei/multiapp/config"
message "gitee.com/pangxianfei/multiapp/queue/protocol_buffers"
"gitee.com/pangxianfei/multiapp/facades"
"gitee.com/pangxianfei/multiapp/kernel/tmaic"
"gitee.com/pangxianfei/multiapp/kernel/zone"
)
// Consumer binds a queue topic/channel pair to a handler for the messages
// received on it.
type Consumer struct {
	// topicName and channelName identify the subscription; they are passed to
	// the queue when popping and reused when a failed message is re-pushed or
	// persisted.
	topicName, channelName string

	// paramPtr is the protobuf message each payload is unmarshalled into
	// before it is handed to handler.
	paramPtr proto.Message

	// handler processes one decoded payload and reports failure through its
	// returned error.
	handler func(paramPtr proto.Message) error
}
// NewConsumer returns a Consumer for the given topic and channel.
//
// paramPtr is the message that received payloads are unmarshalled into, and
// handler is the function invoked with that message for every delivery.
func NewConsumer(topicName string, channelName string, paramPtr proto.Message, handler func(paramPtr proto.Message) error) *Consumer {
	consumer := new(Consumer)
	consumer.topicName = topicName
	consumer.channelName = channelName
	consumer.paramPtr = paramPtr
	consumer.handler = handler
	return consumer
}
// Pop registers a per-message callback on the consumer's topic/channel and
// returns whatever Queue().Pop returns. The "queue.max_in_flight" config
// value is forwarded to the queue as the last argument.
func (c *Consumer) Pop() error {
	// process handles a single delivery. The result is named so that the
	// deferred Failed call can overwrite the value reported to the queue.
	process := func(hash string, body []byte) (handlerErr error) {
		// Decode the message envelope.
		var msg message.Message
		if decodeErr := proto.Unmarshal(body, &msg); decodeErr != nil {
			return decodeErr
		}

		// Count this attempt and record the hash supplied by the queue.
		msg.Tried++
		msg.Hash = hash

		facades.Log.Info("queue msg received", tmaic.V{
			"msg": msg,
		})

		// Decode the payload into the consumer's parameter message.
		if decodeErr := proto.Unmarshal(msg.Param, c.paramPtr); decodeErr != nil {
			return decodeErr
		}

		// Registered only after both decodes succeed, so decode errors are
		// returned as-is and never enter the retry path in Failed.
		defer c.Failed(msg, &handlerErr)

		if runErr := c.handler(c.paramPtr); runErr != nil {
			facades.Log.Info(runErr.Error())
			// A handler error is turned into a panic so that it takes the
			// same recover-based path in Failed as a genuine panic.
			panic(runErr)
		}

		// Reached only when the handler succeeded.
		return nil
	}

	return Queue().Pop(c.topicName, c.channelName, process, config.GetInt("queue.max_in_flight"))
}
// Failed is the failure handler for one queue message. It must be invoked
// directly by a defer statement, because it calls recover: when the
// surrounding function is not panicking it does nothing.
//
// On a recovered panic it logs the failure and then:
//   - if msg has no retries left, persists msg through failedToDatabase; if
//     that fails, falls through to the re-queue path with one retry granted;
//   - otherwise re-queues a copy with Retries decremented and a delay of
//     3 minutes per attempt already made (msg.Tried);
//   - if re-queueing fails, persists the copy through failedToDatabase.
//
// handlerErrPtr points at the error the message callback will return. It is
// cleared by failedToQueue. If neither the queue nor the database accepted
// the message, it is set to a non-nil error so the failure is reported to the
// queue instead of being silently dropped. A nil handlerErrPtr is tolerated.
func (c *Consumer) Failed(msg message.Message, handlerErrPtr *error) {
	// recover is only effective when called directly by the deferred
	// function, so it has to stay in Failed itself.
	hErr := recover()
	if hErr == nil {
		return
	}

	// failedToQueue writes through this pointer; substitute a throwaway
	// target so a nil argument cannot panic inside the recovery path.
	if handlerErrPtr == nil {
		var discard error
		handlerErrPtr = &discard
	}

	newMsg := msg
	newMsg.Retries = newMsg.Retries - 1
	// Every attempt already made pushes the next retry 3 more minutes out.
	newMsg.Delay = ptypes.DurationProto(zone.Duration(msg.Tried) * 3 * zone.Minute)

	facades.Log.Error(errors.New("queue msg processed error"), tmaic.V{
		"msg":   msg,
		"error": hErr,
	})

	if msg.Retries <= 0 {
		// Retries exhausted: park the message in the database.
		dbErr := c.failedToDatabase(c.topicName, c.channelName, &msg, hErr)
		if dbErr == nil {
			return
		}
		facades.Log.Error(errors.New("failedtodatabase processed failed"), tmaic.V{
			"new_msg": newMsg,
			"error":   dbErr.Error(),
		})
		// The database is unavailable; grant one more retry and fall
		// through to the queue so the message is not lost.
		newMsg.Retries = 1
	}

	queueErr := c.failedToQueue(&newMsg, hErr, handlerErrPtr)
	if queueErr == nil {
		return
	}

	// Re-queueing failed; fall back to the database.
	dbErr := c.failedToDatabase(c.topicName, c.channelName, &newMsg, hErr)
	if dbErr == nil {
		return
	}

	facades.Log.Error(errors.New("failedtoqueue processed failed"), tmaic.V{
		"new_msg":        newMsg,
		"queue_error":    queueErr.Error(),
		"database_error": dbErr.Error(),
	})
	// Neither store accepted the message: report the failure to the caller
	// rather than returning nil as if the message had been handled.
	*handlerErrPtr = fmt.Errorf("queue msg neither requeued nor persisted: requeue: %v; database: %v", queueErr, dbErr)
}
// failedToQueue re-pushes a failed message onto the consumer's topic/channel
// and returns the error from push.
//
// Before pushing, it clears the error that handlerErrPtr points at; a nil
// handlerErrPtr is skipped. handlerErr is accepted but not used.
func (c *Consumer) failedToQueue(msg *message.Message, handlerErr interface{}, handlerErrPtr *error) error {
	// Guard the dereference: this runs inside a deferred recovery path, where
	// a nil-pointer panic would escape the recover that already ran.
	if handlerErrPtr != nil {
		*handlerErrPtr = nil
	}
	return push(c.topicName, c.channelName, msg)
}
// failedToDatabase hands a failed message to failedProcessor.FailedToDatabase
// together with its topic, channel and the failure cause rendered as text.
// err is an arbitrary value (for example one obtained from recover); it is
// converted with convertInterfaceErr before its message is extracted.
func (c *Consumer) failedToDatabase(topicName string, channelName string, msg *message.Message, err interface{}) error {
	reason := convertInterfaceErr(err).Error()
	return failedProcessor.FailedToDatabase(topicName, channelName, msg, reason)
}
// convertInterfaceErr turns an arbitrary value into an error. A value that
// already implements error is returned unchanged; anything else is formatted
// with fmt.Sprint and wrapped in a new error carrying that text.
func convertInterfaceErr(err interface{}) error {
	switch v := err.(type) {
	case error:
		return v
	default:
		return errors.New(fmt.Sprint(v))
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/pangxianfei/multiapp.git
git@gitee.com:pangxianfei/multiapp.git
pangxianfei
multiapp
multiapp
v1.1.9

搜索帮助