5 Star 0 Fork 0

Md/gu

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
client.go 2.71 KB
一键复制 编辑 原始数据 按行查看 历史
liyafei 提交于 2023-02-24 13:00 . 修改 push 实现
package cfgpltfm
import (
"context"
"fmt"
"sync"
"time"
"gitee.com/MikeDDD/gu/cfgpltfm/cfgpltrpcpb"
"gitee.com/MikeDDD/gu/guutil"
"gitee.com/MikeDDD/gu/logs"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// CfgpltfmClient is the client side of the config platform: it calls the
// config center over gRPC and receives pushed values from it.
type CfgpltfmClient struct {
// cfgpltAddr is the dial target used for calls to the config center.
cfgpltAddr string
// selfAddr is sent to the center as ClientAddr when registering.
selfAddr string
// env is sent to the center when registering and is compared against the
// Env of incoming push requests.
env string
// keyHandler maps "<key>_<camp>" to a func(*cfgpltrpcpb.GetResp) callback.
keyHandler sync.Map
// Embedded generated type; presumably supplies default implementations of
// the client-side RPC server interface. TODO confirm against cfgpltrpcpb.
cfgpltrpcpb.UnimplementedCfgpltClientRpcServer
}
// New builds a CfgpltfmClient that talks to the config center at cfgpltAddr,
// identifies itself as selfAddr, and belongs to environment env.
// The handler table starts out empty; no connection is opened here.
func New(cfgpltAddr, selfAddr, env string) *CfgpltfmClient {
	return &CfgpltfmClient{
		cfgpltAddr: cfgpltAddr,
		selfAddr:   selfAddr,
		env:        env,
	}
}
// Ping is the client => center heartbeat. It dials the config center with
// insecure transport credentials and issues a Regist call carrying this
// client's address and environment, bounded by a one-second timeout.
//
// It returns true when Regist succeeds; dial or RPC errors are logged and
// reported as false. guutil.InDeferRecover is deferred first (presumably to
// recover and log panics; confirm against guutil).
func (cli *CfgpltfmClient) Ping() bool {
	defer guutil.InDeferRecover()

	dialOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
	cc, dialErr := grpc.Dial(cli.cfgpltAddr, dialOpt)
	if dialErr != nil {
		logs.Error(dialErr)
		return false
	}
	defer cc.Close()

	stub := cfgpltrpcpb.NewCfgpltServiceRpcClient(cc)
	callCtx, stop := context.WithTimeout(context.Background(), time.Second)
	defer stop()

	registration := &cfgpltrpcpb.RegistReq{
		ClientAddr: cli.selfAddr,
		Env:        cli.env,
	}
	_, callErr := stub.Regist(callCtx, registration)
	if callErr != nil {
		logs.Error(callErr)
		return false
	}
	return true
}
// Get is a client => center request. It dials the config center with
// insecure transport credentials and fetches the entry identified by key and
// camp, bounded by a one-second timeout.
//
// It returns the center's response, or nil when dialing or the RPC fails
// (the error is logged). guutil.InDeferRecover is deferred first (presumably
// to recover and log panics; confirm against guutil).
func (cli *CfgpltfmClient) Get(key string, camp int32) *cfgpltrpcpb.GetResp {
	defer guutil.InDeferRecover()

	dialOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
	cc, dialErr := grpc.Dial(cli.cfgpltAddr, dialOpt)
	if dialErr != nil {
		logs.Error(dialErr)
		return nil
	}
	defer cc.Close()

	stub := cfgpltrpcpb.NewCfgpltServiceRpcClient(cc)
	callCtx, stop := context.WithTimeout(context.Background(), time.Second)
	defer stop()

	query := &cfgpltrpcpb.GetReq{Key: key, Camp: camp}
	reply, callErr := stub.Get(callCtx, query)
	if callErr != nil {
		logs.Error(callErr)
		return nil
	}
	return reply
}
// GetAndWatch is a client => center request that also subscribes to updates.
// It fetches the current value for key and camp, registers callback under
// "<key>_<camp>" so that later pushes for that pair are delivered to it, and
// then invokes callback once with the fetched value.
//
// It returns false, registering nothing, when callback is nil or when the
// initial Get fails. It returns true after callback has been invoked with the
// initial value.
func (cli *CfgpltfmClient) GetAndWatch(key string, camp int32, callback func(*cfgpltrpcpb.GetResp)) bool {
	defer guutil.InDeferRecover()

	handlerKey := fmt.Sprintf("%v_%v", key, camp)

	// A nil callback must never be registered: invoking it panics, and the
	// stored nil handler would be hit again by every later push for this key.
	if callback == nil {
		logs.Warn("nil callback for %v", handlerKey)
		return false
	}

	resp := cli.Get(key, camp)
	if resp == nil {
		return false
	}

	// The handler is registered before the first invocation, so it is already
	// in place for pushes while callback handles the initial value.
	cli.keyHandler.Store(handlerKey, callback)
	callback(resp)
	return true
}
// Push is the center => client RPC: the config center calls it to deliver a
// changed value. Requests whose Env differs from this client's env are
// skipped. Otherwise the handler registered under "<key>_<camp>" is looked up
// and invoked with a GetResp built from the request's Key, Camp and Value.
//
// An empty PushResp and a nil error are returned in every case, including
// when no usable handler is registered.
func (cli *CfgpltfmClient) Push(ctx context.Context, req *cfgpltrpcpb.PushReq) (resp *cfgpltrpcpb.PushResp, err error) {
	// The reply is assigned up front through the named result, so it is still
	// returned if the deferred helper below recovers a panic.
	resp = &cfgpltrpcpb.PushResp{}

	// Same guard the other methods of this type use. Presumably it recovers
	// and logs panics (confirm against guutil); without it a panicking user
	// callback propagates into the gRPC server goroutine.
	defer guutil.InDeferRecover()

	if req.Env != cli.env {
		logs.Info("skip env %v, self %v", req.Env, cli.env)
		return resp, nil
	}

	handlerKey := fmt.Sprintf("%v_%v", req.Key, req.Camp)
	stored, exist := cli.keyHandler.Load(handlerKey)
	if !exist {
		logs.Warn("no handler for %v", handlerKey)
		return resp, nil
	}

	// Checked assertion plus nil guard: a nil or mistyped entry is logged
	// instead of panicking inside the RPC handler.
	handler, ok := stored.(func(*cfgpltrpcpb.GetResp))
	if !ok || handler == nil {
		logs.Warn("invalid handler for %v", handlerKey)
		return resp, nil
	}

	handler(&cfgpltrpcpb.GetResp{
		Key:   req.Key,
		Camp:  req.Camp,
		Value: req.Value,
	})
	return resp, nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/MikeDDD/gu.git
git@gitee.com:MikeDDD/gu.git
MikeDDD
gu
gu
v0.0.35

搜索帮助