3 Star 0 Fork 0

Gitee 极速下载/gitlab-workhorsesource

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://gitlab.com/gitlab-org/gitlab-workhorse
克隆/下载
queue.go 6.01 KB
一键复制 编辑 原始数据 按行查看 历史
package queueing
import (
"errors"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// Distinct wrapper types for the two failures Acquire can report, so that
// callers can tell them apart with a type assertion as well as by comparing
// against the sentinel values below.
type (
	errTooManyRequests  struct{ error }
	errQueueingTimedout struct{ error }
)

var (
	// ErrTooManyRequests is returned by Acquire when the waiting queue is
	// already full and the request cannot be enqueued.
	ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")}

	// ErrQueueingTimedout is returned by Acquire when no processing slot
	// became available before the queue timeout elapsed.
	ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timedout")}
)
// queueMetrics bundles the Prometheus collectors that describe a single queue.
type queueMetrics struct {
	// Configuration gauges, set once when the queue is created.
	queueingLimit        prometheus.Gauge // configured concurrency limit
	queueingQueueLimit   prometheus.Gauge // configured queue length limit
	queueingQueueTimeout prometheus.Gauge // configured queue timeout, in seconds

	// Live state gauges, updated by Acquire and Release.
	queueingBusy    prometheus.Gauge // requests currently holding a processing slot
	queueingWaiting prometheus.Gauge // requests currently holding a waiting-queue slot

	// queueingWaitingTime records, in seconds, the time elapsed since a
	// waiting-queue entry was enqueued at the moment it is dequeued.
	queueingWaitingTime prometheus.Histogram

	// queueingErrors counts rejected requests, partitioned by the "type" label.
	queueingErrors *prometheus.CounterVec
}
// newQueueMetrics creates and registers the Prometheus collectors for one
// queue.
//
// name is attached to every collector as the constant label `queue_name`.
// Don't call newQueueMetrics twice with the same name argument: the collectors
// are registered with prometheus.MustRegister, which panics when registration
// fails.
//
// timeout is the time limit for keeping a request in the queue; the buckets of
// the gitlab_workhorse_queueing_waiting_time histogram are derived from it as
// fixed fractions (1% up to 100%) of the timeout expressed in seconds.
func newQueueMetrics(name string, timeout time.Duration) *queueMetrics {
	// Every collector gets its own label map carrying the queue name.
	constLabels := func() prometheus.Labels {
		return prometheus.Labels{"queue_name": name}
	}

	newGauge := func(metricName, help string) prometheus.Gauge {
		return prometheus.NewGauge(prometheus.GaugeOpts{
			Name:        metricName,
			Help:        help,
			ConstLabels: constLabels(),
		})
	}

	// Histogram buckets: fractions of the timeout, in seconds.
	timeoutSeconds := timeout.Seconds()
	bucketFractions := []float64{0.01, 0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99, 1}
	waitingTimeBuckets := make([]float64, 0, len(bucketFractions))
	for _, fraction := range bucketFractions {
		waitingTimeBuckets = append(waitingTimeBuckets, timeoutSeconds*fraction)
	}

	metrics := &queueMetrics{
		queueingLimit: newGauge(
			"gitlab_workhorse_queueing_limit",
			"Current limit set for the queueing mechanism",
		),
		queueingQueueLimit: newGauge(
			"gitlab_workhorse_queueing_queue_limit",
			"Current queueLimit set for the queueing mechanism",
		),
		queueingQueueTimeout: newGauge(
			"gitlab_workhorse_queueing_queue_timeout",
			"Current queueTimeout set for the queueing mechanism",
		),
		queueingBusy: newGauge(
			"gitlab_workhorse_queueing_busy",
			"How many queued requests are now processed",
		),
		queueingWaiting: newGauge(
			"gitlab_workhorse_queueing_waiting",
			"How many requests are now queued",
		),
		queueingWaitingTime: prometheus.NewHistogram(prometheus.HistogramOpts{
			Name:        "gitlab_workhorse_queueing_waiting_time",
			Help:        "How many time a request spent in queue",
			ConstLabels: constLabels(),
			Buckets:     waitingTimeBuckets,
		}),
		queueingErrors: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name:        "gitlab_workhorse_queueing_errors",
				Help:        "How many times the TooManyRequests or QueueintTimedout errors were returned while queueing, partitioned by error type",
				ConstLabels: constLabels(),
			},
			[]string{"type"},
		),
	}

	// Register one collector at a time, in the same order as the struct
	// fields, so a registration failure panics at the first offending one.
	for _, collector := range []prometheus.Collector{
		metrics.queueingLimit,
		metrics.queueingQueueLimit,
		metrics.queueingQueueTimeout,
		metrics.queueingBusy,
		metrics.queueingWaiting,
		metrics.queueingWaitingTime,
		metrics.queueingErrors,
	} {
		prometheus.MustRegister(collector)
	}

	return metrics
}
// Queue limits how many requests are processed at the same time and how many
// more may wait for a processing slot. Slots are claimed with Acquire and
// given back with Release.
type Queue struct {
	// Embedded Prometheus collectors describing this queue.
	*queueMetrics

	// name is the queue name passed to newQueue; it labels the metrics.
	name string

	// busyCh holds one token per request that owns a processing slot; its
	// capacity is the concurrency limit.
	busyCh chan struct{}

	// waitingCh holds the enqueue timestamp of every request admitted by
	// Acquire and not yet dequeued; its capacity is limit+queueLimit.
	waitingCh chan time.Time

	// timeout bounds how long Acquire waits for a processing slot.
	timeout time.Duration
}
// newQueue builds a Queue together with its metrics.
//
//   - name labels the queue metrics. Don't call newQueue twice with the same
//     name argument!
//   - limit is the number of requests that may run concurrently.
//   - queueLimit is the maximum number of requests that may be queued once
//     limit is reached.
//   - timeout is the time limit for keeping a request in the queue when the
//     number of requests is above the limit.
//
// The three settings are also published through the configuration gauges.
func newQueue(name string, limit, queueLimit uint, timeout time.Duration) *Queue {
	busy := make(chan struct{}, limit)
	waiting := make(chan time.Time, limit+queueLimit)

	metrics := newQueueMetrics(name, timeout)
	metrics.queueingLimit.Set(float64(limit))
	metrics.queueingQueueLimit.Set(float64(queueLimit))
	metrics.queueingQueueTimeout.Set(timeout.Seconds())

	return &Queue{
		queueMetrics: metrics,
		name:         name,
		busyCh:       busy,
		waitingCh:    waiting,
		timeout:      timeout,
	}
}
// Acquire claims one slot from the Queue and returns once the request may be
// processed.
//
// Up to (limit) requests hold a processing slot at a time; further requests
// wait, and the waiting queue admits at most (limit + queue-limit) entries.
//
// It returns nil when a processing slot was obtained, ErrTooManyRequests when
// the waiting queue is full, and ErrQueueingTimedout when no processing slot
// became free within the queue timeout. After a nil return the caller owns
// both a waiting-queue entry and a processing slot until Release is called.
func (s *Queue) Acquire() (err error) {
	// Claim a waiting-queue entry without blocking; a full queue rejects the
	// request immediately.
	select {
	case s.waitingCh <- time.Now():
	default:
		s.queueingErrors.WithLabelValues("too_many_requests").Inc()
		return ErrTooManyRequests
	}
	s.queueingWaiting.Inc()

	// If we leave with an error, hand a waiting-queue entry back. On success
	// the entry stays in the channel until Release dequeues it.
	defer func() {
		if err == nil {
			return
		}
		// The channel is FIFO, so this is the oldest queued timestamp, not
		// necessarily the one pushed by this call.
		enqueuedAt := <-s.waitingCh
		s.queueingWaiting.Dec()
		s.queueingWaitingTime.Observe(time.Since(enqueuedAt).Seconds())
	}()

	// Fast path: take a processing slot if one is free right now.
	select {
	case s.busyCh <- struct{}{}:
		s.queueingBusy.Inc()
		return nil
	default:
	}

	// Slow path: block until a processing slot frees up or the timeout fires.
	deadline := time.NewTimer(s.timeout)
	defer deadline.Stop()

	select {
	case s.busyCh <- struct{}{}:
		s.queueingBusy.Inc()
		return nil
	case <-deadline.C:
		s.queueingErrors.WithLabelValues("queueing_timedout").Inc()
		return ErrQueueingTimedout
	}
}
// Release marks the end of processing of a request and lets the next queued
// request proceed.
//
// It is expected to be paired with a successful Acquire: both channel receives
// below are unconditional.
func (s *Queue) Release() {
	// Give back a waiting-queue entry first. The channel is FIFO, so the
	// timestamp read is the oldest one still queued, and the observed
	// duration runs from that enqueue time until now.
	enqueuedAt := <-s.waitingCh
	s.queueingWaiting.Dec()
	elapsed := time.Since(enqueuedAt)
	s.queueingWaitingTime.Observe(elapsed.Seconds())

	// Then free the processing slot so a blocked Acquire can take it.
	<-s.busyCh
	s.queueingBusy.Dec()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors/gitlab-workhorsesource.git
git@gitee.com:mirrors/gitlab-workhorsesource.git
mirrors
gitlab-workhorsesource
gitlab-workhorsesource
v3.6.0

搜索帮助

0d507c66 1850385 C8b1a773 1850385