1 Star 0 Fork 0

颜言/grpc-todoList

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
register.go 3.77 KB
一键复制 编辑 原始数据 按行查看 历史
颜言 提交于 2024-11-14 21:06 +08:00 . 初始化
package discovery
import (
"context"
"encoding/json"
"errors"
"net/http"
"strconv"
"strings"
"time"
"github.com/sirupsen/logrus"
clientv3 "go.etcd.io/etcd/client/v3"
)
// Register publishes a single Server record to etcd under a lease and keeps
// that lease alive from a background goroutine.
type Register struct {
// EtcdAddrs is the list of etcd endpoints handed to the client config.
EtcdAddrs []string
// DialTimeout is a number of seconds; it is used as the client dial timeout
// and as the deadline for the lease grant request.
DialTimeout int
// closeCh carries the stop signal consumed by the keepAlive loop.
closeCh chan struct{}
// leasesID is the ID of the lease most recently granted by register.
leasesID clientv3.LeaseID
// keepAliveCh is the response channel returned by the client's KeepAlive call.
keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse
// srvInfo is the record that is JSON-encoded and stored in etcd.
srvInfo Server
// srvTTL is the lease TTL passed to Grant; it is also used, in seconds, as
// the retry ticker interval in keepAlive.
srvTTL int64
// cli is the etcd client created by the Register method.
cli *clientv3.Client
// logger receives error messages from the keepAlive loop.
logger *logrus.Logger
}
// NewRegister builds an etcd-backed Register for the given endpoints.
// The dial timeout defaults to 3 (seconds); no connection is opened until
// the Register method is called.
func NewRegister(etcdAddrs []string, logger *logrus.Logger) *Register {
	reg := new(Register)
	reg.EtcdAddrs = etcdAddrs
	reg.DialTimeout = 3
	reg.logger = logger
	return reg
}
// Register publishes srvInfo to etcd under a lease of ttl seconds and starts
// a background goroutine that keeps the registration alive.
//
// It returns a send-only channel: sending a value on it (which is what Stop
// does) makes the background goroutine delete the key and revoke the lease.
//
// An error is returned when the host part of srvInfo.Addr is empty, when ttl
// is not positive, when the etcd client cannot be created, or when the
// initial registration fails.
func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) {
	var err error

	if strings.Split(srvInfo.Addr, ":")[0] == "" {
		return nil, errors.New("invalid ip address")
	}
	// keepAlive builds a time.Ticker from ttl, and time.NewTicker panics on a
	// non-positive interval; reject such values here instead of crashing later
	// inside the background goroutine.
	if ttl <= 0 {
		return nil, errors.New("invalid ttl")
	}

	if r.cli, err = clientv3.New(clientv3.Config{
		Endpoints:   r.EtcdAddrs,
		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
	}); err != nil {
		return nil, err
	}

	r.srvInfo = srvInfo
	r.srvTTL = ttl

	if err = r.register(); err != nil {
		// Nothing will ever use this client again, so release its connection
		// (and any keep-alive stream register may already have started).
		// The registration error is the one worth reporting; a Close error
		// would only obscure it.
		_ = r.cli.Close()
		return nil, err
	}

	r.closeCh = make(chan struct{})
	go r.keepAlive()

	return r.closeCh, nil
}
// register grants a fresh lease, starts keep-alive for it and writes the
// JSON-encoded server record to etcd bound to that lease.
//
// On success r.leasesID and r.keepAliveCh refer to the new lease. On failure
// the returned error is that of the first step that failed; fields assigned
// by earlier steps keep their new values.
func (r *Register) register() error {
	// Encode first: if the record cannot be serialized there is no point in
	// granting a lease and starting a keep-alive stream for it.
	data, err := json.Marshal(r.srvInfo)
	if err != nil {
		return err
	}

	timeout := time.Duration(r.DialTimeout) * time.Second

	grantCtx, cancelGrant := context.WithTimeout(context.Background(), timeout)
	defer cancelGrant()
	leaseResp, err := r.cli.Grant(grantCtx, r.srvTTL)
	if err != nil {
		return err
	}
	r.leasesID = leaseResp.ID

	// The keep-alive stream has to outlive this call, so it is deliberately
	// not tied to a timeout context.
	if r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), r.leasesID); err != nil {
		return err
	}

	// Bound the write like the grant: an unbounded Put could block the caller
	// (including the keepAlive loop) indefinitely when etcd is unreachable.
	putCtx, cancelPut := context.WithTimeout(context.Background(), timeout)
	defer cancelPut()
	_, err = r.cli.Put(putCtx, BuildRegisterPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))
	return err
}
// Stop signals the background keep-alive goroutine to remove the
// registration. It blocks until that goroutine has received the signal.
//
// Calling Stop before a successful Register is a no-op: there is no stop
// channel yet, and sending on a nil channel would block forever.
func (r *Register) Stop() {
	if r.closeCh == nil {
		return
	}
	r.closeCh <- struct{}{}
}
// unregister deletes this server's registration key from etcd and reports
// the error of the delete request, if any.
func (r *Register) unregister() error {
	key := BuildRegisterPath(r.srvInfo)
	if _, err := r.cli.Delete(context.Background(), key); err != nil {
		return err
	}
	return nil
}
// keepAlive is the background loop started by Register. It
//
//   - on a stop signal: deletes the key, revokes the lease and exits;
//   - when the keep-alive channel yields nil (channel closed): registers again;
//   - on every tick (one per TTL): retries registration if there is currently
//     no keep-alive channel.
//
// Errors are logged; they never terminate the loop.
func (r *Register) keepAlive() {
	ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-r.closeCh:
			if err := r.unregister(); err != nil {
				r.logger.Error("unregister failed, error: ", err)
			}
			if _, err := r.cli.Revoke(context.Background(), r.leasesID); err != nil {
				r.logger.Error("revoke failed, error: ", err)
			}
			// Exit here. Without this the loop keeps running, sees the
			// keep-alive channel close as a result of the revoke, and
			// re-registers the service that was just stopped.
			return

		case res := <-r.keepAliveCh:
			if res != nil {
				continue
			}
			// nil means the keep-alive channel was closed; try to obtain a
			// new lease and re-publish the record.
			closed := r.keepAliveCh
			if err := r.register(); err != nil {
				r.logger.Error("register failed, error: ", err)
				// If register failed before replacing the channel, the field
				// still holds the closed one and this case would fire again
				// immediately, spinning. A nil channel blocks instead and
				// lets the ticker branch drive the retries.
				if r.keepAliveCh == closed {
					r.keepAliveCh = nil
				}
			}

		case <-ticker.C:
			if r.keepAliveCh == nil {
				if err := r.register(); err != nil {
					r.logger.Error("register failed, error: ", err)
				}
			}
		}
	}
}
// UpdateHandler returns an HTTP handler that changes the registered server's
// weight. The new value is read from the "weight" query parameter.
//
// Responses:
//   - 400 with the parse error when "weight" is not an integer;
//   - 503 when the register has no etcd client yet (Register not called);
//   - 500 with the error text when encoding or the etcd write fails;
//   - 200 with a confirmation message on success.
func (r *Register) UpdateHandler() http.HandlerFunc {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		weight, err := strconv.Atoi(req.URL.Query().Get("weight"))
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			_, _ = w.Write([]byte(err.Error()))
			return
		}

		// Without a client the Put below would dereference nil and panic.
		if r.cli == nil {
			w.WriteHeader(http.StatusServiceUnavailable)
			_, _ = w.Write([]byte("server is not registered"))
			return
		}

		// Work on a copy so the in-memory record only changes once etcd has
		// accepted the new value; otherwise a failed update would leave a
		// weight that a later re-registration silently publishes.
		updated := r.srvInfo
		updated.Weight = int64(weight)

		data, err := json.Marshal(updated)
		if err == nil {
			_, err = r.cli.Put(context.Background(), BuildRegisterPath(updated), string(data), clientv3.WithLease(r.leasesID))
		}
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			_, _ = w.Write([]byte(err.Error()))
			return
		}

		r.srvInfo.Weight = updated.Weight
		_, _ = w.Write([]byte("update server weight success"))
	})
}
// GetServerInfo reads this server's record back from etcd.
//
// If the lookup itself fails, the locally held record is returned together
// with the error. If the key is absent, a zero Server and a nil error are
// returned. Otherwise the stored JSON is decoded and returned along with any
// decode error.
func (r *Register) GetServerInfo() (Server, error) {
	key := BuildRegisterPath(r.srvInfo)
	resp, err := r.cli.Get(context.Background(), key)
	if err != nil {
		return r.srvInfo, err
	}

	var info Server
	if resp.Count < 1 {
		return info, nil
	}

	err = json.Unmarshal(resp.Kvs[0].Value, &info)
	return info, err
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/ujq/grpc-todoList.git
git@gitee.com:ujq/grpc-todoList.git
ujq
grpc-todoList
grpc-todoList
24005026fc92

搜索帮助