代码拉取完成,页面将自动刷新
package producerconsumer
import (
"errors"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"gitee.com/pangxianfei/multiapp/config"
message "gitee.com/pangxianfei/multiapp/queue/protocol_buffers"
"gitee.com/pangxianfei/multiapp/facades"
"gitee.com/pangxianfei/multiapp/kernel/tmaic"
"gitee.com/pangxianfei/multiapp/kernel/zone"
)
// Consumer binds a queue topic/channel pair to a handler function and to the
// protobuf message that incoming parameters are decoded into.
type Consumer struct {
	// topicName and channelName are passed to Queue().Pop and to the
	// failure-handling helpers to identify where messages come from.
	topicName, channelName string
	// paramPtr receives the decoded parameters of each message before the
	// handler is invoked.
	paramPtr proto.Message
	// handler processes one decoded message; a non-nil error marks the
	// message as failed.
	handler func(paramPtr proto.Message) error
}
// NewConsumer returns a Consumer for the given topic and channel.
//
// paramPtr is the protobuf message that each incoming message's parameters are
// unmarshalled into, and handler is the function invoked with that message.
func NewConsumer(topicName string, channelName string, paramPtr proto.Message, handler func(paramPtr proto.Message) error) *Consumer {
	consumer := new(Consumer)
	consumer.topicName = topicName
	consumer.channelName = channelName
	consumer.paramPtr = paramPtr
	consumer.handler = handler
	return consumer
}
// Pop registers a per-message callback with Queue().Pop for this consumer's
// topic and channel, passing the "queue.max_in_flight" config value as the
// final argument, and returns whatever Queue().Pop returns.
//
// For each message the callback:
//  1. decodes the raw body into a message.Message envelope,
//  2. increments Tried and records the delivery hash on the envelope,
//  3. decodes the envelope's Param bytes into c.paramPtr,
//  4. defers c.Failed and then invokes the handler.
//
// Decode errors are returned to the queue directly. A handler error is logged
// and re-raised as a panic so that the deferred c.Failed (which recovers) is
// the single place where failed messages are dealt with.
//
// NOTE(review): every message is decoded into the same c.paramPtr; if
// Queue().Pop invokes the callback concurrently this would be a data race —
// confirm against the queue driver.
func (c *Consumer) Pop() error {
	onMessage := func(hash string, body []byte) (handlerErr error) {
		// Decode the transport envelope.
		var msg message.Message
		if decodeErr := proto.Unmarshal(body, &msg); decodeErr != nil {
			return decodeErr
		}

		// Count this delivery attempt and remember the delivery hash.
		msg.Tried++
		msg.Hash = hash
		facades.Log.Info("queue msg received", tmaic.V{"msg": msg})

		// Decode the handler parameters carried inside the envelope.
		if decodeErr := proto.Unmarshal(msg.Param, c.paramPtr); decodeErr != nil {
			return decodeErr
		}

		// Must be deferred after msg is fully populated: the envelope is
		// copied into the deferred call at this point.
		defer c.Failed(msg, &handlerErr)

		runErr := c.handler(c.paramPtr)
		if runErr == nil {
			return nil
		}

		// Route handler errors through the same recover path as panics.
		facades.Log.Info(runErr.Error())
		panic(runErr)
	}

	return Queue().Pop(c.topicName, c.channelName, onMessage, config.GetInt("queue.max_in_flight"))
}
// Failed is the failure handler for one queue message. It must be invoked
// directly via defer (as Pop does), because it calls recover: when the
// surrounding function is not panicking it does nothing.
//
// msg is the envelope of the message being processed. handlerErrPtr points at
// the callback's error result; it is reset to nil when the message is pushed
// back onto the queue. A nil handlerErrPtr is tolerated.
//
// On a recovered panic the failure is logged and then handled as follows:
//   - msg.Retries <= 0: the message is written to the failed-message store.
//     If that write fails, the error is logged and the message is re-queued
//     with Retries forced to 1.
//   - otherwise: a copy with Retries decremented and Delay set to
//     msg.Tried * 3 * zone.Minute is pushed back to the queue. If the push
//     fails, the copy is written to the failed-message store instead.
//
// NOTE(review): if both the re-queue and the store write fail, the failure is
// only logged and *handlerErrPtr is left nil, so the caller cannot tell that
// the message was dropped — confirm whether the queue driver should be told.
func (c *Consumer) Failed(msg message.Message, handlerErrPtr *error) {
	// recover only works when called directly by the deferred function.
	hErr := recover()
	if hErr == nil {
		return
	}

	// Never let the recovery path itself panic on a nil result pointer.
	if handlerErrPtr == nil {
		var discarded error
		handlerErrPtr = &discarded
	}

	// Build the retry copy: one fewer retry, and a delay that grows with the
	// number of attempts already made.
	newMsg := msg
	newMsg.Retries = msg.Retries - 1
	newMsg.Delay = ptypes.DurationProto(zone.Duration(msg.Tried) * 3 * zone.Minute)

	facades.Log.Error(errors.New("queue msg processed error"), tmaic.V{
		"msg":   msg,
		"error": hErr,
	})

	if msg.Retries <= 0 {
		// Retries exhausted: persist the original message.
		dbErr := c.failedToDatabase(c.topicName, c.channelName, &msg, hErr)
		if dbErr == nil {
			return
		}
		facades.Log.Error(errors.New("failedtodatabase processed failed"), tmaic.V{
			"new_msg": newMsg,
			"error":   dbErr.Error(),
		})
		// Persisting failed; fall through and re-queue with one more retry
		// so the message is not lost.
		newMsg.Retries = 1
	}

	queueErr := c.failedToQueue(&newMsg, hErr, handlerErrPtr)
	if queueErr == nil {
		return
	}

	// Re-queueing failed; the failed-message store is the last resort.
	if dbErr := c.failedToDatabase(c.topicName, c.channelName, &newMsg, hErr); dbErr != nil {
		facades.Log.Error(errors.New("failedtoqueue processed failed"), tmaic.V{
			"new_msg":     newMsg,
			"queue_error": queueErr.Error(),
			"db_error":    dbErr.Error(),
		})
	}
}
// failedToQueue pushes msg back onto this consumer's topic and channel so it
// is delivered again, and returns the error from push.
//
// Before pushing, the callback's error result (handlerErrPtr) is cleared; a
// nil handlerErrPtr is ignored. handlerErr, the recovered failure value, is
// accepted for signature symmetry with failedToDatabase but is not used here.
func (c *Consumer) failedToQueue(msg *message.Message, handlerErr interface{}, handlerErrPtr *error) error {
	// Guard the dereference: this runs inside a deferred recover handler,
	// where a nil-pointer panic would escape unrecovered.
	if handlerErrPtr != nil {
		*handlerErrPtr = nil
	}
	return push(c.topicName, c.channelName, msg)
}
// failedToDatabase hands msg to failedProcessor.FailedToDatabase together with
// the topic, the channel and the textual form of err, and returns that call's
// error. err is the recovered failure value; it is turned into a string via
// convertInterfaceErr.
func (c *Consumer) failedToDatabase(topicName string, channelName string, msg *message.Message, err interface{}) error {
	reason := convertInterfaceErr(err).Error()
	return failedProcessor.FailedToDatabase(topicName, channelName, msg, reason)
}
// convertInterfaceErr turns an arbitrary value (typically the result of
// recover) into an error. A value that already implements error is returned
// unchanged; anything else, including nil, becomes a new error whose message
// is the value's default fmt formatting.
func convertInterfaceErr(err interface{}) error {
	switch v := err.(type) {
	case error:
		return v
	default:
		return errors.New(fmt.Sprint(v))
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。