3 Star 0 Fork 0

Gitee 极速下载/gitlab-workhorsesource

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://gitlab.com/gitlab-org/gitlab-workhorse
克隆/下载
redis.go 7.38 KB
一键复制 编辑 原始数据 按行查看 历史
Ben Kochie 提交于 2018-10-29 15:07 . Update redigo library
package redis
import (
"errors"
"fmt"
"net"
"net/url"
"time"
"github.com/FZambia/sentinel"
"github.com/gomodule/redigo/redis"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
)
var (
pool *redis.Pool
sntnl *sentinel.Sentinel
)
const (
// Max Idle Connections in the pool.
defaultMaxIdle = 1
// Max Active Connections in the pool.
defaultMaxActive = 1
// Timeout for Read operations on the pool. 1 second is technically overkill,
// it's just for sanity.
defaultReadTimeout = 1 * time.Second
// Timeout for Write operations on the pool. 1 second is technically overkill,
// it's just for sanity.
defaultWriteTimeout = 1 * time.Second
// Timeout before killing Idle connections in the pool. 3 minutes seemed good.
// If you _actually_ hit this timeout often, you should consider turning of
// redis-support since it's not necessary at that point...
defaultIdleTimeout = 3 * time.Minute
// KeepAlivePeriod is to keep a TCP connection open for an extended period of
// time without being killed. This is used both in the pool, and in the
// worker-connection.
// See https://en.wikipedia.org/wiki/Keepalive#TCP_keepalive for more
// information.
defaultKeepAlivePeriod = 5 * time.Minute
)
var (
totalConnections = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "gitlab_workhorse_redis_total_connections",
Help: "How many connections gitlab-workhorse has opened in total. Can be used to track Redis connection rate for this process",
},
)
errorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gitlab_workhorse_redis_errors",
Help: "Counts different types of Redis errors encountered by workhorse, by type and destination (redis, sentinel)",
},
[]string{"type", "dst"},
)
)
func init() {
prometheus.MustRegister(
totalConnections,
errorCounter,
)
}
func sentinelConn(master string, urls []config.TomlURL) *sentinel.Sentinel {
if len(urls) == 0 {
return nil
}
var addrs []string
for _, url := range urls {
h := url.URL.Host
log.WithFields(log.Fields{
"host": h,
}).Printf("redis: using sentinel")
addrs = append(addrs, h)
}
return &sentinel.Sentinel{
Addrs: addrs,
MasterName: master,
Dial: func(addr string) (redis.Conn, error) {
// This timeout is recommended for Sentinel-support according to the guidelines.
// https://redis.io/topics/sentinel-clients#redis-service-discovery-via-sentinel
// For every address it should try to connect to the Sentinel,
// using a short timeout (in the order of a few hundreds of milliseconds).
timeout := 500 * time.Millisecond
c, err := redis.Dial("tcp", addr, redis.DialConnectTimeout(timeout), redis.DialReadTimeout(timeout), redis.DialWriteTimeout(timeout))
if err != nil {
errorCounter.WithLabelValues("dial", "sentinel").Inc()
return nil, err
}
return c, nil
},
}
}
var poolDialFunc func() (redis.Conn, error)
var workerDialFunc func() (redis.Conn, error)
func timeoutDialOptions(cfg *config.RedisConfig) []redis.DialOption {
readTimeout := defaultReadTimeout
writeTimeout := defaultWriteTimeout
if cfg != nil {
if cfg.ReadTimeout != nil {
readTimeout = cfg.ReadTimeout.Duration
}
if cfg.WriteTimeout != nil {
writeTimeout = cfg.WriteTimeout.Duration
}
}
return []redis.DialOption{
redis.DialReadTimeout(readTimeout),
redis.DialWriteTimeout(writeTimeout),
}
}
func dialOptionsBuilder(cfg *config.RedisConfig, setTimeouts bool) []redis.DialOption {
var dopts []redis.DialOption
if setTimeouts {
dopts = timeoutDialOptions(cfg)
}
if cfg == nil {
return dopts
}
if cfg.Password != "" {
dopts = append(dopts, redis.DialPassword(cfg.Password))
}
if cfg.DB != nil {
dopts = append(dopts, redis.DialDatabase(*cfg.DB))
}
return dopts
}
func keepAliveDialer(timeout time.Duration) func(string, string) (net.Conn, error) {
return func(network, address string) (net.Conn, error) {
addr, err := net.ResolveTCPAddr(network, address)
if err != nil {
return nil, err
}
tc, err := net.DialTCP(network, nil, addr)
if err != nil {
return nil, err
}
if err := tc.SetKeepAlive(true); err != nil {
return nil, err
}
if err := tc.SetKeepAlivePeriod(timeout); err != nil {
return nil, err
}
return tc, nil
}
}
type redisDialerFunc func() (redis.Conn, error)
func sentinelDialer(dopts []redis.DialOption, keepAlivePeriod time.Duration) redisDialerFunc {
return func() (redis.Conn, error) {
address, err := sntnl.MasterAddr()
if err != nil {
errorCounter.WithLabelValues("master", "sentinel").Inc()
return nil, err
}
dopts = append(dopts, redis.DialNetDial(keepAliveDialer(keepAlivePeriod)))
return redisDial("tcp", address, dopts...)
}
}
func defaultDialer(dopts []redis.DialOption, keepAlivePeriod time.Duration, url url.URL) redisDialerFunc {
return func() (redis.Conn, error) {
if url.Scheme == "unix" {
return redisDial(url.Scheme, url.Path, dopts...)
}
dopts = append(dopts, redis.DialNetDial(keepAliveDialer(keepAlivePeriod)))
return redisDial(url.Scheme, url.Host, dopts...)
}
}
func redisDial(network, address string, options ...redis.DialOption) (redis.Conn, error) {
log.WithFields(log.Fields{
"network": network,
"address": address,
}).Printf("redis: dialing")
return redis.Dial(network, address, options...)
}
func countDialer(dialer redisDialerFunc) redisDialerFunc {
return func() (redis.Conn, error) {
c, err := dialer()
if err != nil {
errorCounter.WithLabelValues("dial", "redis").Inc()
} else {
totalConnections.Inc()
}
return c, err
}
}
// DefaultDialFunc should always used. Only exception is for unit-tests.
func DefaultDialFunc(cfg *config.RedisConfig, setReadTimeout bool) func() (redis.Conn, error) {
keepAlivePeriod := defaultKeepAlivePeriod
if cfg.KeepAlivePeriod != nil {
keepAlivePeriod = cfg.KeepAlivePeriod.Duration
}
dopts := dialOptionsBuilder(cfg, setReadTimeout)
if sntnl != nil {
return countDialer(sentinelDialer(dopts, keepAlivePeriod))
}
return countDialer(defaultDialer(dopts, keepAlivePeriod, cfg.URL.URL))
}
// Configure redis-connection
func Configure(cfg *config.RedisConfig, dialFunc func(*config.RedisConfig, bool) func() (redis.Conn, error)) {
if cfg == nil {
return
}
maxIdle := defaultMaxIdle
if cfg.MaxIdle != nil {
maxIdle = *cfg.MaxIdle
}
maxActive := defaultMaxActive
if cfg.MaxActive != nil {
maxActive = *cfg.MaxActive
}
sntnl = sentinelConn(cfg.SentinelMaster, cfg.Sentinel)
workerDialFunc = dialFunc(cfg, false)
poolDialFunc = dialFunc(cfg, true)
pool = &redis.Pool{
MaxIdle: maxIdle, // Keep at most X hot connections
MaxActive: maxActive, // Keep at most X live connections, 0 means unlimited
IdleTimeout: defaultIdleTimeout, // X time until an unused connection is closed
Dial: poolDialFunc,
Wait: true,
}
if sntnl != nil {
pool.TestOnBorrow = func(c redis.Conn, t time.Time) error {
if !sentinel.TestRole(c, "master") {
return errors.New("Role check failed")
}
return nil
}
}
}
// Get a connection for the Redis-pool
func Get() redis.Conn {
if pool != nil {
return pool.Get()
}
return nil
}
// GetString fetches the value of a key in Redis as a string
func GetString(key string) (string, error) {
conn := Get()
if conn == nil {
return "", fmt.Errorf("redis: could not get connection from pool")
}
defer conn.Close()
return redis.String(conn.Do("GET", key))
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors/gitlab-workhorsesource.git
git@gitee.com:mirrors/gitlab-workhorsesource.git
mirrors
gitlab-workhorsesource
gitlab-workhorsesource
v7.1.0

搜索帮助