3 Star 0 Fork 0

Gitee 极速下载/gitlab-workhorsesource

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://gitlab.com/gitlab-org/gitlab-workhorse
克隆/下载
register.go 5.47 KB
一键复制 编辑 原始数据 按行查看 历史
package builds
import (
"encoding/json"
"errors"
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/redis"
)
// Constants used by the runner build-registration handler in this file.
const (
// maxRegisterBodySize is the size limit (32 * 1024) handed to
// helper.ReadRequestBody when the runner's request body is read.
maxRegisterBodySize = 32 * 1024
// runnerBuildQueue is the key prefix; the runner token is appended to it to
// build the key passed to the WatchKeyHandler.
runnerBuildQueue = "runner:build_queue:"
// runnerBuildQueueHeaderKey and runnerBuildQueueHeaderValue form the response
// header that the wrapping handler returned by RegisterHandler sets on every
// request it serves.
runnerBuildQueueHeaderKey = "Gitlab-Ci-Builds-Polling"
runnerBuildQueueHeaderValue = "yes"
)
// Prometheus metrics for the register handler. The vectors are registered in
// init; the remaining variables are children of those vectors resolved once
// for a fixed label value so that call sites only need Inc/Dec.
var (
// registerHandlerRequests counts handled requests, partitioned by the
// "status" label (the outcome of the request).
registerHandlerRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gitlab_workhorse_builds_register_handler_requests",
Help: "Describes how many requests in different states hit a register handler",
},
[]string{"status"},
)
// registerHandlerOpen tracks requests currently in progress, partitioned by
// the "state" label (the phase the request is in).
registerHandlerOpen = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "gitlab_workhorse_builds_register_handler_open",
Help: "Describes how many requests is currently open in given state",
},
[]string{"state"},
)
// In-progress gauges: incremented on entry to and decremented on exit from
// readRunnerBody, proxyRegisterRequest and watchForRunnerChange respectively.
registerHandlerOpenAtReading = registerHandlerOpen.WithLabelValues("reading")
registerHandlerOpenAtProxying = registerHandlerOpen.WithLabelValues("proxying")
registerHandlerOpenAtWatching = registerHandlerOpen.WithLabelValues("watching")
// Outcome counters: each is incremented by RegisterHandler on the matching
// code path (body read failure, body parse failure, empty token or
// last_update, watch failure, and one per watch status).
registerHandlerBodyReadErrors = registerHandlerRequests.WithLabelValues("body-read-error")
registerHandlerBodyParseErrors = registerHandlerRequests.WithLabelValues("body-parse-error")
registerHandlerMissingValues = registerHandlerRequests.WithLabelValues("missing-values")
registerHandlerWatchErrors = registerHandlerRequests.WithLabelValues("watch-error")
registerHandlerAlreadyChangedRequests = registerHandlerRequests.WithLabelValues("already-changed")
registerHandlerSeenChangeRequests = registerHandlerRequests.WithLabelValues("seen-change")
registerHandlerTimeoutRequests = registerHandlerRequests.WithLabelValues("timeout")
registerHandlerNoChangeRequests = registerHandlerRequests.WithLabelValues("no-change")
)
// largeBodyError wraps the error returned when reading the request body
// fails; RegisterHandler passes it to helper.RequestEntityTooLarge.
type largeBodyError struct{ error }
// watchError wraps an error by embedding it.
// NOTE(review): not referenced in the visible part of this file; presumably
// intended for watch failures. Confirm usage elsewhere.
type watchError struct{ error }
// WatchKeyHandler is the callback RegisterHandler uses to wait on a key. It
// is invoked with the key (runnerBuildQueue + runner token), the last_update
// value supplied by the runner, and the polling duration as timeout, and it
// returns a redis.WatchKeyStatus or an error.
// NOTE(review): presumably the implementation blocks until the stored value
// differs from value or the timeout elapses; verify against the redis package.
type WatchKeyHandler func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error)
// init registers this file's metric vectors with the default Prometheus
// registry. prometheus.MustRegister panics when a collector cannot be
// registered.
func init() {
	prometheus.MustRegister(registerHandlerRequests)
	prometheus.MustRegister(registerHandlerOpen)
}
// runnerRequest holds the fields decoded from the JSON body of a runner's
// request that RegisterHandler needs for long polling.
type runnerRequest struct {
// Token is read from the "token" JSON key; it is appended to
// runnerBuildQueue to form the watched key.
Token string `json:"token,omitempty"`
// LastUpdate is read from the "last_update" JSON key; it is passed to the
// WatchKeyHandler as the value argument.
LastUpdate string `json:"last_update,omitempty"`
}
// readRunnerBody reads the request body through helper.ReadRequestBody with
// maxRegisterBodySize as the limit and returns whatever that helper returns.
// The "reading" gauge is held incremented for the duration of the read.
func readRunnerBody(w http.ResponseWriter, r *http.Request) ([]byte, error) {
	inFlight := registerHandlerOpenAtReading
	inFlight.Inc()
	defer inFlight.Dec()

	body, err := helper.ReadRequestBody(w, r, maxRegisterBodySize)
	return body, err
}
// readRunnerRequest decodes body as JSON into a runnerRequest.
//
// It returns an error when helper.IsApplicationJson rejects the request r, or
// when body cannot be unmarshalled; otherwise it returns the decoded request.
func readRunnerRequest(r *http.Request, body []byte) (*runnerRequest, error) {
	if !helper.IsApplicationJson(r) {
		return nil, errors.New("invalid content-type received")
	}

	parsed := new(runnerRequest)
	if err := json.Unmarshal(body, parsed); err != nil {
		return nil, err
	}

	return parsed, nil
}
// proxyRegisterRequest hands the request to h and keeps the "proxying" gauge
// incremented until h.ServeHTTP returns.
func proxyRegisterRequest(h http.Handler, w http.ResponseWriter, r *http.Request) {
	inFlight := registerHandlerOpenAtProxying
	inFlight.Inc()
	defer inFlight.Dec()

	h.ServeHTTP(w, r)
}
// watchForRunnerChange calls watchHandler for the key built from
// runnerBuildQueue and token, passing lastUpdate and duration through, and
// returns its result unchanged. The "watching" gauge is held incremented
// while the handler runs.
func watchForRunnerChange(watchHandler WatchKeyHandler, token, lastUpdate string, duration time.Duration) (redis.WatchKeyStatus, error) {
	inFlight := registerHandlerOpenAtWatching
	inFlight.Inc()
	defer inFlight.Dec()

	key := runnerBuildQueue + token
	return watchHandler(key, lastUpdate, duration)
}
// RegisterHandler wraps h so that runner requests can be held open until the
// runner's watched key changes or pollingDuration elapses.
//
// When pollingDuration is zero, h is returned unwrapped. Otherwise the
// returned handler:
//   - sets the runnerBuildQueueHeaderKey response header;
//   - reads the body (bounded by maxRegisterBodySize) and, if that fails,
//     reports the failure through helper.RequestEntityTooLarge;
//   - forwards the request to h, with the already-read body re-attached, when
//     the body cannot be parsed, token or last_update is empty, or
//     watchHandler returns an error;
//   - otherwise acts on the status returned by watchHandler: an
//     already-changed key is forwarded to h, while seen-change, timeout and
//     no-change are answered with 204 No Content.
func RegisterHandler(h http.Handler, watchHandler WatchKeyHandler, pollingDuration time.Duration) http.Handler {
	if pollingDuration == 0 {
		return h
	}

	longPoll := func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set(runnerBuildQueueHeaderKey, runnerBuildQueueHeaderValue)

		body, err := readRunnerBody(w, r)
		if err != nil {
			registerHandlerBodyReadErrors.Inc()
			helper.RequestEntityTooLarge(w, r, &largeBodyError{err})
			return
		}

		// The original body has been consumed, so anything forwarded to h
		// must use this copy carrying the bytes we already read.
		upstreamReq := helper.CloneRequestWithNewBody(r, body)

		parsed, err := readRunnerRequest(r, body)
		if err != nil {
			registerHandlerBodyParseErrors.Inc()
			proxyRegisterRequest(h, w, upstreamReq)
			return
		}

		if hasBoth := parsed.Token != "" && parsed.LastUpdate != ""; !hasBoth {
			registerHandlerMissingValues.Inc()
			proxyRegisterRequest(h, w, upstreamReq)
			return
		}

		status, err := watchForRunnerChange(watchHandler, parsed.Token, parsed.LastUpdate, pollingDuration)
		if err != nil {
			registerHandlerWatchErrors.Inc()
			proxyRegisterRequest(h, w, upstreamReq)
			return
		}

		switch status {
		case redis.WatchKeyStatusAlreadyChanged:
			// The key had already changed before watching started, so let
			// the upstream handler decide whether there is a build to hand out.
			registerHandlerAlreadyChangedRequests.Inc()
			proxyRegisterRequest(h, w, upstreamReq)
			return

		case redis.WatchKeyStatusSeenChange:
			// A change arrived while we were watching. Proxying now could
			// yield an unreliable response because we cannot tell whether the
			// ResponseWriter is still usable (the connection may be dead), so
			// answer 204 instead.
			registerHandlerSeenChangeRequests.Inc()

		case redis.WatchKeyStatusTimeout:
			// Nothing changed within the polling window.
			registerHandlerTimeoutRequests.Inc()

		case redis.WatchKeyStatusNoChange:
			// Nothing changed; there are no new builds to process.
			registerHandlerNoChangeRequests.Inc()

		default:
			// Any other status is left without an explicit response.
			return
		}

		w.WriteHeader(http.StatusNoContent)
	}

	return http.HandlerFunc(longPoll)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors/gitlab-workhorsesource.git
git@gitee.com:mirrors/gitlab-workhorsesource.git
mirrors
gitlab-workhorsesource
gitlab-workhorsesource
v5.2.0

搜索帮助