1 Star 0 Fork 0

天雨流芳 / go-micro-framework

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
discovery.go 7.86 KB
一键复制 编辑 原始数据 按行查看 历史
天雨流芳 提交于 2024-03-28 19:04 . grpc获取客户端连接的代码
package consul
import (
"context"
"fmt"
log "gitee.com/tylf2018/go-micro-framework/pkg/logger"
"gitee.com/tylf2018/go-micro-framework/registry"
regOps "gitee.com/tylf2018/go-micro-framework/registry/discovery"
"github.com/hashicorp/consul/api"
"strings"
"sync"
"sync/atomic"
"time"
)
type ConsulDiscovery struct {
cli *api.Client
registry map[string]*serviceSet
lock sync.RWMutex // 读写锁
}
func NewConsulDiscovery(options *regOps.DiscoveryOptions) *ConsulDiscovery {
config := api.DefaultConfig()
config.Address = options.Address
config.Scheme = options.Scheme
apiClient, err := api.NewClient(config)
if err != nil {
log.ErrorF("create api client error: %v", err)
panic(err)
}
return &ConsulDiscovery{
cli: apiClient,
registry: make(map[string]*serviceSet),
}
}
// GetService return service by name ,该方法主要用于第一次获取全量的服务对象,后期的增量标准是从watcher中获取
func (d *ConsulDiscovery) GetService(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
d.lock.RLock()
defer d.lock.RUnlock()
getRemote := func() []*registry.ServiceInstance {
services, _, err := d.Service(ctx, serviceName, 0, true)
if err == nil && len(services) > 0 {
return services
}
return nil
}
set := d.registry[serviceName]
// 如果本地没有获取到,则去远程调用获取
if set == nil {
if s := getRemote(); len(s) > 0 {
return s, nil
}
return nil, fmt.Errorf("service %s not resolved in registry", serviceName)
}
ss, _ := set.services.Load().([]*registry.ServiceInstance)
// 如果本地获取到的数据不是需要的返回类型,则从远程获取
if ss == nil {
if s := getRemote(); len(s) > 0 {
return s, nil
}
return nil, fmt.Errorf("service %s not found in registry", serviceName)
}
// 本地有需要的类型,则从本地获取
return ss, nil
}
// Watch 获取一个增量的监听器
func (d *ConsulDiscovery) Watch(_ context.Context, serviceName string) (registry.Watcher, error) {
// 锁此函数 防止 多个 协程 导致 数据不一致
d.lock.Lock()
defer d.lock.Unlock()
set, ok := d.registry[serviceName] // 如果 registry 中无数据 创建一个 set 第一次访问 为空
if !ok {
set = &serviceSet{
watcher: make(map[*watcher]struct{}), // 创建 观察者
services: &atomic.Value{}, // 原子性操作
serviceName: serviceName,
}
d.registry[serviceName] = set
}
// 初始化 watcher 观察者
w := &watcher{
event: make(chan struct{}, 1),
}
w.ctx, w.cancel = context.WithCancel(context.Background())
w.set = set
set.lock.Lock()
set.watcher[w] = struct{}{} // 将 实例 w 写入到 set 中
set.lock.Unlock()
ss, _ := set.services.Load().([]*registry.ServiceInstance) // 原子性读 断言失败为 nil
if len(ss) > 0 {
// If the service has a value, it needs to be pushed to the watcher,
// otherwise the initial data may be blocked forever during the watch.
// 如果服务具有值,则需要将其推送到观察者,否则在观察期间初始数据可能会被永久阻塞
// 说明 set.services 里已经存在值了 并且有个 协程在 不停的运行
w.event <- struct{}{} // 从 channel 中写入值 如果 channel 有值 就会 hook 住
}
if !ok { // 第一次执行
err := d.resolve(set)
if err != nil {
return nil, err
}
}
return w, nil
}
func (d *ConsulDiscovery) resolve(ss *serviceSet) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
services, idx, err := d.Service(ctx, ss.serviceName, 0, true) // ss.serviceName 服务注册名称
cancel()
if err != nil {
return err
} else if len(services) > 0 { // 获取到服务了
ss.broadcast(services) // 原子性写操作 存入 consul 返回回来的相关信息 []*registry.ServiceInstance
}
go func() {
ticker := time.NewTicker(time.Second) // 执行定时任务
defer ticker.Stop()
for {
<-ticker.C
ctx, cancel := context.WithTimeout(context.Background(), time.Second*120) // 长轮询的思想
tmpService, tmpIdx, err := d.Service(ctx, ss.serviceName, idx, true)
cancel()
if err != nil {
time.Sleep(time.Second)
continue
}
if len(tmpService) != 0 && tmpIdx != idx { // 获取到修改之后的服务了
services = tmpService // 健康的服务列表
ss.broadcast(services) // 进行原子性的存储
}
idx = tmpIdx
}
}()
return nil
}
// ListServices return service list.
// ListServices 返回 所有服务的 列表
func (d *ConsulDiscovery) ListServices() (allServices map[string][]*registry.ServiceInstance, err error) {
d.lock.RLock()
defer d.lock.RUnlock()
allServices = make(map[string][]*registry.ServiceInstance)
for name, set := range d.registry {
var services []*registry.ServiceInstance
ss, _ := set.services.Load().([]*registry.ServiceInstance)
if ss == nil {
continue
}
services = append(services, ss...)
allServices[name] = services
}
return
}
// Service get services from consul
func (d *ConsulDiscovery) Service(ctx context.Context, service string, index uint64, passingOnly bool) ([]*registry.ServiceInstance, uint64, error) {
opts := &api.QueryOptions{
// 在 consul 代表对应 API 调用时 Consul 中的某个资源的修改版本号。它用于实现 Consul 的强一致性,确保多个客户端对同一个资源的修改是有序的,并且避免并发修改引起的数据不一致问题
// 例如: 原本 某个资源的修改版本号为 1
// 当 此资源产生变化了 版本号修改为 2
WaitIndex: index,
WaitTime: time.Second * 55,
}
opts = opts.WithContext(ctx)
entries, meta, err := d.cli.Health().Service(service, "", passingOnly, opts) // 列出服务的服务实例
if err != nil {
return nil, 0, err
}
// 返回 []*registry.ServiceInstance, meta.LastIndex, nil
return defaultResolver(ctx, entries), meta.LastIndex, nil
}
// 参考: https://developer.hashicorp.com/consul/api-docs/health#sample-request-2
func defaultResolver(_ context.Context, entries []*api.ServiceEntry) []*registry.ServiceInstance {
services := make([]*registry.ServiceInstance, 0, len(entries)) // 创建 一个列表 存储相关信息
for _, entry := range entries {
var version string
for _, tag := range entry.Service.Tags { // 例如: "Tags": ["version=2.1.1"],
ss := strings.SplitN(tag, "=", 2)
if len(ss) == 2 && ss[0] == "version" {
version = ss[1]
}
}
endpoints := make([]string, 0)
for scheme, addr := range entry.Service.TaggedAddresses {
if scheme == "lan_ipv4" || scheme == "wan_ipv4" || scheme == "lan_ipv6" || scheme == "wan_ipv6" { // 如果是这几个标签 跳过
continue
}
endpoints = append(endpoints, addr.Address)
}
if len(endpoints) == 0 && entry.Service.Address != "" && entry.Service.Port != 0 { // 如果 entry.Service.TaggedAddresses 查询不到相关 ip 和 端口 从这里添加
endpoints = append(endpoints, fmt.Sprintf("http://%s:%d", entry.Service.Address, entry.Service.Port))
}
services = append(services, &registry.ServiceInstance{
ID: entry.Service.ID, // 服务 id
Name: entry.Service.Service, // 服务名称
Metadata: entry.Service.Meta, // 携带给 consul 的 额外信息 (可用可不用)
Version: version, // 服务版本
Endpoints: endpoints, // 服务地址
Tags: entry.Service.Tags, // 服务标签
})
}
return services
}
type serviceSet struct {
serviceName string
watcher map[*watcher]struct{}
services *atomic.Value // 原子性操作
lock sync.RWMutex // 读写锁
}
func (s *serviceSet) broadcast(ss []*registry.ServiceInstance) {
//原子操作, 保证线程安全, 我们平时写struct的时候
s.services.Store(ss) // 是 atomic.Value 一个方法 用于将新值存储到atomic.Value变量中
s.lock.RLock()
defer s.lock.RUnlock()
for k := range s.watcher {
select {
case k.event <- struct{}{}: // 写入 空结构
default:
}
}
}
1
https://gitee.com/tylf2018/go-micro-framework.git
git@gitee.com:tylf2018/go-micro-framework.git
tylf2018
go-micro-framework
go-micro-framework
a23f37e8bd2b

搜索帮助

53164aa7 5694891 3bd8fe86 5694891