1 Star 0 Fork 0

天雨流芳 / go-micro-framework

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
registry.go 6.55 KB
一键复制 编辑 原始数据 按行查看 历史
天雨流芳 提交于 2024-03-27 17:19 . 修改创建的时候的配置
package consul
import (
"context"
"errors"
"fmt"
log "gitee.com/tylf2018/go-micro-framework/pkg/logger"
"gitee.com/tylf2018/go-micro-framework/registry"
regOps "gitee.com/tylf2018/go-micro-framework/registry/registry"
"github.com/google/uuid"
"github.com/hashicorp/consul/api"
"net"
"net/url"
"strconv"
"sync"
"time"
)
// 基于consul注册中心的注册代码
// ConsulRegistry is consul registry
type ConsulRegistry struct {
cli *api.Client
enableHealthCheck bool // 是否开启健康检查
lock sync.RWMutex // 读写锁
ctx context.Context
cancel context.CancelFunc // 用来 退出程序
healthcheckInterval int
// heartbeat enable heartbeat 是否开启心跳 客户端 -> 服务端 我还存活着的消息
heartbeat bool
// deregisterCriticalServiceAfter time interval in seconds 注销服务请求发起后 以秒为单位时间间隔 的超时时间
deregisterCriticalServiceAfter int
// serviceChecks user custom checks 自定义检查接口
serviceChecks api.AgentServiceChecks
}
// Option is consul registry option.
type Option func(*ConsulRegistry)
func NewConsulRegistry(options *regOps.RegistryOptions, opts ...Option) *ConsulRegistry {
config := api.DefaultConfig()
config.Address = options.Address
config.Scheme = options.Scheme
apiClient, err := api.NewClient(config)
if err != nil {
log.ErrorF("create api client error: %v", err)
panic(err)
}
cr := &ConsulRegistry{
cli: apiClient,
enableHealthCheck: true,
healthcheckInterval: 10,
heartbeat: true,
deregisterCriticalServiceAfter: 600,
}
cr.ctx, cr.cancel = context.WithCancel(context.Background())
if len(opts) > 0 {
for _, o := range opts {
o(cr)
}
}
return cr
}
// Register register service
func (r *ConsulRegistry) Register(_ context.Context, svc *registry.ServiceInstance) error {
if svc.Name == "" {
return errors.New("service name cannot be empty")
}
if len(svc.Endpoints) == 0 {
return errors.New("service endpoints cannot be empty")
}
if svc.ID == "" {
newUUID, err := uuid.NewUUID()
if err != nil {
return errors.New("service id generate error")
}
svc.ID = newUUID.String()
}
addresses := make(map[string]api.ServiceAddress, len(svc.Endpoints))
checkAddresses := make([]string, 0, len(svc.Endpoints))
for _, endpoint := range svc.Endpoints { // 遍历 终端 server path
raw, err := url.Parse(endpoint) // 解析 成为 url 格式
if err != nil {
return err
}
host := raw.Hostname() // 获得 host
// 解析成 10进制 Uint16 的类型
port, _ := strconv.ParseUint(raw.Port(), 10, 16) // 保证 port 的准确性
// 将修改完成的 host:port 添加
checkAddresses = append(checkAddresses, net.JoinHostPort(host, strconv.FormatUint(port, 10)))
// 例如 addresses["http"] addresses["grpc"]
addresses[raw.Scheme] = api.ServiceAddress{Address: endpoint, Port: int(port)}
}
registration := &api.AgentServiceRegistration{
ID: svc.ID,
Name: svc.Name,
Meta: svc.Metadata,
Tags: []string{fmt.Sprintf("version=%s", svc.Version)},
TaggedAddresses: addresses,
}
if len(svc.Tags) > 0 {
registration.Tags = append(registration.Tags, svc.Tags...)
}
if len(checkAddresses) > 0 {
host, portRaw, _ := net.SplitHostPort(checkAddresses[0])
port, _ := strconv.ParseInt(portRaw, 10, 32)
registration.Address = host
registration.Port = int(port)
}
// 是否开启健康检查
if r.enableHealthCheck {
for _, address := range checkAddresses {
registration.Checks = append(registration.Checks, &api.AgentServiceCheck{
TCP: address,
Interval: fmt.Sprintf("%ds", r.healthcheckInterval),
DeregisterCriticalServiceAfter: fmt.Sprintf("%ds", r.deregisterCriticalServiceAfter),
Timeout: "5s",
})
}
}
// 是否开启心跳检查
if r.heartbeat {
registration.Checks = append(registration.Checks, &api.AgentServiceCheck{
CheckID: "service:" + svc.ID,
// 指定这是一个 TTL 检查,必须定期使用 TTL 端点来更新检查状态。如果检查未设置为在指定时间内通过,则检查将设置为失败状态。
TTL: fmt.Sprintf("%ds", r.healthcheckInterval*2),
DeregisterCriticalServiceAfter: fmt.Sprintf("%ds", r.deregisterCriticalServiceAfter),
})
}
// custom checks 自定义检查
registration.Checks = append(registration.Checks, r.serviceChecks...)
// 注册 服务到 consul
err := r.cli.Agent().ServiceRegister(registration)
if err != nil {
return err
}
// 实现 心跳 有 客户端 发送健康信息 给 consul
if r.heartbeat {
go func() {
time.Sleep(time.Second)
err = r.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass")
if err != nil {
log.ErrorF("[Consul]update ttl heartbeat to consul failed!err:=%v", err)
}
ticker := time.NewTicker(time.Second * time.Duration(r.healthcheckInterval)) // 用于执行定时操作
defer ticker.Stop() // 结束定时操作
for {
select {
case <-ticker.C:
err = r.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass") // 发送心跳
if err != nil {
log.ErrorF("[Consul]update ttl heartbeat to consul failed!err:=%v", err)
}
case <-r.ctx.Done(): // 外部想要结束心跳 需要 执行 ctx.done()
return
}
}
}()
}
return nil
}
// Deregister deregister service
func (r *ConsulRegistry) Deregister(_ context.Context, svc *registry.ServiceInstance) error {
if r.cancel != nil {
r.cancel()
}
return r.cli.Agent().ServiceDeregister(svc.ID)
}
// WithHealthCheck with registry health check option.
func WithHealthCheck(enable bool) Option {
return func(r *ConsulRegistry) {
r.enableHealthCheck = enable
}
}
// WithHeartbeat enable or disable heartbeat
func WithHeartbeat(enable bool) Option {
return func(r *ConsulRegistry) {
r.heartbeat = enable
}
}
// WithHealthCheckInterval with healthcheck interval in seconds.
func WithHealthCheckInterval(interval int) Option {
return func(r *ConsulRegistry) {
r.healthcheckInterval = interval
}
}
// WithDeregisterCriticalServiceAfter with deregister-critical-service-after in seconds.
func WithDeregisterCriticalServiceAfter(interval int) Option {
return func(r *ConsulRegistry) {
r.deregisterCriticalServiceAfter = interval
}
}
func WithAgentServiceCheck(checks ...*api.AgentServiceCheck) Option {
return func(r *ConsulRegistry) {
r.serviceChecks = append(r.serviceChecks, checks...)
}
}
1
https://gitee.com/tylf2018/go-micro-framework.git
git@gitee.com:tylf2018/go-micro-framework.git
tylf2018
go-micro-framework
go-micro-framework
e87e0c3d7074

搜索帮助