Fetch the repository succeeded.
package config
import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"strings"
	"sync"
	"time"

	"gitee.com/titan-kit/titan/config"
	"gitee.com/titan-kit/titan/log"
	etcd "go.etcd.io/etcd/client/v3"
)
// Compile-time check that *Client satisfies the config.StoreClient interface.
var _ config.StoreClient = &Client{}
// Client is a wrapper around the etcd (v3) client.
type Client struct {
// logger is handed to every Watch created through this client.
logger *log.Slf4g
// client is the underlying etcd v3 client used for reads and watches.
client *etcd.Client
// watches holds the Watch already created for each key, so repeated
// WatchPrefix calls on the same key reuse one Watch.
watches map[string]*Watch
// wm guards watches.
wm sync.Mutex
}
// NewEtcdClient creates an etcd (v3) client connected to the given machines
// and returns it wrapped in a *Client.
//
// Parameters:
//   - machines: etcd endpoints to connect to.
//   - cert, key: paths to a PEM client certificate and its private key; a
//     client certificate is configured only when both are non-empty.
//   - caCert: path to a PEM bundle of CA certificates used to verify the
//     server; when empty, RootCAs is left unset on the TLS config.
//   - basicAuth, username, password: when basicAuth is true the credentials
//     are set on the etcd configuration.
//
// TLS is enabled when either caCert or a cert/key pair is supplied.
//
// On failure an empty, unusable *Client is returned together with the error.
// An error is returned when the CA file cannot be read, when it contains no
// parsable PEM certificate, when the key pair cannot be loaded, or when the
// etcd client cannot be created.
func NewEtcdClient(machines []string, cert, key, caCert string, basicAuth bool, username string, password string) (*Client, error) {
	cfg := etcd.Config{
		Endpoints:            machines,
		DialTimeout:          5 * time.Second,
		DialKeepAliveTime:    10 * time.Second,
		DialKeepAliveTimeout: 3 * time.Second,
	}
	if basicAuth {
		cfg.Username = username
		cfg.Password = password
	}

	tlsEnabled := false
	tlsConfig := &tls.Config{InsecureSkipVerify: false}

	if caCert != "" {
		certBytes, err := ioutil.ReadFile(caCert)
		if err != nil {
			return &Client{}, err
		}
		caCertPool := x509.NewCertPool()
		// An explicitly configured CA bundle that yields no certificate must
		// not be ignored: continuing would leave RootCAs unset and silently
		// discard the CA the caller asked for.
		if !caCertPool.AppendCertsFromPEM(certBytes) {
			return &Client{}, fmt.Errorf("etcd: no valid PEM certificate found in CA file %q", caCert)
		}
		tlsConfig.RootCAs = caCertPool
		tlsEnabled = true
	}

	if cert != "" && key != "" {
		tlsCert, err := tls.LoadX509KeyPair(cert, key)
		if err != nil {
			return &Client{}, err
		}
		tlsConfig.Certificates = []tls.Certificate{tlsCert}
		tlsEnabled = true
	}

	if tlsEnabled {
		cfg.TLS = tlsConfig
	}

	client, err := etcd.New(cfg)
	if err != nil {
		return &Client{}, err
	}

	return &Client{
		logger:  log.NewSlf4g("backends/etcd", log.DefaultLogger),
		client:  client,
		watches: make(map[string]*Watch),
	}, nil
}
// GetValues reads from etcd every key/value pair stored at, or below, each of
// the given keys and returns them as a map from full etcd key to value.
//
// The keys are fetched in read-only transactions of at most 128 operations
// each. The revision reported by the first transaction is reused for all
// later ones so that every batch is read at the same revision. Each
// transaction is given a 3 second timeout.
//
// A stored key is kept only when it equals the requested key exactly or lies
// under "<key>/"; other keys that merely share the textual prefix are dropped.
//
// On error the values collected so far are returned together with the error.
func (c *Client) GetValues(keys []string) (map[string]string, error) {
	// Default etcd v3 transaction size limit. It is configurable since v3.3,
	// so an option may be worth adding (and can max-txn=0 disable Txn?).
	const batchSize = 128

	var (
		pinnedRev int64 // 0 until the first transaction has answered
		values    = make(map[string]string)
	)

	fetch := func(batch []string) error {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()

		gets := make([]etcd.Op, len(batch))
		for i, key := range batch {
			gets[i] = etcd.OpGet(
				key,
				etcd.WithPrefix(),
				etcd.WithSort(etcd.SortByKey, etcd.SortDescend),
				etcd.WithRev(pinnedRev),
			)
		}

		resp, err := c.client.Txn(ctx).Then(gets...).Commit()
		if err != nil {
			return err
		}

		for i, r := range resp.Responses {
			exact := batch[i]
			// Directory form of the requested key: append '/' when missing.
			dir := exact
			if !strings.HasSuffix(dir, "/") {
				dir += "/"
			}
			for _, kv := range r.GetResponseRange().Kvs {
				name := string(kv.Key)
				if name != exact && !strings.HasPrefix(name, dir) {
					continue
				}
				values[name] = string(kv.Value)
			}
		}

		// Remember the revision of the first request for all later batches.
		if pinnedRev == 0 {
			pinnedRev = resp.Header.GetRevision()
		}
		return nil
	}

	for start := 0; start < len(keys); start += batchSize {
		end := start + batchSize
		if end > len(keys) {
			end = len(keys)
		}
		if err := fetch(keys[start:end]); err != nil {
			return values, err
		}
	}
	return values, nil
}
// WatchPrefix blocks until one of the given keys is observed at a revision
// greater than waitIndex, or until a value arrives on stopChan.
//
// A Watch is created for every key that does not have one yet and is kept in
// the client for later calls. The prefix argument is not used by this
// implementation.
//
// It returns the revision that woke the call. When stopped through stopChan
// it returns 0 and the context's cancellation error. If a Watch cannot be
// created it returns 0 and that error.
func (c *Client) WatchPrefix(prefix string, keys []string, waitIndex uint64, stopChan chan bool) (uint64, error) {
	// Look up (or create) one Watch per key while holding the registry lock.
	acquire := func() (map[string]*Watch, error) {
		c.wm.Lock()
		defer c.wm.Unlock()

		picked := make(map[string]*Watch)
		for _, key := range keys {
			if existing, found := c.watches[key]; found {
				picked[key] = existing
				continue
			}
			created, err := createWatch(c.logger, c.client, key)
			if err != nil {
				return nil, err
			}
			c.watches[key] = created
			picked[key] = created
		}
		return picked, nil
	}

	picked, err := acquire()
	if err != nil {
		return 0, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// finished releases the stop-forwarding goroutine when this call returns.
	finished := make(chan struct{})
	defer close(finished)

	go func() {
		select {
		case <-stopChan:
			cancel()
		case <-finished:
		}
	}()

	// Wait on all watches at once; the first one to advance wins.
	notify := make(chan int64)
	for _, w := range picked {
		go w.WaitNext(ctx, int64(waitIndex), notify)
	}

	select {
	case rev := <-notify:
		return uint64(rev), nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}
// Close closes the underlying etcd client and returns the error it reports.
func (c *Client) Close() error {
	cli := c.client
	return cli.Close()
}
// Watch tracks only the most recent revision observed for a watched key.
type Watch struct {
	// revision is the last revision seen.
	revision int64
	// cond is closed when the revision changes, waking every waiter; it is
	// then replaced by a fresh channel for the next change.
	cond chan struct{}
	// rwl protects revision and cond.
	rwl sync.RWMutex
}

// WaitNext blocks until the watch's revision is greater than lastRevision and
// then sends that revision on notify.
//
// It returns without sending anything when ctx is cancelled, either while
// waiting for a newer revision or while waiting for a receiver on notify.
func (w *Watch) WaitNext(ctx context.Context, lastRevision int64, notify chan<- int64) {
	for {
		// Snapshot both fields under the read lock. The revision that is sent
		// below is this snapshot, so it is never read concurrently with the
		// write performed by update.
		w.rwl.RLock()
		revision, cond := w.revision, w.cond
		w.rwl.RUnlock()

		if revision > lastRevision {
			select {
			case notify <- revision:
			case <-ctx.Done():
			}
			return
		}

		select {
		case <-cond:
			// The revision changed; re-check it.
		case <-ctx.Done():
			return
		}
	}
}

// update records newRevision and wakes every goroutine blocked in WaitNext.
func (w *Watch) update(newRevision int64) {
	w.rwl.Lock()
	defer w.rwl.Unlock()
	w.revision = newRevision
	// Closing the channel broadcasts to all current waiters; later waiters
	// pick up the replacement channel.
	close(w.cond)
	w.cond = make(chan struct{})
}
// createWatch returns a Watch for prefix and starts a background goroutine
// that keeps it up to date from an etcd watch on every key under prefix.
//
// Whenever the etcd watch channel closes, the goroutine waits one second and
// re-establishes the watch, resuming after the last revision it has seen. The
// goroutine exits once the client's context is done, which the etcd client
// documents as happening on Close; without that check it would retry and log
// a warning every second for the rest of the process lifetime.
//
// The returned error is always nil.
func createWatch(logger *log.Slf4g, client *etcd.Client, prefix string) (*Watch, error) {
	w := &Watch{cond: make(chan struct{})}

	go func() {
		closed := client.Ctx().Done()

		rch := client.Watch(context.Background(), prefix, etcd.WithPrefix(), etcd.WithCreatedNotify())
		logger.Debug("Watch created on %s", prefix)

		for {
			// w.revision is written only by this goroutine (through update),
			// so reading it here without the lock does not race.
			for resp := range rch {
				if resp.CompactRevision > w.revision { // prefer the newest update
					w.update(resp.CompactRevision)
					logger.Debug("Watch to '%s' updated to %d by CompactRevision", prefix, resp.CompactRevision)
				} else if resp.Header.GetRevision() > w.revision { // a create or update was observed
					w.update(resp.Header.GetRevision())
					logger.Debug("Watch to '%s' updated to %d by header revision", prefix, resp.Header.GetRevision())
				}
				if err := resp.Err(); err != nil {
					logger.Error("守护者发生错误: %s", err.Error())
				}
			}
			logger.Warning("Watch to '%s' stopped at revision %d", prefix, w.revision)

			// Disconnected or cancelled: pause briefly so reconnects are not
			// too fast, but stop for good once the client has been closed.
			retry := time.NewTimer(time.Second)
			select {
			case <-closed:
				retry.Stop()
				logger.Debug("Watch to '%s' terminated: client closed", prefix)
				return
			case <-retry.C:
			}

			if w.revision > 0 {
				// Resume from the next revision so that nothing is missed.
				rch = client.Watch(context.Background(), prefix, etcd.WithPrefix(), etcd.WithRev(w.revision+1))
			} else {
				// Nothing seen yet: start from the latest revision.
				rch = client.Watch(context.Background(), prefix, etcd.WithPrefix(), etcd.WithCreatedNotify())
			}
		}
	}()

	return w, nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。