Ai
1 Star 0 Fork 0

monobytes/gcore

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
builder.go 2.42 KB
一键复制 编辑 原始数据 按行查看 历史
null 提交于 2025-01-22 18:29 +08:00 . first commit
package direct
import (
"context"
"gitee.com/monobytes/gcore/gcluster"
"gitee.com/monobytes/gcore/gerrors"
"gitee.com/monobytes/gcore/glog"
"gitee.com/monobytes/gcore/gregistry"
"gitee.com/monobytes/gcore/gwrap/endpoint"
cli "github.com/smallnest/rpcx/client"
"net"
"net/url"
"sync"
"sync/atomic"
"time"
)
const (
	// scheme is the resolver scheme name returned by Builder.Scheme.
	scheme = "direct"

	// defaultTimeout bounds each discovery request (listing services and
	// opening the watcher) issued while the builder initializes.
	defaultTimeout = 10 * time.Second
)
// Builder builds rpcx service discoveries for targets using the "direct"
// scheme. A target host in host:port form is used as-is; any other host is
// looked up as a service instance ID in the addresses table.
type Builder struct {
	// dis is the registry discovery used to list and watch mesh instances.
	dis gregistry.Discovery
	// ctx is the builder-wide context; the watch loop exits once it is done.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
	// watcher supplies the instance updates consumed by the watch loop.
	watcher gregistry.Watcher
	// state is the flag that init toggles via CompareAndSwap.
	state atomic.Bool
	// rw guards addresses.
	rw sync.RWMutex
	// addresses maps a service instance ID to its parsed endpoint address.
	addresses map[string]string
}
// NewBuilder creates a Builder backed by the given discovery component.
//
// The returned builder owns a cancellable background context and starts with
// an empty address table; no discovery request is issued here.
func NewBuilder(dis gregistry.Discovery) *Builder {
	ctx, cancel := context.WithCancel(context.Background())

	return &Builder{
		dis:       dis,
		ctx:       ctx,
		cancel:    cancel,
		addresses: make(map[string]string),
	}
}
// Scheme returns the scheme name handled by this builder, i.e. the value of
// the package constant scheme.
func (b *Builder) Scheme() string {
	return scheme
}
// Build creates a peer-to-peer rpcx discovery for the given target.
//
// When target.Host is already in host:port form it is dialed directly.
// Otherwise the host is treated as a key of the address table (populated
// from discovered instances, keyed by instance ID): the builder is
// initialized if needed and the key is resolved to an address. It returns
// gerrors.ErrNotFoundDirectAddress when the key is not in the table, or the
// initialization error if initialization fails.
func (b *Builder) Build(target *url.URL) (cli.ServiceDiscovery, error) {
	host := target.Host

	// Fast path: the host already carries an explicit port.
	if _, _, splitErr := net.SplitHostPort(host); splitErr == nil {
		return cli.NewPeer2PeerDiscovery("tcp@"+host, "")
	}

	if err := b.init(); err != nil {
		return nil, err
	}

	b.rw.RLock()
	resolved, found := b.addresses[host]
	b.rw.RUnlock()

	if !found {
		return nil, gerrors.ErrNotFoundDirectAddress
	}

	return cli.NewPeer2PeerDiscovery("tcp@"+resolved, "")
}
// init performs the one-time setup of the builder: it loads the current mesh
// service instances from the discovery component, opens a watcher for later
// changes, fills the address table and starts the background watch loop.
//
// It returns gerrors.ErrMissDiscovery when no discovery component was
// supplied, or the error reported by the discovery component. On failure the
// state flag is reset so that a later call can retry.
//
// NOTE(review): a caller that loses the CompareAndSwap race returns nil while
// the winning caller may still be initializing, so it can briefly observe an
// empty address table.
func (b *Builder) init() error {
	if b.dis == nil {
		return gerrors.ErrMissDiscovery
	}

	// CompareAndSwap reports true only for the caller that flips the flag
	// from false to true. That caller must run the setup; every other caller
	// returns. (The previous check was inverted: it skipped the setup on the
	// first call and repeated it on every later call.)
	if !b.state.CompareAndSwap(false, true) {
		return nil
	}

	ctx, cancel := context.WithTimeout(b.ctx, defaultTimeout)
	instances, err := b.dis.Services(ctx, gcluster.Mesh.String())
	cancel()
	if err != nil {
		b.state.Store(false) // allow a later call to retry
		return err
	}

	ctx, cancel = context.WithTimeout(b.ctx, defaultTimeout)
	watcher, err := b.dis.Watch(ctx, gcluster.Mesh.String())
	cancel()
	if err != nil {
		b.state.Store(false) // allow a later call to retry
		return err
	}

	b.watcher = watcher
	b.updateInstances(instances)

	go b.watch()

	return nil
}
// watch is the background loop started by init. It repeatedly asks the
// watcher for the next instance snapshot and replaces the address table with
// it, until the builder context is done.
//
// When the watcher reports an error the loop logs it and pauses briefly
// before retrying, so that a persistently failing watcher does not turn the
// loop into a busy spin.
func (b *Builder) watch() {
	// retryDelay is the pause inserted after a failed watcher.Next call.
	const retryDelay = time.Second

	for {
		select {
		case <-b.ctx.Done():
			return
		default:
			// exec watch
		}

		instances, err := b.watcher.Next()
		if err != nil {
			// An error observed after cancellation just means we are done.
			if b.ctx.Err() != nil {
				return
			}

			glog.Errorf("watch discovery instances failed: %v", err)

			select {
			case <-b.ctx.Done():
				return
			case <-time.After(retryDelay):
			}

			continue
		}

		b.updateInstances(instances)
	}
}
// updateInstances rebuilds the instance-ID-to-address table from the given
// instances and swaps it in under the write lock.
//
// Instances whose endpoint cannot be parsed are logged and left out of the
// new table. The previous table is replaced as a whole, so instances missing
// from the argument are no longer resolvable afterwards.
func (b *Builder) updateInstances(instances []*gregistry.ServiceInstance) {
	table := make(map[string]string, len(instances))

	for i := range instances {
		ins := instances[i]

		parsed, err := endpoint.ParseEndpoint(ins.Endpoint)
		if err == nil {
			table[ins.ID] = parsed.Address()
			continue
		}

		glog.Errorf("parse discovery endpoint failed: %v", err)
	}

	b.rw.Lock()
	defer b.rw.Unlock()

	b.addresses = table
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/monobytes/gcore.git
git@gitee.com:monobytes/gcore.git
monobytes
gcore
gcore
v0.0.17

搜索帮助