1 Star 0 Fork 0

天雨流芳/go-micro-framework

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
discovery.go 7.47 KB
一键复制 编辑 原始数据 按行查看 历史
天雨流芳 提交于 2024-06-27 14:31 . 适配错误码
package eureka
import (
"context"
"encoding/json"
"fmt"
"gitee.com/tylf2018/go-micro-framework/registry"
regOps "gitee.com/tylf2018/go-micro-framework/registry/options"
"io"
"math/rand"
"net/http"
"strings"
"sync"
"time"
)
const (
statusUp = "UP"
statusDown = "DOWN"
statusOutOfService = "OUT_OF_SERVICE"
heartbeatRetry = 3
maxIdleConns = 100
heartbeatTime = 10 * time.Second
httpTimeout = 3 * time.Second
refreshTime = 30 * time.Second
)
type EurekaDiscovery struct {
maxRetry int
eurekaPath string
urls []string
client *http.Client
allInstances map[string][]Instance
subscribers map[string]*subscriber
lock sync.Mutex
}
func NewEurekaDiscovery(options *regOps.DiscoveryOptions) *EurekaDiscovery {
tr := &http.Transport{
MaxIdleConns: maxIdleConns,
}
return &EurekaDiscovery{
maxRetry: 3,
eurekaPath: "eureka/v2",
urls: []string{options.Address},
client: &http.Client{Transport: tr, Timeout: httpTimeout},
allInstances: make(map[string][]Instance),
subscribers: make(map[string]*subscriber),
}
}
// GetService get services from eureka
func (d *EurekaDiscovery) GetService(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
instances := d.GetServiceInstances(ctx, serviceName)
items := make([]*registry.ServiceInstance, 0, len(instances))
if len(instances) == 0 {
return items, nil
}
for _, instance := range instances {
items = append(items, &registry.ServiceInstance{
ID: instance.Metadata["ID"],
Name: instance.Metadata["Name"],
Version: instance.Metadata["Version"],
Endpoints: []string{instance.Metadata["Endpoints"]},
Metadata: instance.Metadata,
})
}
return items, nil
}
// Watch 是独立的ctx
func (d *EurekaDiscovery) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) {
return newWatch(ctx, d, serviceName)
}
func (d *EurekaDiscovery) Subscribe(serverName string, fn func()) error {
d.lock.Lock()
appID := toAppID(serverName)
d.subscribers[appID] = &subscriber{
appID: appID,
callBack: fn,
}
d.lock.Unlock()
go func() {
ticker := time.NewTicker(time.Minute * 5) // 执行定时任务,暂定5分钟循环一次
defer ticker.Stop()
for {
<-ticker.C
d.broadcast()
}
}()
return nil
}
func (d *EurekaDiscovery) broadcast() {
instances := d.cacheAllInstances()
if instances == nil {
return
}
for _, subscriber := range d.subscribers {
go subscriber.callBack()
}
d.lock.Lock()
defer d.lock.Unlock()
d.allInstances = instances
}
func (d *EurekaDiscovery) cacheAllInstances() map[string][]Instance {
items := make(map[string][]Instance)
instances := d.fetchAllUpInstances(context.Background())
for _, instance := range instances {
items[toAppID(instance.App)] = append(items[instance.App], instance)
}
return items
}
func (d *EurekaDiscovery) fetchAllUpInstances(ctx context.Context) []Instance {
return d.filterUp(d.fetchApps(ctx)...)
}
func (d *EurekaDiscovery) fetchApps(ctx context.Context) []Application {
var m ApplicationsRootResponse
if err := d.do(ctx, http.MethodGet, []string{"apps"}, nil, &m); err != nil {
return nil
}
return m.Applications
}
func (d *EurekaDiscovery) GetServiceInstances(ctx context.Context, serverName string) []Instance {
appID := toAppID(serverName)
if ins, ok := d.allInstances[appID]; ok {
return ins
}
// if not in allInstances of API, you can try to obtain it separately again
return d.fetchAppUpInstances(ctx, appID)
}
func (d *EurekaDiscovery) Unsubscribe(serverName string) {
d.lock.Lock()
defer d.lock.Unlock()
delete(d.subscribers, toAppID(serverName))
}
func (d *EurekaDiscovery) fetchAppUpInstances(ctx context.Context, appID string) []Instance {
app, err := d.fetchAppInstances(ctx, appID)
if err != nil {
return nil
}
return d.filterUp(app)
}
func (d *EurekaDiscovery) filterUp(apps ...Application) (res []Instance) {
for _, app := range apps {
for _, ins := range app.Instance {
if ins.Status == statusUp {
res = append(res, ins)
}
}
}
return
}
func (d *EurekaDiscovery) fetchAppInstances(ctx context.Context, appID string) (m Application, err error) {
err = d.do(ctx, http.MethodGet, []string{"apps", appID}, nil, &m)
return
}
func (d *EurekaDiscovery) do(ctx context.Context, method string, params []string, input io.Reader, output interface{}) error {
for i := 0; i < d.maxRetry; i++ {
retry, err := d.request(ctx, method, params, input, output, i)
if retry {
continue
}
if err != nil {
return err
}
return nil
}
return fmt.Errorf("retry after %d times", d.maxRetry)
}
func (d *EurekaDiscovery) request(ctx context.Context, method string, params []string, input io.Reader, output interface{}, i int) (bool, error) {
request, err := http.NewRequestWithContext(ctx, method, d.buildAPI(i, params...), input)
if err != nil {
return false, err
}
request.Header.Add("User-Agent", "go-eureka-client")
request.Header.Add("Accept", "application/json;charset=UTF-8")
request.Header.Add("Content-Type", "application/json;charset=UTF-8")
resp, err := d.client.Do(request)
if err != nil {
return true, err
}
defer func() {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
}()
if output != nil && resp.StatusCode/100 == 2 {
data, err := io.ReadAll(resp.Body)
if err != nil {
return false, err
}
err = json.Unmarshal(data, output)
if err != nil {
return false, err
}
}
if resp.StatusCode >= http.StatusBadRequest {
return false, fmt.Errorf("response Error %d", resp.StatusCode)
}
return false, nil
}
func (d *EurekaDiscovery) buildAPI(currentTimes int, params ...string) string {
if currentTimes == 0 {
d.shuffle()
}
server := d.pickServer(currentTimes)
params = append([]string{server, d.eurekaPath}, params...)
return strings.Join(params, "/")
}
func (d *EurekaDiscovery) pickServer(currentTimes int) string {
return d.urls[currentTimes%d.maxRetry]
}
func (d *EurekaDiscovery) shuffle() {
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(d.urls), func(i, j int) {
d.urls[i], d.urls[j] = d.urls[j], d.urls[i]
})
}
func toAppID(serverName string) string {
return strings.ToUpper(serverName)
}
type Instance struct {
InstanceID string `json:"instanceId"`
HostName string `json:"hostName"`
Port Port `json:"port"`
App string `json:"app"`
IPAddr string `json:"ipAddr"`
VipAddress string `json:"vipAddress"`
Status string `json:"status"`
SecurePort Port `json:"securePort"`
HomePageURL string `json:"homePageUrl"`
StatusPageURL string `json:"statusPageUrl"`
HealthCheckURL string `json:"healthCheckUrl"`
DataCenterInfo DataCenterInfo `json:"dataCenterInfo"`
Metadata map[string]string `json:"metadata"`
}
type Port struct {
Port int `json:"$"`
Enabled string `json:"@enabled"`
}
type DataCenterInfo struct {
Name string `json:"name"`
Class string `json:"@class"`
}
type Application struct {
Name string `json:"name"`
Instance []Instance `json:"instance"`
}
// ApplicationsRootResponse for /eureka/apps
type ApplicationsRootResponse struct {
ApplicationsResponse `json:"applications"`
}
type ApplicationsResponse struct {
Version string `json:"versions__delta"`
AppsHashcode string `json:"apps__hashcode"`
Applications []Application `json:"application"`
}
type subscriber struct {
appID string
callBack func()
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/tylf2018/go-micro-framework.git
git@gitee.com:tylf2018/go-micro-framework.git
tylf2018
go-micro-framework
go-micro-framework
157c77bd6e0e

搜索帮助