1 Star 0 Fork 0

xlizy/common-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
nacos.go 12.96 KB
一键复制 编辑 原始数据 按行查看 历史
xlizy 提交于 2024-12-24 14:43 . save
package nacos
import (
"crypto/aes"
"encoding/base64"
"errors"
"gitee.com/xlizy/common-go/base/common_config"
"gitee.com/xlizy/common-go/base/common_goroutine"
"gitee.com/xlizy/common-go/utils/common_utils"
"gitee.com/xlizy/common-go/utils/json"
"gitee.com/xlizy/common-go/utils/zlog"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"github.com/robfig/cron/v3"
"gopkg.in/yaml.v3"
"math/rand/v2"
"net"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"time"
)
var cronManageOnce = sync.Once{}
var cronManage *cron.Cron
var _namingClient naming_client.INamingClient
var _clientParam vo.NacosClientParam
var configVal map[string]string
var listenConfigs = make([]interface{}, 0)
func init() {
BaseWebConfigVal = &BaseWebConfig{}
BaseWebConfigVal.ServerConfig = WebServerConfig{}
cfg := common_config.GetNacosCfg()
nacosAddr := cfg.Addr
t := strings.Split(nacosAddr, ":")
host := t[0]
port, _ := strconv.Atoi(t[1])
serverConfigs := make([]constant.ServerConfig, 0)
serverConfigs = append(serverConfigs, constant.ServerConfig{
IpAddr: host,
Port: uint64(port),
})
clientConfig := *constant.NewClientConfig(
constant.WithNamespaceId(cfg.Namespace),
constant.WithTimeoutMs(5000),
constant.WithBeatInterval(3000),
constant.WithNotLoadCacheAtStart(true),
constant.WithLogDir("/tmp/nacos/log"),
constant.WithCacheDir("/tmp/nacos/cache"),
constant.WithLogLevel("error"),
constant.WithUsername(cfg.Username),
constant.WithPassword(cfg.Password),
constant.WithUpdateCacheWhenEmpty(true),
)
clientParam := vo.NacosClientParam{
ClientConfig: &clientConfig,
ServerConfigs: serverConfigs,
}
_clientParam = clientParam
}
func GetCronManage() *cron.Cron {
cronManageOnce.Do(func() {
cronManage = cron.New(cron.WithSeconds(), cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger), cron.Recover(cron.DefaultLogger)))
cronManage.Start()
})
return cronManage
}
func GetNamingClient() naming_client.INamingClient {
return _namingClient
}
func BuildNamingClient() naming_client.INamingClient {
namingClient, err := clients.NewNamingClient(_clientParam)
if err != nil {
zlog.Error("连接Nacos异常:{}", err.Error())
panic(err)
} else {
return namingClient
}
}
func BuildConfigClient() config_client.IConfigClient {
configClient, err := clients.NewConfigClient(_clientParam)
if err != nil {
zlog.Error("连接Nacos异常:{}", err.Error())
panic(err)
} else {
return configClient
}
}
func InitNacos() {
namingClient := BuildNamingClient()
_namingClient = namingClient
loadRemoteConfig()
AddListen(common_config.AppEnv)
AddListen(common_config.AppSign)
AddListen(common_config.PriorityNetwork)
cfg := common_config.GetNacosCfg()
_ = common_goroutine.GetNonBlockingAntsPool().Submit(func() {
instanceRegister(cfg.AppName, cfg.AvailabilityCluster, cfg.Cluster)
})
}
func instanceRegister(appName, availabilityCluster, cluster string) {
for {
address := net.JoinHostPort(common_utils.GetLocalPriorityIp(common_config.PriorityNetwork.Networks), common_config.BootConfig.HttpPort)
// 3 秒超时
conn, err := net.DialTimeout("tcp", address, 3*time.Second)
if err != nil {
continue
} else {
if conn != nil {
_ = conn.Close()
break
} else {
continue
}
}
}
namingClient := GetNamingClient()
port, _ := strconv.Atoi(common_config.BootConfig.HttpPort)
_, _ = namingClient.RegisterInstance(vo.RegisterInstanceParam{
Ip: common_utils.GetLocalPriorityIp(common_config.PriorityNetwork.Networks),
Port: uint64(port),
ServiceName: "http:" + appName,
Weight: 1,
Enable: true,
Healthy: true,
Ephemeral: true,
Metadata: map[string]string{"availability-cluster": availabilityCluster},
ClusterName: cluster,
GroupName: "DEFAULT_GROUP",
})
//定时查询服务信息(有助于nacos重启后恢复服务端注册信息)
_, _ = GetCronManage().AddFunc("0/30 * * * * *", func() {
_, _ = namingClient.GetService(vo.GetServiceParam{
ServiceName: "http:" + appName,
GroupName: "DEFAULT_GROUP",
})
})
}
func loadRemoteConfig() {
cfg := common_config.GetNacosCfg()
configClient := BuildConfigClient()
if cfg.DataIds != "" {
ids := strings.Split(cfg.DataIds, ",")
configVal = make(map[string]string, len(ids))
for _, id := range ids {
if id != "" {
conStr, err2 := configClient.GetConfig(vo.ConfigParam{
DataId: id,
Group: "DEFAULT_GROUP",
})
if err2 != nil {
zlog.Error("获取远程配置异常:{}", err2)
}
if conStr != "" {
if strings.Index(conStr, "config data not exist") != 0 {
conStr = desConfigContent(conStr)
configVal[id] = conStr
common_utils.ReadConfig(conStr, BaseWebConfigVal)
for _, config := range listenConfigs {
LoadConfig(config)
}
if BaseWebConfigVal.ServerConfig.Port != "" {
common_config.BootConfig.HttpPort = BaseWebConfigVal.ServerConfig.Port
}
}
}
_ = configClient.ListenConfig(vo.ConfigParam{
DataId: id,
Group: "DEFAULT_GROUP",
OnChange: func(namespace, group, dataId, data string) {
if data != "" {
if strings.Index(data, "config data not exist") != 0 {
data = desConfigContent(data)
configVal[dataId] = data
common_utils.ReadConfig(data, BaseWebConfigVal)
for _, config := range listenConfigs {
LoadConfig(config)
}
if BaseWebConfigVal.ServerConfig.Port != "" {
common_config.BootConfig.HttpPort = BaseWebConfigVal.ServerConfig.Port
}
}
}
},
})
}
}
}
}
func AddListen(configs ...interface{}) {
for _, config := range configs {
listenConfigs = append(listenConfigs, config)
LoadConfig(config)
}
}
func LoadConfig(out interface{}) {
finalMap := make(map[string]any)
for _, content := range configVal {
if content != "" && strings.Index(content, "config data not exist") != 0 {
temp := make(map[string]any)
_ = yaml.Unmarshal([]byte(content), &temp)
for k, _ := range temp {
finalMap[k] = temp[k]
}
}
}
mergedYaml, _ := yaml.Marshal(finalMap)
_ = yaml.Unmarshal(mergedYaml, common_utils.ClearNestedStruct(out))
}
func LoadConfigOld(out interface{}) {
for _, content := range configVal {
if content != "" && strings.Index(content, "config data not exist") != 0 {
_ = yaml.Unmarshal([]byte(content), common_utils.ClearNestedStruct(out))
}
}
}
// GracefulShutdown 优雅关闭
func GracefulShutdown() {
client := GetNamingClient()
if client != nil {
port, _ := strconv.Atoi(common_config.BootConfig.HttpPort)
_, _ = client.DeregisterInstance(vo.DeregisterInstanceParam{
Ip: common_utils.GetLocalPriorityIp(common_config.PriorityNetwork.Networks),
Port: uint64(port),
ServiceName: "http:" + common_config.GetAppName(),
Ephemeral: true,
GroupName: "DEFAULT_GROUP",
})
client.CloseClient()
}
}
func GetAppIns(serviceName string) (string, error) {
ac := common_config.GetNacosCfg().AvailabilityCluster
clusters := make([]string, 0)
if ac != "" {
clusters = strings.Split(ac, ",")
}
nc := GetNamingClient()
if nc == nil {
zlog.Info("nacos namingClient is nil")
return "", errors.New("nacos namingClient is nil")
} else {
instances, err := nc.SelectInstances(vo.SelectInstancesParam{
ServiceName: serviceName,
HealthyOnly: true,
})
if err != nil {
zlog.Info("无可用服务serviceName:{},err:{}", serviceName, err.Error())
// TODO 观察
//region 观察
zlog.Info("ServerHealthy:{}", nc.ServerHealthy())
sl, e1 := nc.GetAllServicesInfo(vo.GetAllServiceInfoParam{})
if e1 != nil {
zlog.Info("GetAllServicesInfo-error:{}", e1.Error())
} else {
zlog.Info("GetAllServicesInfo-result:{}", json.ToJsonStr(sl))
}
sa, e2 := nc.SelectAllInstances(vo.SelectAllInstancesParam{
ServiceName: serviceName,
})
if e2 != nil {
zlog.Info("SelectAllInstances-error:{}", e2.Error())
} else {
zlog.Info("SelectAllInstances-result:{}", json.ToJsonStr(sa))
}
//endregion
return "", err
} else {
finalInstances := make([]model.Instance, 0)
for _, instance := range instances {
if instance.Enable {
finalInstances = append(finalInstances, instance)
}
}
if len(finalInstances) == 0 {
zlog.Info("no available instance:{}")
return "", errors.New("no available instance")
} else if len(finalInstances) == 1 {
instance := finalInstances[0]
return instance.Ip + ":" + strconv.Itoa(int(instance.Port)), nil
} else {
tmpInstances := make([]model.Instance, 0)
for _, cluster := range clusters {
for _, instance := range finalInstances {
if instance.ClusterName == cluster {
tmpInstances = append(tmpInstances, instance)
}
}
if len(tmpInstances) > 0 {
break
}
}
if len(tmpInstances) == 0 {
for _, instance := range finalInstances {
tmpInstances = append(tmpInstances, instance)
}
}
if len(tmpInstances) == 1 {
instance := tmpInstances[0]
return instance.Ip + ":" + strconv.Itoa(int(instance.Port)), nil
}
type weight struct {
Min float64
Max float64
}
score := 0.00
his := 0.00
temp := make([]weight, len(tmpInstances))
for _, server := range tmpInstances {
score += server.Weight
}
for index, server := range tmpInstances {
temp[index] = weight{
Min: his,
Max: his + server.Weight/score*10000,
}
his = temp[index].Max
}
r := rand.IntN(10000)
for index, t := range temp {
if int(t.Min) <= r && r <= int(t.Max) {
instance := tmpInstances[index]
return instance.Ip + ":" + strconv.Itoa(int(instance.Port)), nil
}
}
instance := tmpInstances[0]
return instance.Ip + ":" + strconv.Itoa(int(instance.Port)), nil
}
}
}
}
func GetAllAppIns(serviceName string) ([]string, error) {
res := make([]string, 0)
nc := GetNamingClient()
if nc == nil {
zlog.Info("nacos namingClient is nil")
return res, errors.New("nacos namingClient is nil")
} else {
instances, err := nc.SelectInstances(vo.SelectInstancesParam{
ServiceName: serviceName,
HealthyOnly: true,
})
if err != nil {
zlog.Info("无可用服务:{}", serviceName)
return res, err
} else {
finalInstances := make([]model.Instance, 0)
for _, instance := range instances {
if instance.Enable {
finalInstances = append(finalInstances, instance)
}
}
if len(finalInstances) == 0 {
zlog.Info("no available instance:{}", err.Error())
return res, errors.New("no available instance")
} else {
for _, instance := range finalInstances {
res = append(res, instance.Ip+":"+strconv.Itoa(int(instance.Port)))
}
return res, nil
}
}
}
}
func GetAppInsCallUrl(path string) (string, error) {
parsedUrl, pErr := url.Parse(path)
if pErr != nil {
zlog.Error("无法解析的URL:{}", path)
return "", pErr
} else {
host, iErr := GetAppIns("http:" + parsedUrl.Host)
if iErr != nil {
return "", iErr
} else {
path = strings.Replace(path, parsedUrl.Host, host, 1)
return path, nil
}
}
}
func GetAllAppInsCallUrl(path string) ([]string, error) {
res := make([]string, 0)
parsedUrl, pErr := url.Parse(path)
if pErr != nil {
zlog.Error("无法解析的URL:{}", path)
return res, pErr
} else {
host, iErr := GetAllAppIns("http:" + parsedUrl.Host)
if iErr != nil {
return res, iErr
} else {
for _, h := range host {
res = append(res, strings.Replace(path, parsedUrl.Host, h, 1))
}
return res, nil
}
}
}
func desConfigContent(content string) string {
//解密
content = replaceENC(content)
return content
}
func replaceENC(str string) string {
reg := regexp.MustCompile(`ENC\([^)]*\)`)
fragment := reg.FindString(str)
if fragment != "" {
des := strings.Replace(fragment, "ENC(", "", 1)
des = strings.Replace(des, ")", "", 1)
//解密
des = aesDecryptECB(des, common_config.GetConfigSecretKey())
str = strings.Replace(str, fragment, des, 1)
return replaceENC(str)
} else {
return str
}
}
func aesDecryptECB(encryptedStr, keyStr string) string {
defer func() {
if err := recover(); err != nil {
}
}()
encrypted, _ := base64.StdEncoding.DecodeString(encryptedStr)
key := []byte(keyStr)
cipher, _ := aes.NewCipher(generateKey(key))
decrypted := make([]byte, len(encrypted))
//
for bs, be := 0, cipher.BlockSize(); bs < len(encrypted); bs, be = bs+cipher.BlockSize(), be+cipher.BlockSize() {
cipher.Decrypt(decrypted[bs:be], encrypted[bs:be])
}
trim := 0
if len(decrypted) > 0 {
trim = len(decrypted) - int(decrypted[len(decrypted)-1])
}
return string(decrypted[:trim])
}
func generateKey(key []byte) (genKey []byte) {
genKey = make([]byte, 32)
copy(genKey, key)
for i := 32; i < len(key); {
for j := 0; j < 32 && i < len(key); j, i = j+1, i+1 {
genKey[j] ^= key[i]
}
}
return genKey
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/xlizy/common-go.git
git@gitee.com:xlizy/common-go.git
xlizy
common-go
common-go
v0.4.10

搜索帮助