1 Star 0 Fork 0

fkil555/gin-extend

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
cache_connection.go 8.17 KB
一键复制 编辑 原始数据 按行查看 历史
fkil555 提交于 2023-09-10 21:31 . gin-extend init
package gredis
import (
"context"
"fmt"
"github.com/go-redis/redis"
"math"
"os"
"regexp"
"strconv"
"strings"
"time"
)
var (
errReplacePortPattern = regexp.MustCompile(`(:\d{1,5}->)`)
errReplacePortData = ":localport->"
)
// 单个cache连接
type CACHEConn struct {
client *redis.Client
clusterClient *redis.ClusterClient
}
func newRedisClient(groupConf *CACHEGroupConf, subGroupConf *CACHESubGroupConf, connConf *CACHEConnConf) (client *redis.Client) {
DSN := fmt.Sprintf("%s:%d", connConf.Host, connConf.Port)
client = redis.NewClient(&redis.Options{
Addr: DSN,
Password: groupConf.Password, //传递密码
DialTimeout: subGroupConf.DialTimeout * time.Millisecond, //闲置重新建立连接数时间 默认5s
ReadTimeout: subGroupConf.ReadTimeout * time.Millisecond, //设置读超时时间,默认3s
WriteTimeout: subGroupConf.WriteTimeout * time.Millisecond, //设置写超时时间,默认同slavetimeout
PoolSize: connConf.PoolSize,
MinIdleConns: connConf.MinIdleConns,
PoolTimeout: subGroupConf.PoolTimeout * time.Millisecond,
IdleTimeout: subGroupConf.IdleTimeout * time.Millisecond,
})
return
}
func newRedisClusterClient(groupConf *CACHEGroupConf, subGroupConf *CACHESubGroupConf, connConf *CACHEConnConf) (client *redis.ClusterClient) {
DSNList := []string{}
for i := 0; i < len(groupConf.Cluster.Instances); i++ {
DSN := fmt.Sprintf("%s:%d", groupConf.Cluster.Instances[i].Host, groupConf.Cluster.Instances[i].Port)
DSNList = append(DSNList, DSN)
}
client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: DSNList,
Password: groupConf.Password, //传递密码
DialTimeout: subGroupConf.DialTimeout * time.Millisecond, //闲置重新建立连接数时间 默认5s
ReadTimeout: subGroupConf.ReadTimeout * time.Millisecond, //设置读超时时间,默认3s
WriteTimeout: subGroupConf.WriteTimeout * time.Millisecond, //设置写超时时间,默认同slavetimeout
PoolSize: subGroupConf.PoolSize,
MinIdleConns: subGroupConf.MinIdleConns,
PoolTimeout: subGroupConf.PoolTimeout * time.Millisecond,
IdleTimeout: subGroupConf.IdleTimeout * time.Millisecond,
})
return
}
func newCACHEConnection(groupConf *CACHEGroupConf, subGroupConf *CACHESubGroupConf, connConf *CACHEConnConf, hostCountMap map[string]int) (cacheConn *CACHEConn, err error) {
initOptions(groupConf, subGroupConf, connConf, hostCountMap)
var client *redis.Client
var clusterClient *redis.ClusterClient
if connConf != nil {
client = newRedisClient(groupConf, subGroupConf, connConf)
} else {
clusterClient = newRedisClusterClient(groupConf, subGroupConf, connConf)
}
cacheConn = &CACHEConn{
client: client,
clusterClient: clusterClient,
}
return
}
// 设置初始化值
func initOptions(groupConf *CACHEGroupConf, subGroupConf *CACHESubGroupConf, connConf *CACHEConnConf, hostCountMap map[string]int) {
if subGroupConf.PoolSize <= 0 {
subGroupConf.PoolSize = DEFAULT_POOL_SIZE
}
if subGroupConf.MinIdleConns <= 0 {
subGroupConf.MinIdleConns = DEFAULT_MIN_IDLE_CONN
}
if subGroupConf.DefaultSmartMaxConn <= 0 {
subGroupConf.DefaultSmartMaxConn = DEFAULT_SMART_MAX_CONN
}
if subGroupConf.DefaultSmartMinIdleConn <= 0 {
subGroupConf.DefaultSmartMinIdleConn = DEFAULT_SMART_MIN_IDLE_CONN
}
if subGroupConf.DefaultPerCpuMaxConn <= 0 {
subGroupConf.DefaultPerCpuMaxConn = DEFAULT_PER_CPU_MAX_CONN
}
if subGroupConf.DefaultPerCpuMinIdleConn <= 0 {
subGroupConf.DefaultPerCpuMinIdleConn = DEFAULT_PER_CPU_MIN_IDLE_CONN
}
// redis-cluster模式,提前返回
if connConf == nil {
return
}
// 智能模式
if subGroupConf.ConnMode == 1 {
// 智能模式,默认每cpu最大连接数,最小闲置连接数
perCpuMaxConn := subGroupConf.DefaultPerCpuMaxConn
perCpuMinIdleConn := subGroupConf.DefaultPerCpuMinIdleConn
// cpu核数,存在小数情况, 需要取整, 从环境变量获取
cpuNumStr := os.Getenv("GE_CPU_PER_POD")
maxprocsFloat, _ := strconv.ParseFloat(cpuNumStr, 64)
maxprocsFloat = math.Ceil(maxprocsFloat)
maxprocs := int(maxprocsFloat)
if maxprocs <= 0 {
maxprocs = 1
}
// pod默认核数
podNumStr := os.Getenv("GE_POD_NUM")
podNum, _ := strconv.Atoi(podNumStr)
if podNum <= 0 {
podNum = 1
}
// 每个pod的cpu数 * pod总数 * 每个cpu分配的最大连接数 大于 总最大连接数, 则需要修改 每个cpu分配的最大连接数
if maxprocs*podNum*perCpuMaxConn > subGroupConf.DefaultSmartMaxConn {
perCpuMaxConn = subGroupConf.DefaultSmartMaxConn / podNum / maxprocs
if perCpuMaxConn == 0 {
perCpuMaxConn = 1
}
}
// 每个pod的cpu数 * pod总数 * 每个cpu分配的最小闲置连接数 大于 总闲置连接数, 则需要修改每个cpu分配的最小闲置连接数
if maxprocs*podNum*perCpuMinIdleConn > subGroupConf.DefaultSmartMinIdleConn {
perCpuMinIdleConn = subGroupConf.DefaultSmartMinIdleConn / podNum / maxprocs
if perCpuMinIdleConn == 0 {
perCpuMinIdleConn = 1
}
}
// toml中多个从库配置,实际指向同一个IP地址, 存在1个host被使用多次
ipUsageNum := hostCountMap[connConf.Host]
if ipUsageNum <= 0 {
ipUsageNum = 1
}
// 每个从库实例的最大连接数
maxConn := perCpuMaxConn * maxprocs / ipUsageNum
// 每个从库实例的 最小闲置连接数
idleConn := perCpuMinIdleConn * maxprocs / ipUsageNum
if maxConn == 0 {
maxConn = 1
}
if idleConn == 0 {
idleConn = 1
}
connConf.PoolSize = maxConn
connConf.MinIdleConns = idleConn
} else {
// 默认
connConf.PoolSize = subGroupConf.PoolSize
connConf.MinIdleConns = subGroupConf.MinIdleConns
}
}
func (cacheConn *CACHEConn) NewWrapperClient(ctx context.Context) (currentClient *redis.Client, err error) {
currentClient = cacheConn.client.WithContext(ctx)
currentClient.WrapProcess(cacheConn.InjectCtx(currentClient))
currentClient.WrapProcessPipeline(cacheConn.InjectPipelineCtx(currentClient))
return
}
func (cacheConn *CACHEConn) NewWrapperClusterClient(ctx context.Context) (currentClient *redis.ClusterClient, err error) {
currentClient = cacheConn.clusterClient.WithContext(ctx)
currentClient.WrapProcess(cacheConn.InjectClusterCtx(currentClient))
currentClient.WrapProcessPipeline(cacheConn.InjectClusterPipelineCtx(currentClient))
return
}
// 格式化redis命令用于cat展示
func formatCmds(cmders []redis.Cmder) string {
var cmdList = make([]string, 0)
// 对于每一个redis请求
for _, cmd := range cmders {
// 拼接命令参数
var args = make([]string, 0)
for index, arg := range cmd.Args() {
// 只上报排查问题必须的前两个参数
if index <= 1 {
args = append(args, fmt.Sprint(arg))
}
}
argsStr := strings.Join(args, " ")
if err := cmd.Err(); err != nil {
argsStr = fmt.Sprintf("%s(%s)", argsStr, err.Error())
}
cmdList = append(cmdList, argsStr)
}
return strings.Join(cmdList, "\n")
}
// 将ctx注入到go redis上下文环境
func (cacheConn *CACHEConn) InjectCtx(client *redis.Client) func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
return func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
return func(cmd redis.Cmder) (err error) {
err = old(cmd)
return err
}
}
}
// 将ctx注入go redis的pipeline语句中
func (cacheConn *CACHEConn) InjectPipelineCtx(client *redis.Client) func(oldProcess func([]redis.Cmder) error) func([]redis.Cmder) error {
return func(oldProcess func([]redis.Cmder) error) func([]redis.Cmder) error {
return func(cmders []redis.Cmder) (err error) {
err = oldProcess(cmders)
return err
}
}
}
func (cacheConn *CACHEConn) InjectClusterCtx(client *redis.ClusterClient) func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
return func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
return func(cmd redis.Cmder) (err error) {
err = old(cmd)
return err
}
}
}
// 将ctx注入go redis的pipeline语句中
func (cacheConn *CACHEConn) InjectClusterPipelineCtx(client *redis.ClusterClient) func(oldProcess func([]redis.Cmder) error) func([]redis.Cmder) error {
return func(oldProcess func([]redis.Cmder) error) func([]redis.Cmder) error {
return func(cmders []redis.Cmder) (err error) {
err = oldProcess(cmders)
return err
}
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/fkil555/gin-extend.git
git@gitee.com:fkil555/gin-extend.git
fkil555
gin-extend
gin-extend
v0.0.16

搜索帮助