代码拉取完成,页面将自动刷新
package discovery
import (
"context"
"encoding/json"
"errors"
"net/http"
"strconv"
"strings"
"time"
"github.com/sirupsen/logrus"
clientv3 "go.etcd.io/etcd/client/v3"
)
// Register publishes one server's info to etcd under a lease and keeps that
// lease alive from a background goroutine.
type Register struct {
// EtcdAddrs is the list of etcd endpoints handed to the etcd client.
EtcdAddrs []string
// DialTimeout is expressed in seconds; it is used as the client dial
// timeout and as the deadline for the lease grant.
DialTimeout int
// closeCh carries the stop signal from Stop to the keepAlive goroutine.
closeCh chan struct{}
// leasesID is the ID of the lease the registration key is attached to.
leasesID clientv3.LeaseID
// keepAliveCh delivers keep-alive responses for leasesID; keepAlive treats
// a nil response as the signal to register again.
keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse
// srvInfo is the server description that is JSON-encoded into etcd.
srvInfo Server
// srvTTL is the lease TTL passed to Grant; it is also used, in seconds, as
// the retry ticker interval in keepAlive.
srvTTL int64
// cli is the etcd client created by Register.
cli *clientv3.Client
// logger receives errors raised inside the keepAlive goroutine.
logger *logrus.Logger
}
// NewRegister returns a Register that will talk to the given etcd endpoints
// and report background errors through logger. The dial timeout defaults to
// 3 seconds; no connection is opened until Register is called.
func NewRegister(etcdAddrs []string, logger *logrus.Logger) *Register {
	reg := new(Register)
	reg.EtcdAddrs = etcdAddrs
	reg.DialTimeout = 3
	reg.logger = logger
	return reg
}
// Register connects to etcd, publishes srvInfo under a lease of ttl seconds
// and starts a goroutine that keeps the registration alive.
//
// srvInfo.Addr must have a non-empty part before the first ':' and ttl must
// be positive (it is also used as a ticker interval, which panics on
// non-positive values). On success the returned send-only channel is the
// same one Stop writes to; sending a value on it stops the registration.
// On failure no goroutine is started and the etcd client is closed.
func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) {
	var err error

	if strings.Split(srvInfo.Addr, ":")[0] == "" {
		return nil, errors.New("invalid ip address")
	}
	if ttl <= 0 {
		return nil, errors.New("invalid ttl: must be positive")
	}

	if r.cli, err = clientv3.New(clientv3.Config{
		Endpoints:   r.EtcdAddrs,
		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
	}); err != nil {
		return nil, err
	}

	r.srvInfo = srvInfo
	r.srvTTL = ttl

	if err = r.register(); err != nil {
		// Nothing will ever use this client again, so release its
		// connections. The registration error is the one worth reporting,
		// hence the close error is deliberately dropped.
		_ = r.cli.Close()
		return nil, err
	}

	r.closeCh = make(chan struct{})
	go r.keepAlive()

	return r.closeCh, nil
}
// register grants a new lease of srvTTL seconds, writes the JSON-encoded
// srvInfo under BuildRegisterPath(srvInfo) attached to that lease, and then
// starts the lease keep-alive stream.
//
// Every etcd request except the long-lived keep-alive stream is bounded by
// DialTimeout seconds. On any failure keepAliveCh is left nil, which is the
// condition the ticker branch of keepAlive checks before retrying.
func (r *Register) register() error {
	timeout := time.Duration(r.DialTimeout) * time.Second

	// The previous stream (if any) belongs to a lease that is being replaced.
	// Clearing it up front guarantees that a failure below never leaves a
	// stale or closed channel behind.
	r.keepAliveCh = nil

	// Encode first so an encoding error cannot leave an unused lease behind.
	data, err := json.Marshal(r.srvInfo)
	if err != nil {
		return err
	}

	grantCtx, cancelGrant := context.WithTimeout(context.Background(), timeout)
	defer cancelGrant()
	leaseResp, err := r.cli.Grant(grantCtx, r.srvTTL)
	if err != nil {
		return err
	}
	r.leasesID = leaseResp.ID

	putCtx, cancelPut := context.WithTimeout(context.Background(), timeout)
	defer cancelPut()
	if _, err = r.cli.Put(putCtx, BuildRegisterPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID)); err != nil {
		// Best effort: drop the lease that now has no key attached. If this
		// fails too, the lease simply expires after its TTL.
		revokeCtx, cancelRevoke := context.WithTimeout(context.Background(), timeout)
		defer cancelRevoke()
		_, _ = r.cli.Revoke(revokeCtx, r.leasesID)
		return err
	}

	// The keep-alive stream has to outlive this call, so it is intentionally
	// not tied to a timeout.
	keepAliveCh, err := r.cli.KeepAlive(context.Background(), r.leasesID)
	if err != nil {
		return err
	}
	r.keepAliveCh = keepAliveCh

	return nil
}
// Stop sends the stop signal to the keepAlive goroutine, which reacts by
// deleting the registration key and revoking the lease. The send blocks
// until the goroutine picks the signal up.
//
// Stop is a no-op when no registration is running (Register was never
// called or returned an error); without this guard the send on a nil
// channel would block forever.
func (r *Register) Stop() {
	if r.closeCh == nil {
		return
	}
	r.closeCh <- struct{}{}
}
// unregister deletes this server's registration key from etcd.
//
// The request is bounded by DialTimeout seconds so that an unreachable etcd
// cannot stall the shutdown path indefinitely.
func (r *Register) unregister() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second)
	defer cancel()

	_, err := r.cli.Delete(ctx, BuildRegisterPath(r.srvInfo))
	return err
}
// keepAlive is the background loop started by Register. It runs until a
// value arrives on closeCh and handles three events:
//
//   - stop signal: delete the key, revoke the lease, close the etcd client
//     and return;
//   - nil keep-alive response (the stream ended): register again;
//   - ticker (every srvTTL seconds): register again if no keep-alive stream
//     is currently active.
//
// Errors cannot be returned to anyone from here, so they are logged.
func (r *Register) keepAlive() {
	ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-r.closeCh:
			if err := r.unregister(); err != nil {
				r.logger.Error("unregister failed, error: ", err)
			}

			revokeCtx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second)
			_, err := r.cli.Revoke(revokeCtx, r.leasesID)
			cancel()
			if err != nil {
				r.logger.Error("revoke failed, error: ", err)
			}

			if err := r.cli.Close(); err != nil {
				r.logger.Error("close etcd client failed, error: ", err)
			}

			// Returning here is essential: revoking the lease ends the
			// keep-alive stream, and staying in the loop would treat that
			// as a lost lease and register the server again.
			return

		case res := <-r.keepAliveCh:
			if res == nil {
				if err := r.register(); err != nil {
					r.logger.Error("register failed, error: ", err)
					// A closed channel is always ready to receive; clearing
					// it stops this case from spinning and hands the retry
					// over to the ticker case below.
					r.keepAliveCh = nil
				}
			}

		case <-ticker.C:
			if r.keepAliveCh == nil {
				if err := r.register(); err != nil {
					r.logger.Error("register failed, error: ", err)
				}
			}
		}
	}
}
// UpdateHandler returns an HTTP handler that changes the registered server's
// weight. The new value is read from the "weight" query parameter.
//
// Responses:
//   - 400 with the parse error when "weight" is not an integer;
//   - 500 with the error text when the update cannot be stored in etcd;
//   - 200 with "update server weight success" otherwise.
//
// The in-memory server info is only modified after etcd accepted the write,
// so a failed request leaves the local state untouched.
func (r *Register) UpdateHandler() http.HandlerFunc {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		weightstr := req.URL.Query().Get("weight")
		weight, err := strconv.Atoi(weightstr)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			_, _ = w.Write([]byte(err.Error()))
			return
		}

		update := func() error {
			if r.cli == nil {
				return errors.New("server is not registered")
			}

			// Work on a copy so a failed Put does not leave a weight in
			// memory that etcd never saw.
			updated := r.srvInfo
			updated.Weight = int64(weight)

			data, err := json.Marshal(updated)
			if err != nil {
				return err
			}

			// Derive from the request context so a disconnected caller
			// cancels the write, and cap it so etcd trouble cannot pin
			// the handler forever.
			ctx, cancel := context.WithTimeout(req.Context(), time.Duration(r.DialTimeout)*time.Second)
			defer cancel()

			if _, err = r.cli.Put(ctx, BuildRegisterPath(updated), string(data), clientv3.WithLease(r.leasesID)); err != nil {
				return err
			}

			r.srvInfo.Weight = updated.Weight
			return nil
		}

		if err := update(); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			_, _ = w.Write([]byte(err.Error()))
			return
		}

		_, _ = w.Write([]byte("update server weight success"))
	})
}
// GetServerInfo reads this server's registration from etcd and decodes it.
//
// When the etcd request cannot be made or fails, the locally held server
// info is returned together with the error. When the key does not exist, a
// zero Server and a nil error are returned. The request is bounded by
// DialTimeout seconds.
func (r *Register) GetServerInfo() (Server, error) {
	if r.cli == nil {
		return r.srvInfo, errors.New("etcd client is not initialized")
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second)
	defer cancel()

	resp, err := r.cli.Get(ctx, BuildRegisterPath(r.srvInfo))
	if err != nil {
		return r.srvInfo, err
	}

	var server Server
	// Check the slice that is actually indexed rather than the reported count.
	if len(resp.Kvs) > 0 {
		if err := json.Unmarshal(resp.Kvs[0].Value, &server); err != nil {
			return server, err
		}
	}

	return server, nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。