1 Star 0 Fork 0

h79/goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
watcher.go 1.34 KB
一键复制 编辑 原始数据 按行查看 历史
package etcd
import (
"context"
"gitee.com/h79/goutils/common/system"
"gitee.com/h79/goutils/discovery/service"
"gitee.com/h79/goutils/discovery/watch"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
// Watcher forwards etcd watch events onto a single watch.Chan.
//
// It is created by NewWatcher, which also creates the cancellable context
// that Stop later cancels.
type Watcher struct {
// client is the etcd v3 client on which Watch opens its watches.
client *clientv3.Client
// wc is the channel returned by Watch; both Watch's goroutine and Changed
// send on it.
wc watch.Chan
// ctx is cancelled by Stop; Watch's goroutine selects on ctx.Done().
ctx context.Context
// cancel cancels ctx. It is invoked by Stop.
cancel context.CancelFunc
}
// NewWatcher builds a Watcher around the given etcd client.
//
// sizeChan is the buffer capacity of the channel that Watch returns and that
// Changed sends on. The Watcher owns a fresh cancellable context rooted at
// context.Background(); call Stop to cancel it.
func NewWatcher(client *clientv3.Client, sizeChan int) *Watcher {
	w := &Watcher{
		client: client,
		wc:     make(watch.Chan, sizeChan),
	}
	w.ctx, w.cancel = context.WithCancel(context.Background())
	return w
}
// Watch subscribes to every etcd key under the prefix key.ToKey() and forwards
// each PUT and DELETE event onto the watcher's channel, which is returned.
//
// Forwarding happens in a function handed to system.ChildRunning. It keeps
// processing watch responses until one of the following happens:
//   - Stop is called (the watcher's context is cancelled),
//   - system.Closed() fires,
//   - etcd closes the watch channel or reports an error on it.
//
// The etcd watch is opened on a context derived from the watcher's own, and
// that context is cancelled whenever forwarding ends, so the underlying watch
// stream is always released.
//
// The returned error is always nil.
func (watcher *Watcher) Watch(key watch.Key) (watch.Chan, error) {
	ctx, cancel := context.WithCancel(watcher.ctx)
	wc := watcher.client.Watch(ctx, key.ToKey(), clientv3.WithPrefix())
	system.ChildRunning(func() {
		// Release the etcd watch stream no matter how the loop ends.
		defer cancel()
		for {
			select {
			case resp, ok := <-wc:
				// A closed channel or an errored response means the watch is
				// over; looping further would only spin on zero values.
				if !ok || resp.Err() != nil {
					return
				}
				for _, ev := range resp.Events {
					data := service.Data{Key: service.NewKey(string(ev.Kv.Key)), Value: string(ev.Kv.Value)}
					switch ev.Type {
					case mvccpb.PUT:
						if !watcher.deliver(ctx, watch.NewChanged(data, watch.Put)) {
							return
						}
					case mvccpb.DELETE:
						if !watcher.deliver(ctx, watch.NewChanged(data, watch.Delete)) {
							return
						}
					}
				}
			case <-ctx.Done():
				return
			case <-system.Closed():
				return
			}
		}
	})
	return watcher.wc, nil
}

// deliver sends cmd on the watcher's channel, giving up if ctx is cancelled or
// system.Closed() fires first. It reports whether cmd was actually sent.
func (watcher *Watcher) deliver(ctx context.Context, cmd watch.Changed) bool {
	select {
	case watcher.wc <- cmd:
		return true
	case <-ctx.Done():
		return false
	case <-system.Closed():
		return false
	}
}
// Changed pushes cmd onto the same channel that Watch returns.
//
// The send is a plain channel send: it blocks while the channel's buffer
// (sized by NewWatcher's sizeChan) is full and nobody is receiving.
func (watcher *Watcher) Changed(cmd watch.Changed) {
	out := watcher.wc
	out <- cmd
}
// Stop cancels the watcher's context, which the goroutine started by Watch
// observes through ctx.Done(). It always returns nil.
func (watcher *Watcher) Stop() error {
	stop := watcher.cancel
	stop()
	return nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.8.48

搜索帮助