1 Star 0 Fork 0

xfx/iothub-south-client

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
backoff_logic.go 5.10 KB
一键复制 编辑 原始数据 按行查看 历史
xfx 提交于 2023-04-26 14:40 +08:00 . feat(request): add call interface failure log.
package request
import (
"context"
errors2 "errors"
"math"
"net/http"
"sort"
"time"
"gitee.com/arjunxw/iothub-south-client/errors"
"gitee.com/arjunxw/iothub-south-client/service"
"gitee.com/arjunxw/iothub-south-client/util"
"go.uber.org/zap"
)
// byTime implements sort.Interface over retry records, ordering them by
// their Next time so the earliest pending retry comes first.
type byTime []*service.RetryRecord

// Len reports the number of records in the collection.
func (bt byTime) Len() int {
	return len(bt)
}

// Swap exchanges the records at positions i and j.
func (bt byTime) Swap(i, j int) {
	bt[j], bt[i] = bt[i], bt[j]
}

// Less reports whether record i is due before record j.
//
// A zero Next time is treated as "greater" than any real time so that such
// records sink to the end of the sorted slice; two zero times compare as
// not-less.
func (bt byTime) Less(i, j int) bool {
	lhs := bt[i].Next
	if lhs.IsZero() {
		return false
	}
	rhs := bt[j].Next
	if rhs.IsZero() {
		return true
	}
	return lhs.Before(*rhs)
}
// run is the retry scheduling loop. It never returns.
//
// Every record is first given a Next time. The loop then repeatedly sorts the
// records by Next, sleeps until the earliest one is due, and reacts to one of
// three events: the timer firing (due records are rescheduled and executed in
// their own goroutines), a record arriving on req.add, or a record arriving
// on req.rem.
func (req *request) run(records []*service.RetryRecord) {
	// A record is only retried while it is younger than this window.
	const retryWindow = 24 * time.Hour

	// schedule computes the next attempt for rec relative to from.
	// Inside the window the retry counter is incremented and the delay is
	// 2^(RetryTimes-1) seconds. Outside the window Next becomes the zero time
	// and the record is asynchronously handed to req.rem and deleted from
	// the retry store.
	schedule := func(rec *service.RetryRecord, from time.Time) {
		if from.Sub(rec.CreatedAt) >= retryWindow {
			rec.Next = &time.Time{}
			go func() {
				req.rem <- rec
				req.reportService.DeleteRetryByReportId(context.Background(), rec.ReportID)
			}()
			return
		}
		rec.RetryTimes++
		backoffSeconds := math.Pow(2.0, float64(rec.RetryTimes-1))
		due := from.Add(time.Duration(backoffSeconds) * time.Second)
		rec.Next = &due
	}

	now := time.Now()
	for idx := range records {
		schedule(records[idx], now)
	}

	for {
		sort.Sort(byTime(records))

		// With nothing schedulable, sleep for a very long time; the select
		// below still reacts to add and remove requests.
		wait := 100000 * time.Hour
		if len(records) > 0 && !records[0].Next.IsZero() {
			wait = records[0].Next.Sub(now)
		}
		timer := time.NewTimer(wait)

		select {
		case now = <-timer.C:
			// Records are sorted, so stop at the first one that is not due
			// yet or has no Next time at all.
			for _, pending := range records {
				if pending.Next.IsZero() || pending.Next.After(now) {
					break
				}
				pending.Prev = pending.Next
				schedule(pending, now)
				go req.exec(pending)
			}
		case added := <-req.add:
			timer.Stop()
			now = time.Now()
			schedule(added, now)
			// Records that are already outside the retry window are not queued.
			if !added.Next.IsZero() {
				records = append(records, added)
			}
		case dropped := <-req.rem:
			timer.Stop()
			now = time.Now()
			records = remove(dropped, records)
		}
	}
}
// exec performs one retry attempt for record and persists the outcome.
//
// It looks up the report history and its DAL info, re-sends the report, and
// if the failure is an invalid access token it calls req.Auth and sends once
// more with the returned token. On failure the history is marked as Failure
// and the retry record is updated. On success the history is marked as
// Success, the record is sent to req.rem and the retry entry is deleted.
func (req *request) exec(record *service.RetryRecord) {
	ctx := context.Background()
	history := req.reportService.FindHistory(ctx, record.ReportID)
	dalInfo := req.dalInfoService.FindById(ctx, history.DalInfoId)

	sendErr := req.report(dalInfo.AccessToken, history.Url, history.Content)
	// The timestamp is taken after the first attempt and reused even when a
	// second attempt with a fresh token follows.
	sentAt := time.Now()

	if sendErr != nil && errors2.Is(sendErr, errors.ErrReqAccessTokenIsInvalid) {
		authResult := req.Auth(ctx, dalInfo.DalID)
		sendErr = req.report(authResult.Body.AccessToken, history.Url, history.Content)
	}

	history.LastSendTime = &sentAt
	history.RetryTimes = record.RetryTimes

	if sendErr != nil {
		history.Status = service.Failure
		history.Reason = sendErr.Error()
		req.reportService.UpdateHistory(ctx, history)
		req.reportService.UpdateRetry(ctx, record)
		return
	}

	history.Status = service.Success
	req.reportService.UpdateHistory(ctx, history)
	req.rem <- record
	req.reportService.DeleteRetryByReportId(ctx, record.ReportID)
}
// remove returns a new slice holding every element of records whose ID
// differs from record.ID. An empty input is returned unchanged; otherwise the
// input slice itself is not modified.
func remove(record *service.RetryRecord, records []*service.RetryRecord) []*service.RetryRecord {
	total := len(records)
	if total == 0 {
		return records
	}

	kept := make([]*service.RetryRecord, 0, total-1)
	for idx := 0; idx < total; idx++ {
		candidate := records[idx]
		if candidate.ID == record.ID {
			continue
		}
		kept = append(kept, candidate)
	}
	return kept
}
// handleResult records the outcome of a report attempt on history and, for
// retryable failures, schedules a retry.
//
// Parameters:
//   - ctx: context passed to the report service and to req.Auth.
//   - dalId: identifier handed to req.Auth when the access token is invalid.
//   - history: the report history entry to update.
//   - cause: the error from the report attempt, or nil on success.
//
// Returns nil when the attempt succeeded or a retry was scheduled. Returns
// the storage error when updating the history or creating the retry record
// fails, and returns cause itself when the failure is not retryable.
//
// A failure is retryable when cause is a util.HttpStatusError or matches
// errors.ErrReqAccessTokenIsInvalid.
func (req *request) handleResult(ctx context.Context, dalId string, history *service.ReportHistory, cause error) error {
	now := time.Now()
	history.LastSendTime = &now

	if cause == nil {
		history.Status = service.Success
		// The result of this update is not propagated on the success path.
		req.reportService.UpdateHistory(ctx, history)
		return nil
	}

	history.Status = service.Failure
	history.Reason = cause.Error()
	if err := req.reportService.UpdateHistory(ctx, history); err != nil {
		return err
	}

	retry := false
	if _, ok := cause.(util.HttpStatusError); ok {
		retry = true
	}
	if errors2.Is(cause, errors.ErrReqAccessTokenIsInvalid) {
		// At this time, if an invalid token is encountered, the token should
		// be forced to update.
		req.Auth(ctx, dalId)
		retry = true
	}
	if !retry {
		return cause
	}

	record := service.RetryRecord{
		ReportID:   history.ID,
		RetryTimes: 0,
	}
	if err := req.reportService.CreateRetry(ctx, &record); err != nil {
		// Do not queue a record that could not be persisted.
		return err
	}
	// Hand the record to the scheduler only after it was created
	// successfully; the send is received by the run loop.
	req.add <- &record
	return nil
}
// report sends content to url through util.HttpPostJsonHeader, passing
// accessToken as the Authorization header, and then validates the response
// body that call filled in.
//
// It returns the transport error as-is, or the error from validateResponse,
// which is also logged together with the response body. It returns nil when
// both steps succeed.
func (req *request) report(accessToken, url, content string) (err error) {
	var respBody ResponseBody
	headers := http.Header{"Authorization": {accessToken}}

	if err = util.HttpPostJsonHeader(url, headers, content, &respBody); err != nil {
		return err
	}
	if err = validateResponse(&respBody); err != nil {
		req.log.Error("report", zap.Error(err), zap.Any("responseBody", &respBody))
	}
	return err
}
// validateResponse maps the response code to an error: InvalidAccessToken
// yields errors.ErrReqAccessTokenIsInvalid, any other code different from Ok
// yields errors.ErrReqNotOk, and Ok yields nil.
func validateResponse(body *ResponseBody) error {
	switch code := body.Code; {
	case code == InvalidAccessToken:
		return errors.ErrReqAccessTokenIsInvalid
	case code != Ok:
		return errors.ErrReqNotOk
	default:
		return nil
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/xfx0810/iothub-south-client.git
git@gitee.com:xfx0810/iothub-south-client.git
xfx0810
iothub-south-client
iothub-south-client
v0.7.7

搜索帮助