40 Star 146 Fork 3

Gitee 极速下载/grafana

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/grafana/grafana
克隆/下载
engine.go 6.33 KB
一键复制 编辑 原始数据 按行查看 历史
package alerting
import (
"context"
"fmt"
"time"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
tlog "github.com/opentracing/opentracing-go/log"
"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/rendering"
"github.com/grafana/grafana/pkg/setting"
"golang.org/x/sync/errgroup"
)
// AlertingService is the alerting engine. It owns the ticker, scheduler,
// rule reader, evaluation handler and result handler that the methods in this
// file wire together, plus the queue of jobs waiting to be executed.
type AlertingService struct {
// RenderService is handed to NewResultHandler in Init.
// NOTE(review): the `inject` tag suggests it is populated by the service
// registry before Init runs — confirm against the registry implementation.
RenderService rendering.Service `inject:""`
// execQueue carries jobs from scheduler.Tick to runJobDispatcher.
execQueue chan *Job
//clock clock.Clock
// ticker delivers the ticks consumed by alertingTicker.
ticker *Ticker
// scheduler is refreshed with rules and asked to enqueue jobs on every tick.
scheduler Scheduler
// evalHandler evaluates the rule of a job inside processJob.
evalHandler EvalHandler
// ruleReader supplies the rules passed to scheduler.Update.
ruleReader RuleReader
// log is the engine logger, created in Init as "alerting.engine".
log log.Logger
// resultHandler handles the evaluation result at the end of processJob.
resultHandler ResultHandler
}
func init() {
registry.RegisterService(&AlertingService{})
}
// NewEngine allocates an AlertingService, runs Init on it and returns it.
// The error returned by Init is discarded.
func NewEngine() *AlertingService {
	engine := new(AlertingService)
	_ = engine.Init()
	return engine
}
// IsDisabled reports whether the engine should not run: it is disabled unless
// both setting.AlertingEnabled and setting.ExecuteAlerts are true.
func (e *AlertingService) IsDisabled() bool {
	enabled := setting.AlertingEnabled && setting.ExecuteAlerts
	return !enabled
}
// Init builds every collaborator the engine needs: the ticker, the job queue,
// the scheduler, the evaluation handler, the rule reader, the logger and the
// result handler (which receives the RenderService). It always returns nil.
func (e *AlertingService) Init() error {
	const execQueueSize = 1000
	var zeroDuration time.Duration

	e.ticker = NewTicker(time.Now(), zeroDuration, clock.New())
	e.execQueue = make(chan *Job, execQueueSize)
	e.scheduler = NewScheduler()
	e.evalHandler = NewEvalHandler()
	e.ruleReader = NewRuleReader()
	e.log = log.New("alerting.engine")
	e.resultHandler = NewResultHandler(e.RenderService)

	return nil
}
// Run starts the ticker loop and the job dispatcher in an errgroup derived
// from ctx and blocks until both have returned. It returns the first non-nil
// error reported by either goroutine, if any.
func (e *AlertingService) Run(ctx context.Context) error {
	group, groupCtx := errgroup.WithContext(ctx)

	group.Go(func() error {
		return e.alertingTicker(groupCtx)
	})
	group.Go(func() error {
		return e.runJobDispatcher(groupCtx)
	})

	return group.Wait()
}
// alertingTicker consumes ticks from e.ticker until grafanaCtx is done, then
// returns the context error. On every tick it asks the scheduler to enqueue
// due jobs on e.execQueue; on every tenth tick (starting with the first) it
// first refreshes the scheduler with the rules fetched by the rule reader.
// A panic inside the loop is recovered and logged, which ends the loop.
func (e *AlertingService) alertingTicker(grafanaCtx context.Context) error {
	defer func() {
		if r := recover(); r != nil {
			e.log.Error("Scheduler Panic: stopping alertingTicker", "error", r, "stack", log.Stack(1))
		}
	}()

	for ticks := 0; ; {
		select {
		case <-grafanaCtx.Done():
			return grafanaCtx.Err()

		case now := <-e.ticker.C:
			// Temporary solution: reload the rules only every tenth tick.
			if ticks%10 == 0 {
				rules := e.ruleReader.Fetch()
				e.scheduler.Update(rules)
			}

			e.scheduler.Tick(now, e.execQueue)
			ticks++
		}
	}
}
// runJobDispatcher takes jobs off e.execQueue and runs each one in its own
// goroutine via processJobWithRetry. When grafanaCtx is done it stops taking
// new jobs, waits for the running ones and returns their first error, if any.
func (e *AlertingService) runJobDispatcher(grafanaCtx context.Context) error {
	workers, workerCtx := errgroup.WithContext(grafanaCtx)

	for {
		select {
		case <-grafanaCtx.Done():
			return workers.Wait()

		case queued := <-e.execQueue:
			workers.Go(func() error {
				return e.processJobWithRetry(workerCtx, queued)
			})
		}
	}
}
// Timeouts and retry limit used while executing alert jobs.
var (
	// unfinishedWorkTimeout is how long a running job is given to finish
	// after the server context has been cancelled.
	unfinishedWorkTimeout = 5 * time.Second

	// TODO: Make alertTimeout and alertMaxAttempts configurable in the config file.

	// alertTimeout bounds the context used for a single evaluation attempt.
	alertTimeout = 30 * time.Second

	// resultHandleTimeout bounds the context used while handling the result.
	resultHandleTimeout = 30 * time.Second

	// alertMaxAttempts is the number of evaluation attempts before giving up.
	alertMaxAttempts = 3
)
// processJobWithRetry drives the evaluation attempts of a single job.
//
// Attempt numbers arrive on attemptChan: the first one (1) is seeded here and
// processJob pushes the next number when an attempt should be retried, or
// closes the channel when the job is finished. Every context cancel function
// created by processJob is collected on cancelChan and invoked by endJob.
//
// When grafanaCtx is cancelled, the running attempt gets up to
// unfinishedWorkTimeout to report back; if it does not, the context error is
// returned. In every other case the returned error is nil. A panic in this
// function is recovered and logged.
func (e *AlertingService) processJobWithRetry(grafanaCtx context.Context, job *Job) error {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
		}
	}()

	// Each attempt registers at most two cancel functions (evaluation and
	// result handling), hence the buffer of alertMaxAttempts*2.
	cancelChan := make(chan context.CancelFunc, alertMaxAttempts*2)
	attemptChan := make(chan int, 1)

	// Initialize with first attemptID=1
	attemptChan <- 1
	job.Running = true

	for {
		select {
		case <-grafanaCtx.Done():
			// In case the grafana server context is cancelled, give the job
			// processing a chance to finish gracefully - by waiting a timeout
			// duration - before forcing its end.
			unfinishedWorkTimer := time.NewTimer(unfinishedWorkTimeout)
			// Both branches below return, so this defer runs right away and
			// releases the timer when the attempt reports back first.
			defer unfinishedWorkTimer.Stop()

			select {
			case <-unfinishedWorkTimer.C:
				return e.endJob(grafanaCtx.Err(), cancelChan, job)
			case <-attemptChan:
				return e.endJob(nil, cancelChan, job)
			}

		case attemptID, more := <-attemptChan:
			if !more {
				// Channel closed by processJob: the job is done.
				return e.endJob(nil, cancelChan, job)
			}
			go e.processJob(attemptID, attemptChan, cancelChan, job)
		}
	}
}
// endJob marks the job as no longer running, closes cancelChan and calls every
// cancel function still buffered in it, then returns err unchanged.
func (e *AlertingService) endJob(err error, cancelChan chan context.CancelFunc, job *Job) error {
	job.Running = false

	close(cancelChan)
	for {
		cancel, open := <-cancelChan
		if !open {
			return err
		}
		cancel()
	}
}
// processJob runs one evaluation attempt for job.
//
// attemptID is the 1-based number of this attempt. The outcome is reported on
// attemptChan: attemptID+1 is sent when the evaluation failed and attempts
// remain, otherwise the channel is closed once the result has been handled
// (or after a panic was recovered). Every context cancel function created here
// is pushed onto cancelChan; endJob drains that channel and calls them.
//
// The evaluation itself runs in a new goroutine, so this method returns as
// soon as that goroutine has been started.
func (e *AlertingService) processJob(attemptID int, attemptChan chan int, cancelChan chan context.CancelFunc, job *Job) {
// This recover only covers the synchronous setup below; the goroutine
// started further down installs its own.
defer func() {
if err := recover(); err != nil {
e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
}
}()
// The evaluation context is derived from context.Background(), not from the
// server context, and is bounded by alertTimeout.
alertCtx, cancelFn := context.WithTimeout(context.Background(), alertTimeout)
cancelChan <- cancelFn
span := opentracing.StartSpan("alert execution")
alertCtx = opentracing.ContextWithSpan(alertCtx, span)
evalContext := NewEvalContext(alertCtx, job.Rule)
evalContext.Ctx = alertCtx
go func() {
// On panic: log it, mark the span as failed, finish the span and close
// attemptChan so the retry loop ends the job.
defer func() {
if err := recover(); err != nil {
e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
ext.Error.Set(span, true)
span.LogFields(
tlog.Error(fmt.Errorf("%v", err)),
tlog.String("message", "failed to execute alert rule. panic was recovered."),
)
span.Finish()
close(attemptChan)
}
}()
e.evalHandler.Eval(evalContext)
// Record the outcome of the evaluation on the tracing span.
span.SetTag("alertId", evalContext.Rule.Id)
span.SetTag("dashboardId", evalContext.Rule.DashboardId)
span.SetTag("firing", evalContext.Firing)
span.SetTag("nodatapoints", evalContext.NoDataFound)
span.SetTag("attemptID", attemptID)
if evalContext.Error != nil {
ext.Error.Set(span, true)
span.LogFields(
tlog.Error(evalContext.Error),
tlog.String("message", "alerting execution attempt failed"),
)
// Attempts remain: request the next one and skip result handling.
// On the last attempt execution falls through and the failed result
// is handled like any other.
if attemptID < alertMaxAttempts {
span.Finish()
e.log.Debug("Job Execution attempt triggered retry", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing, "attemptID", attemptID)
attemptChan <- (attemptID + 1)
return
}
}
// Create a new context with its own timeout for notifications.
resultHandleCtx, resultHandleCancelFn := context.WithTimeout(context.Background(), resultHandleTimeout)
cancelChan <- resultHandleCancelFn
// Override the context used for evaluation with a new context for notifications.
// This makes it possible for notifiers to execute when datasources
// don't respond within the timeout limit. We should rewrite this so notifications
// don't reuse the evalContext and get their own context.
evalContext.Ctx = resultHandleCtx
evalContext.Rule.State = evalContext.GetNewState()
e.resultHandler.Handle(evalContext)
span.Finish()
e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing, "attemptID", attemptID)
// Closing attemptChan tells processJobWithRetry that the job is finished.
close(attemptChan)
}()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
JavaScript
1
https://gitee.com/mirrors/grafana.git
git@gitee.com:mirrors/grafana.git
mirrors
grafana
grafana
v6.0.0

搜索帮助

0d507c66 1850385 C8b1a773 1850385