代码拉取完成,页面将自动刷新
package etcdv3
import (
"errors"
"path"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/micro/go-micro/registry"
"golang.org/x/net/context"
)
type etcdv3Watcher struct {
stop chan bool
w clientv3.WatchChan
client *clientv3.Client
timeout time.Duration
}
func newEtcdv3Watcher(r *etcdv3Registry, timeout time.Duration) (registry.Watcher, error) {
ctx, cancel := context.WithCancel(context.Background())
stop := make(chan bool, 1)
go func() {
<-stop
cancel()
}()
return &etcdv3Watcher{
stop: stop,
w: r.client.Watch(ctx, prefix, clientv3.WithPrefix()),
client: r.client,
timeout: timeout,
}, nil
}
func (ew *etcdv3Watcher) Next() (*registry.Result, error) {
for wresp := range ew.w {
if wresp.Err() != nil {
return nil, wresp.Err()
}
for _, ev := range wresp.Events {
service := decode(ev.Kv.Value)
var action string
switch ev.Type {
case clientv3.EventTypePut:
if ev.IsCreate() {
action = "create"
} else if ev.IsModify() {
action = "update"
}
case clientv3.EventTypeDelete:
action = "delete"
// get the cached value
ctx, cancel := context.WithTimeout(context.Background(), ew.timeout)
defer cancel()
resp, err := ew.client.Get(ctx, path.Join(cachePrefix, string(ev.Kv.Key)))
if err != nil {
return nil, err
}
for _, ev := range resp.Kvs {
service = decode(ev.Value)
}
}
if service == nil {
continue
}
return ®istry.Result{
Action: action,
Service: service,
}, nil
}
}
return nil, errors.New("could not get next")
}
func (ew *etcdv3Watcher) Stop() {
select {
case <-ew.stop:
return
default:
close(ew.stop)
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。