Fetch the repository succeeded.
package client
import (
"strings"
"sync"
"time"
"github.com/rpcxio/libkv"
"github.com/rpcxio/libkv/store"
"github.com/rpcxio/libkv/store/consul"
"github.com/smallnest/rpcx/log"
)
func init() {
consul.Register()
}
// ConsulDiscovery is a consul service discovery.
// It always returns the registered servers in consul.
type ConsulDiscovery struct {
basePath string
kv store.Store
pairsMu sync.RWMutex
pairs []*KVPair
chans []chan []*KVPair
mu sync.Mutex
// -1 means it always retry to watch until zookeeper is ok, 0 means no retry.
RetriesAfterWatchFailed int
filter ServiceDiscoveryFilter
stopCh chan struct{}
}
// NewConsulDiscovery returns a new ConsulDiscovery.
func NewConsulDiscovery(basePath, servicePath string, consulAddr []string, options *store.Config) (ServiceDiscovery, error) {
kv, err := libkv.NewStore(store.CONSUL, consulAddr, options)
if err != nil {
log.Infof("cannot create store: %v", err)
return nil, err
}
return NewConsulDiscoveryStore(basePath+"/"+servicePath, kv)
}
// NewConsulDiscoveryStore returns a new ConsulDiscovery with specified store.
func NewConsulDiscoveryStore(basePath string, kv store.Store) (ServiceDiscovery, error) {
if basePath[0] == '/' {
basePath = basePath[1:]
}
if len(basePath) > 1 && strings.HasSuffix(basePath, "/") {
basePath = basePath[:len(basePath)-1]
}
d := &ConsulDiscovery{basePath: basePath, kv: kv}
d.stopCh = make(chan struct{})
ps, err := kv.List(basePath)
if err != nil && err != store.ErrKeyNotFound {
log.Infof("cannot get services of from registry: %v, err: %v", basePath, err)
return nil, err
}
pairs := make([]*KVPair, 0, len(ps))
prefix := d.basePath + "/"
for _, p := range ps {
k := strings.TrimPrefix(p.Key, prefix)
pair := &KVPair{Key: k, Value: string(p.Value)}
if d.filter != nil && !d.filter(pair) {
continue
}
pairs = append(pairs, pair)
}
d.pairsMu.Lock()
d.pairs = pairs
d.pairsMu.Unlock()
d.RetriesAfterWatchFailed = -1
go d.watch()
return d, nil
}
// NewConsulDiscoveryTemplate returns a new ConsulDiscovery template.
func NewConsulDiscoveryTemplate(basePath string, consulAddr []string, options *store.Config) (ServiceDiscovery, error) {
if basePath[0] == '/' {
basePath = basePath[1:]
}
if len(basePath) > 1 && strings.HasSuffix(basePath, "/") {
basePath = basePath[:len(basePath)-1]
}
kv, err := libkv.NewStore(store.CONSUL, consulAddr, options)
if err != nil {
log.Infof("cannot create store: %v", err)
return nil, err
}
return &ConsulDiscovery{basePath: basePath, kv: kv}, nil
}
// Clone clones this ServiceDiscovery with new servicePath.
func (d *ConsulDiscovery) Clone(servicePath string) (ServiceDiscovery, error) {
return NewConsulDiscoveryStore(d.basePath+"/"+servicePath, d.kv)
}
// SetFilter sets the filer.
func (d *ConsulDiscovery) SetFilter(filter ServiceDiscoveryFilter) {
d.filter = filter
}
// GetServices returns the servers
func (d *ConsulDiscovery) GetServices() []*KVPair {
d.pairsMu.RLock()
defer d.pairsMu.RUnlock()
return d.pairs
}
// WatchService returns a nil chan.
func (d *ConsulDiscovery) WatchService() chan []*KVPair {
d.mu.Lock()
defer d.mu.Unlock()
ch := make(chan []*KVPair, 10)
d.chans = append(d.chans, ch)
return ch
}
func (d *ConsulDiscovery) RemoveWatcher(ch chan []*KVPair) {
d.mu.Lock()
defer d.mu.Unlock()
var chans []chan []*KVPair
for _, c := range d.chans {
if c == ch {
continue
}
chans = append(chans, c)
}
d.chans = chans
}
func (d *ConsulDiscovery) watch() {
defer func() {
d.kv.Close()
}()
for {
var err error
var c <-chan []*store.KVPair
var tempDelay time.Duration
retry := d.RetriesAfterWatchFailed
for d.RetriesAfterWatchFailed < 0 || retry >= 0 {
c, err = d.kv.WatchTree(d.basePath, nil)
if err != nil {
if d.RetriesAfterWatchFailed > 0 {
retry--
}
if tempDelay == 0 {
tempDelay = 1 * time.Second
} else {
tempDelay *= 2
}
if max := 30 * time.Second; tempDelay > max {
tempDelay = max
}
log.Warnf("can not watchtree (with retry %d, sleep %v): %s: %v", retry, tempDelay, d.basePath, err)
time.Sleep(tempDelay)
continue
}
break
}
if err != nil {
log.Errorf("can't watch %s: %v", d.basePath, err)
return
}
prefix := d.basePath + "/"
readChanges:
for {
select {
case <-d.stopCh:
log.Info("discovery has been closed")
return
case ps := <-c:
if ps == nil {
break readChanges
}
var pairs []*KVPair // latest servers
for _, p := range ps {
k := strings.TrimPrefix(p.Key, prefix)
pair := &KVPair{Key: k, Value: string(p.Value)}
if d.filter != nil && !d.filter(pair) {
continue
}
pairs = append(pairs, pair)
}
d.pairsMu.Lock()
d.pairs = pairs
d.pairsMu.Unlock()
d.mu.Lock()
for _, ch := range d.chans {
ch := ch
go func() {
defer func() {
recover()
}()
select {
case ch <- pairs:
case <-time.After(time.Minute):
log.Warn("chan is full and new change has been dropped")
}
}()
}
d.mu.Unlock()
}
}
log.Warn("chan is closed and will rewatch")
}
}
func (d *ConsulDiscovery) Close() {
close(d.stopCh)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。