1 Star 0 Fork 0

天雨流芳 / go-micro-framework

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
registry.go 8.87 KB
一键复制 编辑 原始数据 按行查看 历史
天雨流芳 提交于 2024-03-27 17:19 . 修改创建的时候的配置
package eureka
import (
"bytes"
"context"
"encoding/json"
"fmt"
"gitee.com/tylf2018/go-micro-framework/registry"
regOps "gitee.com/tylf2018/go-micro-framework/registry/registry"
"io"
"math/rand"
"net/http"
"strconv"
"strings"
"sync"
"time"
)
// Instance status strings and default tuning values used by the Eureka registry.
const (
// statusUp is the status sent for an instance when it is registered.
statusUp = "UP"
// statusDown and statusOutOfService are further status strings; they are not
// referenced in the visible part of this file.
statusDown = "DOWN"
statusOutOfService = "OUT_OF_SERVICE"
// heartbeatRetry is the number of failed heartbeats tolerated before the
// heartbeat loop re-registers the endpoint.
heartbeatRetry = 3
// maxIdleConns is the MaxIdleConns value of the HTTP transport.
maxIdleConns = 100
// heartbeatTime is the default interval between heartbeats.
heartbeatTime = 10 * time.Second
// httpTimeout is the timeout of the HTTP client.
httpTimeout = 3 * time.Second
// refreshTime is the default value of EurekaRegistry.refreshInterval.
refreshTime = 30 * time.Second
)
// EurekaRegistry registers and deregisters service instances against a Eureka
// server over HTTP and keeps registered instances alive with periodic heartbeats.
type EurekaRegistry struct {
// ctx is used for heartbeat requests; cancelling it stops heartbeat loops.
ctx context.Context
// refreshInterval is stored by the constructor and WithRefresh; it is not
// read in the visible part of this file.
refreshInterval time.Duration
// heartbeatInterval is the period of the heartbeat ticker.
heartbeatInterval time.Duration
// urls holds the Eureka server base addresses requests are sent to.
urls []string
// eurekaPath is the path segment placed between the server address and the
// request-specific path segments.
eurekaPath string
// maxRetry is the maximum number of attempts made per request.
maxRetry int
// client is the HTTP client shared by all requests.
client *http.Client
// keepalive maps an application ID to the channel its heartbeat loop
// selects on as a stop signal.
keepalive map[string]chan struct{}
// lock guards writes to keepalive.
lock sync.Mutex
}
// Option customizes an EurekaRegistry; NewEurekaRegistry applies options in
// the order given, after the defaults have been set.
type Option func(o *EurekaRegistry)
// NewEurekaRegistry builds an EurekaRegistry that talks to the server at
// reg.Address. Defaults (background context, heartbeatTime, refreshTime,
// path "eureka/v2", three attempts per request, an HTTP client with
// httpTimeout) are set first and then each option in opts is applied in order,
// so options override the defaults.
func NewEurekaRegistry(reg *regOps.RegistryOptions, opts ...Option) *EurekaRegistry {
	httpClient := &http.Client{
		Transport: &http.Transport{MaxIdleConns: maxIdleConns},
		Timeout:   httpTimeout,
	}

	er := new(EurekaRegistry)
	er.ctx = context.Background()
	er.heartbeatInterval = heartbeatTime
	er.refreshInterval = refreshTime
	er.eurekaPath = "eureka/v2"
	er.urls = []string{reg.Address}
	er.maxRetry = 3
	er.client = httpClient
	er.keepalive = make(map[string]chan struct{})

	for i := range opts {
		opts[i](er)
	}
	return er
}
// Register registers every endpoint derived from service with the Eureka
// server and starts one heartbeat goroutine per successfully registered
// endpoint. It stops at, and returns, the first registration error; a service
// without endpoints is a no-op.
func (r *EurekaRegistry) Register(ctx context.Context, service *registry.ServiceInstance) error {
	for _, ep := range r.Endpoints(service) {
		err := r.registerEndpoint(ctx, ep)
		if err != nil {
			return err
		}
		// The endpoint is registered; keep it alive in the background.
		go r.Heartbeat(ep)
	}
	return nil
}
// Deregister removes every endpoint derived from service from the Eureka
// server. It returns the first error reported by a DELETE request; a service
// without endpoints is a no-op.
//
// Before each DELETE the heartbeat stop channel registered for the endpoint's
// application ID is closed. Without that signal the heartbeat loop keeps
// running and, once its heartbeats start failing, re-registers the instance
// that was just removed.
func (r *EurekaRegistry) Deregister(ctx context.Context, service *registry.ServiceInstance) error {
	for _, endpoint := range r.Endpoints(service) {
		r.lock.Lock()
		if stop, ok := r.keepalive[endpoint.AppID]; ok {
			// Close at most once: endpoints of one service share an
			// application ID, and Deregister may be called repeatedly.
			select {
			case <-stop:
			default:
				close(stop)
			}
		}
		r.lock.Unlock()

		if err := r.do(ctx, http.MethodDelete, []string{"apps", endpoint.AppID, endpoint.InstanceID}, nil, nil); err != nil {
			return err
		}
	}
	return nil
}
// RequestInstance is the JSON envelope of a registration request: the
// instance is nested under the "instance" key.
type RequestInstance struct {
Instance Instance `json:"instance"`
}
// Port describes a port of an instance in the registration payload.
// NOTE(review): the "$" / "@enabled" keys presumably follow the Eureka
// server's JSON convention — confirm against the server documentation.
type Port struct {
// Port is the port number, serialized under the "$" key.
Port int `json:"$"`
// Enabled is "true" or "false" as a string, serialized under "@enabled".
Enabled string `json:"@enabled"`
}
// DataCenterInfo is the data-center section of the registration payload,
// serialized with the keys "name" and "@class".
type DataCenterInfo struct {
Name string `json:"name"`
Class string `json:"@class"`
}
// Instance is the description of one service instance as sent to the Eureka
// server on registration. Field names on the wire are given by the JSON tags.
type Instance struct {
InstanceID string `json:"instanceId"`
HostName string `json:"hostName"`
// Port is the regular port of the instance.
Port Port `json:"port"`
App string `json:"app"`
IPAddr string `json:"ipAddr"`
VipAddress string `json:"vipAddress"`
// Status is one of the status strings such as "UP".
Status string `json:"status"`
// SecurePort is the secure port of the instance.
SecurePort Port `json:"securePort"`
HomePageURL string `json:"homePageUrl"`
StatusPageURL string `json:"statusPageUrl"`
HealthCheckURL string `json:"healthCheckUrl"`
DataCenterInfo DataCenterInfo `json:"dataCenterInfo"`
// Metadata carries free-form key/value pairs attached to the instance.
Metadata map[string]string `json:"metadata"`
}
// registerEndpoint announces ep to the Eureka server: it builds the
// registration payload (status UP, regular port enabled, secure port
// disabled, application ID used as host name, app name and VIP address),
// encodes it as JSON and POSTs it to apps/<AppID>.
// It returns the JSON encoding error or the error from the request.
func (r *EurekaRegistry) registerEndpoint(ctx context.Context, ep Endpoint) error {
	var req RequestInstance

	inst := &req.Instance
	inst.InstanceID = ep.InstanceID
	inst.HostName = ep.AppID
	inst.App = ep.AppID
	inst.VipAddress = ep.AppID
	inst.IPAddr = ep.IP
	inst.Status = statusUp
	inst.Port = Port{Port: ep.Port, Enabled: "true"}
	inst.SecurePort = Port{Port: ep.SecurePort, Enabled: "false"}
	inst.HomePageURL = ep.HomePageURL
	inst.StatusPageURL = ep.StatusPageURL
	inst.HealthCheckURL = ep.HealthCheckURL
	inst.DataCenterInfo = DataCenterInfo{
		Name:  "MyOwn",
		Class: "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo",
	}
	inst.Metadata = ep.MetaData

	payload, err := json.Marshal(req)
	if err != nil {
		return err
	}
	path := []string{"apps", ep.AppID}
	return r.do(ctx, http.MethodPost, path, bytes.NewReader(payload), nil)
}
// do sends one logical request to the Eureka server, making at most
// r.maxRetry attempts. An attempt is repeated only when request asks for a
// retry; any other outcome (success or a non-retryable error) is returned
// immediately.
//
// params are the path segments after the Eureka base path, input is the
// optional request body and output, when non-nil, receives the decoded JSON
// response.
//
// The body is read into memory once and every attempt gets a fresh reader
// over it: an io.Reader is drained by the first attempt, so handing the same
// reader to a retry would send an empty body.
//
// When all attempts are used up the returned error wraps the error of the
// last attempt.
func (r *EurekaRegistry) do(ctx context.Context, method string, params []string, input io.Reader, output interface{}) error {
	var payload []byte
	if input != nil {
		data, err := io.ReadAll(input)
		if err != nil {
			return fmt.Errorf("read request body: %w", err)
		}
		payload = data
	}

	var lastErr error
	for i := 0; i < r.maxRetry; i++ {
		var body io.Reader
		if input != nil {
			body = bytes.NewReader(payload)
		}
		retry, err := r.request(ctx, method, params, body, output, i)
		if !retry {
			return err
		}
		lastErr = err
		// A cancelled or expired context fails every further attempt too.
		if ctx.Err() != nil {
			return lastErr
		}
	}
	if lastErr == nil {
		return fmt.Errorf("retry after %d times", r.maxRetry)
	}
	return fmt.Errorf("retry after %d times: %w", r.maxRetry, lastErr)
}
// request performs a single HTTP attempt; i is the zero-based attempt number
// and is passed to buildAPI to select the target URL.
//
// The boolean result tells the caller whether the attempt may be retried: it
// is true only when the HTTP client failed to perform the request. Failing to
// build the request, a status code of 400 or above, and failures to read or
// decode the body are all reported with retry == false.
//
// The response body is decoded as JSON into output only when output is
// non-nil and the status code is 2xx. The body is always drained and closed.
func (r *EurekaRegistry) request(ctx context.Context, method string, params []string, input io.Reader, output interface{}, i int) (bool, error) {
	target := r.buildAPI(i, params...)
	req, err := http.NewRequestWithContext(ctx, method, target, input)
	if err != nil {
		return false, err
	}
	for _, h := range [][2]string{
		{"User-Agent", "go-eureka-client"},
		{"Accept", "application/json;charset=UTF-8"},
		{"Content-Type", "application/json;charset=UTF-8"},
	} {
		req.Header.Add(h[0], h[1])
	}

	resp, err := r.client.Do(req)
	if err != nil {
		return true, err
	}
	// Drain before closing so the transport can reuse the connection.
	defer func() {
		_, _ = io.Copy(io.Discard, resp.Body)
		_ = resp.Body.Close()
	}()

	switch status := resp.StatusCode; {
	case status >= http.StatusBadRequest:
		return false, fmt.Errorf("response Error %d", status)
	case output == nil || status/100 != 2:
		return false, nil
	}

	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		return false, err
	}
	return false, json.Unmarshal(raw, output)
}
// buildAPI returns the URL for one attempt: the server chosen for
// currentTimes, the Eureka base path and params, joined with "/".
// On the first attempt (currentTimes == 0) the server list is shuffled
// before a server is picked.
func (r *EurekaRegistry) buildAPI(currentTimes int, params ...string) string {
	if currentTimes == 0 {
		r.shuffle()
	}
	segments := make([]string, 0, len(params)+2)
	segments = append(segments, r.pickServer(currentTimes), r.eurekaPath)
	segments = append(segments, params...)
	return strings.Join(segments, "/")
}
// pickServer returns the server address to use for attempt number
// currentTimes, cycling through r.urls.
//
// The index is taken modulo the number of configured servers. Taking it
// modulo the retry count would run past the end of r.urls whenever fewer
// servers than retries are configured (e.g. one server and three attempts).
// With no servers configured it returns the empty string instead of
// panicking, so the request fails with an ordinary error.
func (r *EurekaRegistry) pickServer(currentTimes int) string {
	if len(r.urls) == 0 {
		return ""
	}
	return r.urls[currentTimes%len(r.urls)]
}
// shuffle randomly permutes r.urls in place so that consecutive requests do
// not always start with the same server.
//
// It uses a generator local to the call rather than rand.Seed: reseeding the
// package-level generator on every request changes process-wide state that
// other code may rely on.
func (r *EurekaRegistry) shuffle() {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	rng.Shuffle(len(r.urls), func(i, j int) {
		r.urls[i], r.urls[j] = r.urls[j], r.urls[i]
	})
}
// Heartbeat keeps ep alive on the Eureka server. It blocks, sending a PUT to
// apps/<AppID>/<InstanceID> every heartbeatInterval, until the registry
// context is done or the stop channel stored in keepalive for ep.AppID is
// signalled. Callers normally run it in its own goroutine.
//
// When more than heartbeatRetry heartbeats in a row fail, the endpoint is
// registered again and the failure count starts over.
func (r *EurekaRegistry) Heartbeat(ep Endpoint) {
	// Resolve the stop channel once, under the lock: the map is written by
	// other goroutines, so it must not be read unlocked inside the select.
	// An existing open channel is reused so that all heartbeat loops of one
	// application observe the same stop signal.
	r.lock.Lock()
	stop, ok := r.keepalive[ep.AppID]
	if ok {
		select {
		case <-stop:
			// Left over from an earlier stop; a fresh channel is needed.
			ok = false
		default:
		}
	}
	if !ok {
		stop = make(chan struct{})
		r.keepalive[ep.AppID] = stop
	}
	r.lock.Unlock()

	ticker := time.NewTicker(r.heartbeatInterval)
	defer ticker.Stop()

	failures := 0
	for {
		select {
		case <-r.ctx.Done():
			return
		case <-stop:
			return
		case <-ticker.C:
			err := r.do(r.ctx, http.MethodPut, []string{"apps", ep.AppID, ep.InstanceID}, nil, nil)
			if err == nil {
				failures = 0
				continue
			}
			failures++
			if failures > heartbeatRetry {
				// Best effort: if this fails too, the following ticks
				// fail again and trigger another attempt.
				_ = r.registerEndpoint(r.ctx, ep)
				failures = 0
			}
		}
	}
}
// Endpoint is one address of a service instance, broken down into the values
// needed to register it with the Eureka server.
type Endpoint struct {
// InstanceID identifies the instance; Endpoints builds it as "ip:appID:port".
InstanceID string
// IP is the host part of the endpoint address.
IP string
// AppID is the upper-cased service name.
AppID string
// Port is the port parsed from the endpoint address.
Port int
// SecurePort defaults to 443 and can be overridden via service metadata.
SecurePort int
HomePageURL string
StatusPageURL string
HealthCheckURL string
// MetaData is sent as the instance metadata on registration.
MetaData map[string]string
}
// Endpoints converts the endpoint addresses of service (expected form
// "scheme://host:port"; the scheme is optional) into Endpoint values.
//
// For every address the application ID is the upper-cased service name and
// the instance ID is "host:APPID:port". The secure port defaults to 443 and
// the home, status and health URLs default to the address followed by "/",
// "/info" and "/health". The metadata keys "securePort", "homePageURL",
// "statusPageURL" and "healthCheckURL" override these defaults.
//
// Each Endpoint gets its own metadata map: a copy of service.Metadata plus
// the keys "ID", "Name", "Version", "Endpoints" and "agent". service.Metadata
// itself is not modified, and the per-address "Endpoints" value is not
// shared between the returned endpoints.
//
// Addresses without a ":port" part after the host are skipped, because no
// host and port can be derived from them.
func (r *EurekaRegistry) Endpoints(service *registry.ServiceInstance) []Endpoint {
	res := make([]Endpoint, 0, len(service.Endpoints))
	appID := strings.ToUpper(service.Name)

	for _, ep := range service.Endpoints {
		hostStart := 0
		if i := strings.Index(ep, "//"); i >= 0 {
			hostStart = i + 2
		}
		portSep := strings.LastIndex(ep, ":")
		if portSep < hostStart {
			// No port separator after the host (e.g. "http://host").
			continue
		}
		ip := ep[hostStart:portSep]
		sport := ep[portSep+1:]
		// A non-numeric port is registered as 0, as before; there is no
		// error return through which it could be reported.
		port, _ := strconv.Atoi(sport)

		securePort := 443
		if s, ok := service.Metadata["securePort"]; ok {
			// Keep the default when the override is not a number.
			if p, err := strconv.Atoi(s); err == nil {
				securePort = p
			}
		}
		homePageURL := ep + "/"
		if s, ok := service.Metadata["homePageURL"]; ok {
			homePageURL = s
		}
		statusPageURL := ep + "/info"
		if s, ok := service.Metadata["statusPageURL"]; ok {
			statusPageURL = s
		}
		healthCheckURL := ep + "/health"
		if s, ok := service.Metadata["healthCheckURL"]; ok {
			healthCheckURL = s
		}

		// Copy so that neither the caller's map nor other endpoints' maps
		// are affected by the keys added below.
		metadata := make(map[string]string, len(service.Metadata)+5)
		for k, v := range service.Metadata {
			metadata[k] = v
		}
		metadata["ID"] = service.ID
		metadata["Name"] = service.Name
		metadata["Version"] = service.Version
		metadata["Endpoints"] = ep
		metadata["agent"] = "go-eureka-client"

		res = append(res, Endpoint{
			AppID:          appID,
			IP:             ip,
			Port:           port,
			SecurePort:     securePort,
			HomePageURL:    homePageURL,
			StatusPageURL:  statusPageURL,
			HealthCheckURL: healthCheckURL,
			InstanceID:     strings.Join([]string{ip, appID, sport}, ":"),
			MetaData:       metadata,
		})
	}
	return res
}
// WithContext returns an Option that replaces the registry's context with ctx.
// The registry uses this context for heartbeat requests and to stop
// heartbeat loops.
func WithContext(ctx context.Context) Option {
	return func(reg *EurekaRegistry) {
		reg.ctx = ctx
	}
}
// WithHeartbeat returns an Option that sets the interval between heartbeats.
func WithHeartbeat(interval time.Duration) Option {
	return func(reg *EurekaRegistry) {
		reg.heartbeatInterval = interval
	}
}
// WithRefresh returns an Option that sets the registry's refreshInterval.
func WithRefresh(interval time.Duration) Option {
	return func(reg *EurekaRegistry) {
		reg.refreshInterval = interval
	}
}
// WithEurekaPath returns an Option that sets the path segment placed between
// the server address and the request path (default "eureka/v2").
func WithEurekaPath(path string) Option {
	return func(reg *EurekaRegistry) {
		reg.eurekaPath = path
	}
}
// WithMaxRetry returns an Option that sets the maximum number of attempts
// made per request (default 3).
func WithMaxRetry(maxRetry int) Option {
	return func(reg *EurekaRegistry) {
		reg.maxRetry = maxRetry
	}
}
// WithHeartbeatInterval returns an Option that sets the interval between
// heartbeats. It sets the same field as WithHeartbeat.
func WithHeartbeatInterval(interval time.Duration) Option {
	return func(reg *EurekaRegistry) {
		reg.heartbeatInterval = interval
	}
}
// WithClientContext returns an Option that replaces the registry's context
// with ctx. It sets the same field as WithContext.
func WithClientContext(ctx context.Context) Option {
	return func(reg *EurekaRegistry) {
		reg.ctx = ctx
	}
}
// WithNamespace returns an Option that sets the path segment placed between
// the server address and the request path. It sets the same field as
// WithEurekaPath.
func WithNamespace(path string) Option {
	return func(reg *EurekaRegistry) {
		reg.eurekaPath = path
	}
}
1
https://gitee.com/tylf2018/go-micro-framework.git
git@gitee.com:tylf2018/go-micro-framework.git
tylf2018
go-micro-framework
go-micro-framework
a23f37e8bd2b

搜索帮助

53164aa7 5694891 3bd8fe86 5694891