代码拉取完成,页面将自动刷新
package discovery
import (
"context"
"encoding/json"
"fmt"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3"
"log"
"sync"
"time"
)
const (
// ServicePrefix 服务前缀
ServicePrefix = "service"
// ConfigPrefix 配置前缀
ConfigPrefix = "config"
// LogsPrefix 日志前缀
LogsPrefix = "logs"
)
// etcdClient 简化版etcd工具类
type etcdClient struct {
cli *clientv3.Client
leaseID clientv3.LeaseID
ctx context.Context
cancel context.CancelFunc
}
var (
once = &sync.Once{}
EtcdCli *etcdClient
)
// Init 初始化EtcdClient实例
func Init(endpoints []string, ttl int64) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: time.Duration(ttl) * time.Second,
})
if err != nil {
log.Fatalf("初始化etcd错误 Err: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
EtcdCli = &etcdClient{
cli: cli,
ctx: ctx,
cancel: cancel,
}
}
func (ec *etcdClient) RegisterService(rMsg *ServiceMessage) {
leaseResp, err := ec.cli.Grant(ec.ctx, 30)
if err != nil {
log.Fatalf("向etcd注册服务, 创建租约错误 Err: %s", err)
}
ec.leaseID = leaseResp.ID
jsonVal, _ := json.Marshal(rMsg)
_, err = ec.cli.Put(ec.ctx, fmt.Sprintf("/%s/%s/%s", ServicePrefix, rMsg.ServiceName, rMsg.ServiceAddr), string(jsonVal), clientv3.WithLease(ec.leaseID))
if err != nil {
log.Fatalf("向etcd注册服务错误 Err: %s", err)
}
go ec.KeepAlive(rMsg.ServiceAddr)
}
// DiscoverServices 发现指定服务的所有实例
func (ec *etcdClient) DiscoverServices(serviceName string) ([]*ServiceMessage, error) {
resp, err := ec.cli.Get(ec.ctx, fmt.Sprintf("/%s/%s/", ServicePrefix, serviceName), clientv3.WithPrefix())
if err != nil {
return nil, err
}
var rMsgs []*ServiceMessage
for _, kv := range resp.Kvs {
rMsg := new(ServiceMessage)
err := json.Unmarshal(kv.Value, rMsg)
if err != nil {
return nil, err
}
rMsgs = append(rMsgs, rMsg)
}
return rMsgs, nil
}
// PutConfig 写入或更新配置
func (ec *etcdClient) PutConfig(key, value string) error {
_, err := ec.cli.Put(context.Background(), key, value)
if err != nil {
return fmt.Errorf("往etcd写入配置失败: %v", err)
}
return nil
}
// GetConfig 读取配置
func (ec *etcdClient) GetConfig(key string, opts ...clientv3.OpOption) ([]*mvccpb.KeyValue, error) {
resp, err := ec.cli.Get(context.Background(), key, opts...)
if err != nil {
return nil, fmt.Errorf("从etcd获取配置失败: %v", err)
}
return resp.Kvs, nil
}
// WatchConfig 监听指定key的配置变化
func (ec *etcdClient) WatchConfig(key string, onChange func(event mvccpb.Event_EventType, key, value string), opts ...clientv3.OpOption) error {
watchChan := ec.cli.Watch(context.Background(), key, opts...)
for watchResp := range watchChan {
for _, event := range watchResp.Events {
onChange(event.Type, string(event.Kv.Key), string(event.Kv.Value))
}
}
return nil // 实际上这里不会返回错误,因为range循环不会自然结束,需要外部处理上下文取消或错误
}
// Close 关闭etcd客户端连接和清理资源
func (ec *etcdClient) Close() {
ec.cancel()
if _, err := ec.cli.Revoke(context.TODO(), ec.leaseID); err != nil {
log.Printf("关闭etcd客户端连接撤销租约错误: %v", err)
}
if err := ec.cli.Close(); err != nil {
log.Printf("关闭etcd客户端连接错误: %v", err)
}
}
// KeepAlive 保持服务心跳
func (ec *etcdClient) KeepAlive(serviceAddr string) {
keepAliveChan, err := ec.cli.KeepAlive(ec.ctx, ec.leaseID)
if err != nil {
log.Fatalf("保持服务etcd心跳错误, Err: %v", err)
}
go func() {
for {
select {
case <-keepAliveChan:
// 这里简化处理,实际应用中可能需要更复杂的逻辑来处理租约失效
//if resp == nil {
// log.Println("Lease expired, attempting to re-register...")
// //ec.Close()
//}
case <-ec.ctx.Done():
return
}
}
}()
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。