Ai
1 Star 0 Fork 0

llakcs/agile-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
registry.go 7.17 KB
一键复制 编辑 原始数据 按行查看 历史
llakcs 提交于 2024-01-31 16:56 +08:00 . 第一次提交
package nacos
import (
"context"
"errors"
"fmt"
"gitee.com/llakcs/agile-go/log"
"gitee.com/llakcs/agile-go/registry"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"net"
"net/url"
"strconv"
)
// options collects every tunable of the Nacos registry. It is filled with
// defaults in NewRegistry and then mutated by the caller-supplied Option values.
type options struct {
// kind is the fallback URL scheme used by GetService when an instance has no
// "kind" metadata entry; it is also handed to newWatcher by Watch.
kind string
// weight is sent as the instance Weight on registration.
weight float64
// cluster is the Nacos cluster name used for register, deregister and watch.
cluster string
// group is the Nacos group name used for every naming call in this file.
group string
// namespaceId is forwarded to the Nacos client config; validateOptions
// rejects an empty value.
namespaceId string
// timeoutMs is forwarded to the Nacos client config via constant.WithTimeoutMs.
timeoutMs uint64
// notLoadCacheAtStart is forwarded via constant.WithNotLoadCacheAtStart.
notLoadCacheAtStart bool
// logDir is forwarded via constant.WithLogDir.
logDir string
// cacheDir is forwarded via constant.WithCacheDir.
cacheDir string
// logLevel is forwarded via constant.WithLogLevel.
logLevel string
// username and password are forwarded via constant.WithUsername / WithPassword.
username string
password string
// accessKey and secretKey are forwarded via constant.WithAccessKey / WithSecretKey.
accessKey string
secretKey string
// serverConfs lists the Nacos servers to connect to; validateOptions
// requires at least one entry.
serverConfs []ServerConfig
}
// ServerConfig identifies one Nacos server. NewRegistry converts each value
// into a constant.ServerConfig via constant.NewServerConfig(IpAddr, Port).
type ServerConfig struct {
// IpAddr is passed as the first argument of constant.NewServerConfig.
IpAddr string
// Port is passed as the second argument of constant.NewServerConfig.
Port uint64
}
// Option mutates the registry options; NewRegistry applies the supplied
// Option values in order on top of its defaults.
type Option func(o *options)
// WithDefaultKind sets the fallback endpoint scheme. GetService uses it for
// instances whose metadata has no "kind" entry, and Watch passes it on to the
// watcher. NewRegistry defaults it to "grpc".
func WithDefaultKind(kind string) Option {
	return func(cfg *options) {
		cfg.kind = kind
	}
}
// WithServerConfs sets the Nacos servers to connect to. The slice is stored
// as given (not copied). NewRegistry fails when no server is configured.
func WithServerConfs(serverConfs []ServerConfig) Option {
	return func(cfg *options) {
		cfg.serverConfs = serverConfs
	}
}
// WithSecretKey sets the secret key that NewRegistry forwards to the Nacos
// client config.
func WithSecretKey(secretKey string) Option {
	return func(cfg *options) {
		cfg.secretKey = secretKey
	}
}
// WithAccessKey sets the access key that NewRegistry forwards to the Nacos
// client config.
func WithAccessKey(accessKey string) Option {
	return func(cfg *options) {
		cfg.accessKey = accessKey
	}
}
// WithPassWord sets the password that NewRegistry forwards to the Nacos
// client config.
func WithPassWord(password string) Option {
	return func(cfg *options) {
		cfg.password = password
	}
}
// WithUserName sets the username that NewRegistry forwards to the Nacos
// client config.
func WithUserName(username string) Option {
	return func(cfg *options) {
		cfg.username = username
	}
}
// WithLogLevel sets the log level that NewRegistry forwards to the Nacos
// client config.
func WithLogLevel(logLevel string) Option {
	return func(cfg *options) {
		cfg.logLevel = logLevel
	}
}
// WithCacheDir sets the cache directory that NewRegistry forwards to the
// Nacos client config.
func WithCacheDir(cacheDir string) Option {
	return func(cfg *options) {
		cfg.cacheDir = cacheDir
	}
}
// WithLogDir sets the log directory that NewRegistry forwards to the Nacos
// client config.
func WithLogDir(logDir string) Option {
	return func(cfg *options) {
		cfg.logDir = logDir
	}
}
// WithNotLoadCacheAtStart sets the flag that NewRegistry forwards to the
// Nacos client config through constant.WithNotLoadCacheAtStart.
func WithNotLoadCacheAtStart(notLoadCacheAtStart bool) Option {
	return func(cfg *options) {
		cfg.notLoadCacheAtStart = notLoadCacheAtStart
	}
}
// WithTimeOutMs sets the timeout, in milliseconds, that NewRegistry forwards
// to the Nacos client config.
func WithTimeOutMs(timeoutMs uint64) Option {
	return func(cfg *options) {
		cfg.timeoutMs = timeoutMs
	}
}
// WithNameSpaceId sets the Nacos namespace ID. NewRegistry fails when the
// namespace ID is left empty.
func WithNameSpaceId(namespaceId string) Option {
	return func(cfg *options) {
		cfg.namespaceId = namespaceId
	}
}
// WithWeight sets the weight sent with every registered instance.
// NewRegistry defaults it to 100.
func WithWeight(weight float64) Option {
	return func(cfg *options) {
		cfg.weight = weight
	}
}
// WithCluster sets the Nacos cluster name used when registering,
// deregistering and watching. NewRegistry defaults it to "DEFAULT".
func WithCluster(cluster string) Option {
	return func(cfg *options) {
		cfg.cluster = cluster
	}
}
// WithGroup sets the Nacos group name used by every naming call.
// NewRegistry defaults it to constant.DEFAULT_GROUP.
func WithGroup(group string) Option {
	return func(cfg *options) {
		cfg.group = group
	}
}
// Registry registers, deregisters and discovers service instances through a
// Nacos naming client. Construct it with NewRegistry.
type Registry struct {
// opts holds the resolved options (defaults plus caller overrides).
opts options
// namingClient performs all register/deregister/select calls in this file.
namingClient naming_client.INamingClient
// configClient is created by NewRegistry; no method visible in this file
// uses it.
configClient config_client.IConfigClient
}
// Register publishes every endpoint of service to Nacos as an enabled,
// healthy, ephemeral instance under service.Name.
//
// Each endpoint must be a URL of the form scheme://host:port. The scheme is
// stored in the instance metadata under "kind" and service.Version under
// "version"; both override same-named keys from service.Metadata. The
// caller's metadata map is copied, never mutated.
//
// Registration stops at the first endpoint that fails; endpoints registered
// before the failure are left in place. Parse errors are returned as-is, and
// a Nacos error is wrapped so callers can still inspect it with errors.Is /
// errors.As.
func (r *Registry) Register(_ context.Context, service *registry.ServiceInstance) error {
	for _, endpoint := range service.Endpoints {
		u, err := url.Parse(endpoint)
		if err != nil {
			return err
		}
		host, port, err := net.SplitHostPort(u.Host)
		if err != nil {
			return err
		}
		// Parse as an unsigned 16-bit value so negative or out-of-range ports
		// are rejected here instead of wrapping around in a uint64 conversion.
		p, err := strconv.ParseUint(port, 10, 16)
		if err != nil {
			return err
		}

		// Ranging over a nil map is a no-op, so one code path covers both the
		// "no metadata" and "has metadata" cases.
		rmd := make(map[string]string, len(service.Metadata)+2)
		for k, v := range service.Metadata {
			rmd[k] = v
		}
		rmd["kind"] = u.Scheme
		rmd["version"] = service.Version

		ok, err := r.namingClient.RegisterInstance(vo.RegisterInstanceParam{
			Ip:          host,
			Port:        p,
			ServiceName: service.Name,
			Weight:      r.opts.weight,
			Enable:      true,
			Healthy:     true,
			Ephemeral:   true,
			Metadata:    rmd,
			ClusterName: r.opts.cluster, // defaults to DEFAULT
			GroupName:   r.opts.group,   // defaults to DEFAULT_GROUP
		})
		if err != nil {
			return fmt.Errorf("RegisterInstance err %w,%v", err, endpoint)
		}
		// The client reports success separately from the error; do not treat
		// an unsuccessful registration as success.
		if !ok {
			return fmt.Errorf("RegisterInstance rejected by nacos,%v", endpoint)
		}
	}
	return nil
}
// Deregister removes every endpoint of service from Nacos.
//
// Each endpoint must be a URL of the form scheme://host:port. The instance is
// deregistered under service.Name, which is the name Register publishes it
// under; using any other name (for example one suffixed with the scheme)
// would target a service that was never registered and leave the real
// instance in place.
//
// Processing stops at the first endpoint that fails and that error is
// returned.
func (r *Registry) Deregister(_ context.Context, service *registry.ServiceInstance) error {
	for _, endpoint := range service.Endpoints {
		u, err := url.Parse(endpoint)
		if err != nil {
			return err
		}
		host, port, err := net.SplitHostPort(u.Host)
		if err != nil {
			return err
		}
		// Parse as an unsigned 16-bit value so negative or out-of-range ports
		// are rejected here instead of wrapping around in a uint64 conversion.
		p, err := strconv.ParseUint(port, 10, 16)
		if err != nil {
			return err
		}
		if _, err = r.namingClient.DeregisterInstance(vo.DeregisterInstanceParam{
			Ip:          host,
			Port:        p,
			ServiceName: service.Name,
			GroupName:   r.opts.group,
			Cluster:     r.opts.cluster,
			Ephemeral:   true, // instances are registered as ephemeral
		}); err != nil {
			return err
		}
	}
	return nil
}
// Watch returns a watcher for serviceName by delegating to newWatcher with
// the registry's naming client, group, default kind and its single
// configured cluster.
func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) {
	clusters := []string{r.opts.cluster}
	return newWatcher(ctx, r.namingClient, serviceName, r.opts.group, r.opts.kind, clusters)
}
// GetService returns the healthy instances of serviceName within the
// configured group.
//
// Every Nacos instance becomes one ServiceInstance with a single endpoint of
// the form kind://ip:port. The kind comes from the instance's "kind"
// metadata entry when present, otherwise from the registry's default kind.
// Version is read from the "version" metadata entry.
func (r *Registry) GetService(_ context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
	query := vo.SelectInstancesParam{
		ServiceName: serviceName,
		GroupName:   r.opts.group,
		HealthyOnly: true,
	}
	instances, err := r.namingClient.SelectInstances(query)
	if err != nil {
		return nil, err
	}

	services := make([]*registry.ServiceInstance, len(instances))
	for i, inst := range instances {
		scheme, found := inst.Metadata["kind"]
		if !found {
			scheme = r.opts.kind
		}
		endpoint := fmt.Sprintf("%s://%s:%d", scheme, inst.Ip, inst.Port)
		services[i] = &registry.ServiceInstance{
			ID:        inst.InstanceId,
			Name:      inst.ServiceName,
			Version:   inst.Metadata["version"],
			Metadata:  inst.Metadata,
			Endpoints: []string{endpoint},
		}
	}
	return services, nil
}
// NewRegistry builds a Registry backed by Nacos.
//
// It starts from the defaults (cluster "DEFAULT", group
// constant.DEFAULT_GROUP, weight 100, kind "grpc"), applies opts in order,
// validates the result, and then creates both a naming client and a config
// client from the same client and server configuration.
//
// An error is returned when validation fails (it is also logged at error
// level) or when either client cannot be created.
func NewRegistry(opts ...Option) (*Registry, error) {
	cfg := options{
		kind:    "grpc",
		weight:  100,
		group:   constant.DEFAULT_GROUP,
		cluster: "DEFAULT",
	}
	for _, apply := range opts {
		apply(&cfg)
	}

	if err := validateOptions(cfg); err != nil {
		log.Log(log.LevelError, "Invalid nacos options: %v", err)
		return nil, err
	}

	clientCfg := constant.NewClientConfig(
		constant.WithUpdateCacheWhenEmpty(true),
		// NOTE(review): the original comment here said an empty ID selects the
		// public namespace, but validateOptions rejects an empty namespaceId
		// before this point — confirm which behaviour is intended.
		constant.WithNamespaceId(cfg.namespaceId),
		constant.WithTimeoutMs(cfg.timeoutMs),
		constant.WithNotLoadCacheAtStart(cfg.notLoadCacheAtStart),
		constant.WithLogDir(cfg.logDir),
		constant.WithCacheDir(cfg.cacheDir),
		constant.WithLogLevel(cfg.logLevel),
		constant.WithUsername(cfg.username),
		constant.WithPassword(cfg.password),
		constant.WithAccessKey(cfg.accessKey),
		constant.WithSecretKey(cfg.secretKey),
	)

	servers := make([]constant.ServerConfig, 0, len(cfg.serverConfs))
	for _, s := range cfg.serverConfs {
		servers = append(servers, *constant.NewServerConfig(s.IpAddr, s.Port))
	}

	// Both clients are built from the same parameters.
	param := vo.NacosClientParam{
		ClientConfig:  clientCfg,
		ServerConfigs: servers,
	}

	// Service-discovery client.
	naming, err := clients.NewNamingClient(param)
	if err != nil {
		return nil, err
	}

	// Dynamic-configuration client.
	config, err := clients.NewConfigClient(param)
	if err != nil {
		return nil, err
	}

	return &Registry{
		opts:         cfg,
		namingClient: naming,
		configClient: config,
	}, nil
}
// validateOptions reports the first missing mandatory option: a non-empty
// namespaceId, then at least one server config. It returns nil when both
// are present.
func validateOptions(opt options) error {
	switch {
	case opt.namespaceId == "":
		return errors.New("namespaceId is required")
	case len(opt.serverConfs) == 0:
		return errors.New("serverConfs is required")
	default:
		return nil
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/llakcs/agile-go.git
git@gitee.com:llakcs/agile-go.git
llakcs
agile-go
agile-go
v1.2.0

搜索帮助