1 Star 0 Fork 0

蒙蒙的男孩 / polaris-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
impl.go 18.43 KB
一键复制 编辑 原始数据 按行查看 历史
蒙蒙的男孩 提交于 2023-10-26 20:52 . 添加调用方的实例ip属性
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
/**
* Tencent is pleased to support the open source community by making polaris-go available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package network
import (
"context"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/modern-go/reflect2"
"gitee.com/meng_mengs_boys/polaris-go/pkg/algorithm/rand"
"gitee.com/meng_mengs_boys/polaris-go/pkg/config"
"gitee.com/meng_mengs_boys/polaris-go/pkg/log"
"gitee.com/meng_mengs_boys/polaris-go/pkg/model"
)
var (
	// flowID is a process-wide counter; getServerAddress atomically increments it
	// so that every system-service instance lookup carries its own FlowID.
	flowID uint64
)
const (
	// defaultService is the placeholder service name used for the builtin
	// (pre-configured address) cluster and for the config-center cluster.
	defaultService = "polaris-default"
	// serviceReadyStatus is the value connectionManager.ready is compared against to
	// decide whether the discover service is ready.
	serviceReadyStatus = 2
	// getAddressTimeout bounds each system-service instance lookup (issued with no retries).
	getAddressTimeout = 300 * time.Millisecond
)
// ServerAddressList holds the candidate server addresses and the currently active
// connection for one system-service cluster.
type ServerAddressList struct {
	// service is the cluster service this list belongs to.
	service config.ClusterService
	// useDefault: whether to fall back to the pre-configured (builtin) IPs when no
	// service address can be obtained. When set, getServerAddress returns an error
	// while the discover service is not yet ready.
	useDefault bool
	// curConn is the currently active connection; it stores a *Connection.
	curConn atomic.Value
	// curIndex is the round-robin cursor into addresses; only used for the builtin
	// and config clusters.
	curIndex int
	// addresses are the pre-configured server addresses.
	addresses []string
	// connectMutex serializes server selection and connection creation.
	connectMutex sync.Mutex
	// manager points back to the owning connectionManager.
	manager *connectionManager
}
// getAndConnectServer selects a server for this cluster and connects to it. When
// force is false and the current connection is still available on the same
// address, that connection is reused instead.
// connectMutex is held for the whole operation. Failures are logged and reported
// to the caller as a nil *Connection.
func (s *ServerAddressList) getAndConnectServer(
	force bool, svc config.ClusterService, timeout time.Duration) *Connection {
	s.connectMutex.Lock()
	defer s.connectMutex.Unlock()

	addr, ins, lookupErr := s.getServerAddress(s.manager.GetHashKey())
	if lookupErr != nil {
		log.GetNetworkLogger().Errorf("fail get server address from service %s, error %v", svc, lookupErr)
		return nil
	}
	conn, connErr := s.connectServer(force, addr, ins, svc, timeout)
	if connErr == nil {
		return conn
	}
	log.GetNetworkLogger().Errorf("fail get connect %s from service %s, error %v", addr, svc, connErr)
	return nil
}
// getServerAddress selects the address of the server to use for this cluster.
// It does not open any connection.
//
// Builtin and config clusters use the pre-configured addresses in round-robin
// order (curIndex), and the returned instance is nil.
//
// Every other cluster looks up one instance of the system service through the
// flow engine. hashKey is passed along for hash-based selection. The instance is
// returned together with its "host:port" address.
//
// An error is returned when:
//   - no pre-configured address exists;
//   - the engine is not registered in the value context;
//   - the discover service is not ready yet (only when useDefault is set, so the
//     caller can fall back to the pre-configured addresses);
//   - the lookup fails or yields no instance.
func (s *ServerAddressList) getServerAddress(hashKey []byte) (string, model.Instance, error) {
	if s.service.ClusterType == config.BuiltinCluster || s.service.ClusterType == config.ConfigCluster {
		serverCount := len(s.addresses)
		if serverCount == 0 {
			// Guard the modulo below against a division-by-zero panic.
			return "", nil, fmt.Errorf("no server address configured for service %s", s.service)
		}
		targetAddress := s.addresses[s.curIndex%serverCount]
		// Wrap the cursor long before int overflow so the index never turns negative.
		if s.curIndex == math.MaxInt32 {
			s.curIndex = 0
		} else {
			s.curIndex++
		}
		return targetAddress, nil, nil
	}

	engineValue, ok := s.manager.valueCtx.GetValue(model.ContextKeyEngine)
	if !ok {
		return "", nil, fmt.Errorf("flow engine is not ready")
	}
	engine := engineValue.(model.Engine)
	// Return an error so that the outer flow can fall back to the pre-configured addresses.
	if s.useDefault && atomic.LoadUint32(&s.manager.ready) < serviceReadyStatus {
		return "", nil, fmt.Errorf("discover service %s is not ready", s.service)
	}
	req := &model.GetOneInstanceRequest{
		FlowID:    atomic.AddUint64(&flowID, 1),
		Namespace: s.service.Namespace,
		Service:   s.service.Service,
		Metadata:  map[string]string{"protocol": s.manager.protocol},
		HashKey:   hashKey,
	}
	// System-service lookups are not retried and are bounded by getAddressTimeout.
	req.SetRetryCount(0)
	req.SetTimeout(getAddressTimeout)
	resp, err := engine.SyncGetOneInstance(req)
	if err != nil {
		return "", nil, err
	}
	if resp == nil || len(resp.Instances) == 0 {
		return "", nil, fmt.Errorf("no available instance for service %s", s.service)
	}
	instance := resp.Instances[0]
	return fmt.Sprintf("%s:%d", instance.GetHost(), instance.GetPort()), instance, nil
}
// loadCurrentConnection returns the current connection of this cluster, or nil
// when none has been stored yet.
func (s *ServerAddressList) loadCurrentConnection() *Connection {
	if stored := s.curConn.Load(); !reflect2.IsNil(stored) {
		return stored.(*Connection)
	}
	return nil
}
// connectServer connects to addr and makes the result the current connection.
// When force is false and the current connection is still available on addr,
// that connection is reused and no new one is created.
//
// On success the previous connection, if any, is closed via lazyClose(false).
//
// On failure the connect error is reported against instance (when it is non-nil)
// through ReportFail. The error is then returned wrapped with %w, so callers can
// inspect the underlying cause. The message carries both the configured timeout
// and the actual elapsed time.
//
// Callers in this file invoke it while holding s.connectMutex.
func (s *ServerAddressList) connectServer(force bool, addr string, instance model.Instance,
	service config.ClusterService, timeout time.Duration) (*Connection, error) {
	lastConn := s.loadCurrentConnection()
	if !force && IsAvailableConnection(lastConn) && lastConn.Address == addr {
		log.GetNetworkLogger().Debugf("address %s not changed, no need to switch server", addr)
		// The server address has not changed, no need to switch.
		return lastConn, nil
	}
	connectTime := time.Now()
	tcpConn, err := s.manager.creator.CreateConnection(addr, timeout, &s.manager.ClientInfo)
	// Measure immediately after dialing so the reported duration covers only the connect.
	connectDuration := time.Since(connectTime)
	connID := ConnID{
		ID:       uuid.New().ID(),
		Service:  service,
		Address:  addr,
		instance: instance,
	}
	if err != nil {
		if !reflect2.IsNil(instance) {
			s.manager.ReportFail(connID, int32(model.ErrCodeConnectError), connectDuration)
		}
		return nil, fmt.Errorf("fail to connect to %s, timeout is %v, elapsed %v, service is %s, because %w",
			addr, timeout, connectDuration, s.service, err)
	}
	if lastConn != nil {
		// Release the previous connection lazily.
		lastConn.lazyClose(false)
	}
	conn := &Connection{
		Conn:   tcpConn,
		ConnID: connID,
	}
	if ctrl, ok := DefaultServerServiceToConnectionControl[s.service.ClusterType]; ok && ctrl == ConnectionLong {
		log.GetNetworkLogger().Infof("long connection %v, target address %s: create", conn.ConnID, addr)
	} else {
		log.GetNetworkLogger().Debugf("short connection %v, target address %s: create", conn.ConnID, addr)
	}
	s.curConn.Store(conn)
	return conn, nil
}
// ConnectServerByAddrOnly opens a new connection to addr for clsService, attributed
// to instance. Unlike connectServer, it does not touch the list's current
// connection. The new connection is acquired (conn.acquire(addr)) before it is
// returned.
//
// On failure the error is returned wrapped with %w. The message carries both the
// configured timeout and the actual elapsed time.
func (s *ServerAddressList) ConnectServerByAddrOnly(addr string, timeout time.Duration,
	clsService config.ClusterService, instance model.Instance) (*Connection, error) {
	connectTime := time.Now()
	tcpConn, err := s.manager.creator.CreateConnection(addr, timeout, &s.manager.ClientInfo)
	connectDuration := time.Since(connectTime)
	if err != nil {
		return nil, fmt.Errorf("fail to connect to %s, timeout is %v, elapsed %v, service is %s, because %w",
			addr, timeout, connectDuration, clsService, err)
	}
	conn := &Connection{
		Conn: tcpConn,
		ConnID: ConnID{
			ID:       uuid.New().ID(),
			Service:  clsService,
			Address:  addr,
			instance: instance,
		},
	}
	conn.acquire(addr)
	return conn, nil
}
// tryGetConnection returns the current connection when it is available.
// Otherwise it takes connectMutex, re-checks, and then selects a server (using
// hashKey) and connects to it.
func (s *ServerAddressList) tryGetConnection(timeout time.Duration, hashKey []byte) (*Connection, error) {
	// Fast path: no locking when a usable connection already exists.
	if existing := s.loadCurrentConnection(); IsAvailableConnection(existing) {
		return existing, nil
	}

	s.connectMutex.Lock()
	defer s.connectMutex.Unlock()

	// Another goroutine may have connected while we were waiting for the lock.
	if existing := s.loadCurrentConnection(); IsAvailableConnection(existing) {
		return existing, nil
	}
	addr, ins, lookupErr := s.getServerAddress(hashKey)
	if lookupErr != nil {
		return nil, lookupErr
	}
	return s.connectServer(false, addr, ins, s.service, timeout)
}
// closeCurrentConnection closes the current connection via lazyClose(force) when
// it is still available; otherwise it does nothing.
func (s *ServerAddressList) closeCurrentConnection(force bool) {
	current := s.loadCurrentConnection()
	if !IsAvailableConnection(current) {
		return
	}
	log.GetNetworkLogger().Debugf("current connection for %s has been closed", s.service)
	current.lazyClose(force)
}
// connectionManager is the ConnectionManager implementation returned by
// NewConnectionManager and NewConfigConnectionManager. It keeps one
// ServerAddressList per system-service cluster.
type connectionManager struct {
	// Client information; a pointer to it is passed to the ConnCreator when dialing.
	ClientInfo
	// Timeout used when establishing connections.
	connectTimeout time.Duration
	// Period of the server-switch ticker in doSwitchRoutine.
	switchInterval time.Duration
	// ctx is cancelled by Destroy; doSwitchRoutine exits when it is done.
	ctx    context.Context
	cancel context.CancelFunc
	// Discover service.
	discoverService model.ServiceKey
	// Config-center service.
	configService model.ServiceKey
	// Event types already seen for the discover service; repeated events of the
	// same type are not counted again.
	discoverEventSet map[model.EventType]bool
	// Guards discoverEventSet against concurrent updates.
	discoverEventMutex sync.Mutex
	// Readiness counter (not a 0/1 flag). UpdateServers increments it once per
	// distinct discover-service event type. The constructors set it directly to
	// serviceReadyStatus when there is no separate discover service. It is
	// compared against serviceReadyStatus.
	ready uint32
	// Address lists of the system services, keyed by cluster type.
	serverServices map[config.ClusterType]*ServerAddressList
	// Global context; the flow engine is read from it under model.ContextKeyEngine.
	valueCtx model.ValueContext
	// Protocol currently in use; sent as "protocol" metadata in instance lookups.
	protocol string
	// Connection creator for the current protocol.
	creator ConnCreator
}
// NewConnectionManager creates the connection manager for the polaris system services.
//
// It builds:
//   - one ServerAddressList per system service returned by config.GetServerServices;
//   - a builtin list over the pre-configured server addresses, whose round-robin
//     starts at a random index.
//
// It then starts doSwitchRoutine in the background; call Destroy to stop it.
// When no discover service is configured, the builtin service acts as the
// discover service and the manager is ready immediately.
//
// An error is returned when the server connector has no address configured.
// Without addresses, rand.Intn (presumably math/rand semantics, which panics on 0)
// and the round-robin in getServerAddress could not work.
func NewConnectionManager(
	cfg config.Configuration, valueCtx model.ValueContext) (ConnectionManager, error) {
	connectorCfg := cfg.GetGlobal().GetServerConnector()
	addresses := connectorCfg.GetAddresses()
	if len(addresses) == 0 {
		return nil, fmt.Errorf("server connector has no server address configured")
	}
	manager := &connectionManager{
		connectTimeout:   connectorCfg.GetConnectTimeout(),
		switchInterval:   connectorCfg.GetServerSwitchInterval(),
		serverServices:   make(map[config.ClusterType]*ServerAddressList),
		valueCtx:         valueCtx,
		protocol:         connectorCfg.GetProtocol(),
		discoverEventSet: make(map[model.EventType]bool),
	}
	for _, svc := range config.GetServerServices(cfg) {
		if svc.ClusterType == config.DiscoverCluster {
			manager.discoverService = svc.ServiceKey
		}
		manager.serverServices[svc.ClusterType] = &ServerAddressList{
			service:    svc,
			useDefault: config.DefaultServerServiceToUseDefault[svc.ClusterType],
			manager:    manager,
		}
	}
	builtInAddrList := &ServerAddressList{
		service: config.ClusterService{
			ServiceKey:  model.ServiceKey{Namespace: config.ServerNamespace, Service: defaultService},
			ClusterType: config.BuiltinCluster,
		},
		useDefault: false,
		manager:    manager,
		addresses:  addresses,
		// Start at a random address so that clients spread across the servers.
		curIndex: rand.Intn(len(addresses)),
	}
	manager.serverServices[config.BuiltinCluster] = builtInAddrList
	if len(manager.discoverService.Service) == 0 {
		// No discover service configured: the builtin addresses serve discovery.
		manager.discoverService = builtInAddrList.service.ServiceKey
		manager.ready = serviceReadyStatus
	}
	manager.ctx, manager.cancel = context.WithCancel(context.Background())
	go manager.doSwitchRoutine()
	return manager, nil
}
// NewConfigConnectionManager creates the connection manager for the config center.
//
// The manager holds a single ConfigCluster list over the configured config-center
// addresses, whose round-robin starts at a random index. It is ready immediately.
// It does not start doSwitchRoutine, so Destroy only cancels its context.
//
// An error is returned when the config connector has no address configured.
// Without addresses, rand.Intn (presumably math/rand semantics, which panics on 0)
// and the round-robin in getServerAddress could not work.
func NewConfigConnectionManager(cfg config.Configuration, valueCtx model.ValueContext) (ConnectionManager, error) {
	connectorCfg := cfg.GetConfigFile().GetConfigConnectorConfig()
	configAddresses := connectorCfg.GetAddresses()
	if len(configAddresses) == 0 {
		return nil, fmt.Errorf("config connector has no server address configured")
	}
	configManager := &connectionManager{
		connectTimeout: connectorCfg.GetConnectTimeout(),
		switchInterval: connectorCfg.GetServerSwitchInterval(),
		serverServices: make(map[config.ClusterType]*ServerAddressList),
		valueCtx:       valueCtx,
		protocol:       connectorCfg.GetProtocol(),
		// Initialized so that UpdateServers/isAvailableUpdate never write to a nil map.
		discoverEventSet: make(map[model.EventType]bool),
	}
	configAddrList := &ServerAddressList{
		service: config.ClusterService{
			ServiceKey:  model.ServiceKey{Namespace: config.ServerNamespace, Service: defaultService},
			ClusterType: config.ConfigCluster,
		},
		useDefault: false,
		manager:    configManager,
		addresses:  configAddresses,
		// Start at a random address so that clients spread across the servers.
		curIndex: rand.Intn(len(configAddresses)),
	}
	configManager.serverServices[config.ConfigCluster] = configAddrList
	// configService is never set before this point, so the placeholder config
	// service is always used and the manager is ready right away.
	configManager.configService = configAddrList.service.ServiceKey
	configManager.ready = serviceReadyStatus
	configManager.ctx, configManager.cancel = context.WithCancel(context.Background())
	return configManager, nil
}
// SetConnCreator installs the connection creator for the protocol in use.
// Every subsequent dial made by this manager goes through it.
func (c *connectionManager) SetConnCreator(creator ConnCreator) {
	c.creator = creator
}
// tryGetConnection returns an available connection for clusterType, creating one if needed.
//
// A cluster type without its own address list falls back to the builtin list,
// but only when config.DefaultServerServiceToUseDefault allows it.
//
// An error is returned when:
//   - the cluster type is unknown;
//   - fallback is not allowed;
//   - this manager has no builtin list to fall back to (e.g. one created by
//     NewConfigConnectionManager). This previously caused a nil-pointer dereference.
func (c *connectionManager) tryGetConnection(clusterType config.ClusterType, hashKey []byte) (*Connection, error) {
	serverList, ok := c.serverServices[clusterType]
	if !ok {
		useDefault, known := config.DefaultServerServiceToUseDefault[clusterType]
		if !known {
			return nil, fmt.Errorf("cluster %v is invalid", clusterType)
		}
		if !useDefault {
			return nil, fmt.Errorf("service name for cluster %v is not config", clusterType)
		}
		if serverList, ok = c.serverServices[config.BuiltinCluster]; !ok {
			return nil, fmt.Errorf("no builtin server list to fall back to for cluster %v", clusterType)
		}
	}
	return serverList.tryGetConnection(c.connectTimeout, hashKey)
}
// GetConnection acquires a connection to clusterType on behalf of opKey. The
// client's own hash key is used to select the server.
func (c *connectionManager) GetConnection(opKey string, clusterType config.ClusterType) (*Connection, error) {
	clientHash := c.GetHashKey()
	return c.GetConnectionByHashKey(opKey, clusterType, clientHash)
}
// GetConnectionByHashKey acquires a connection to clusterType on behalf of opKey,
// selecting the server with hashKey.
//
// It keeps fetching the cluster's connection until one can be acquired. Errors
// from fetching the connection are logged and returned.
func (c *connectionManager) GetConnectionByHashKey(
	opKey string, clusterType config.ClusterType, hashKey []byte) (*Connection, error) {
	for {
		candidate, err := c.tryGetConnection(clusterType, hashKey)
		switch {
		case err != nil:
			log.GetNetworkLogger().Errorf(
				"fail to get connection, opKey is %s, cluster %v, error is %s", opKey, clusterType, err)
			return nil, err
		case candidate.acquire(opKey):
			return candidate, nil
		}
		// acquire refused the connection (presumably it is being closed); try again.
	}
}
// GetHashExpectedInstance returns the server address that hash would select for
// clusterType, without connecting. For clusters other than builtin/config, the
// matching instance is returned as well.
//
// It returns an error, instead of panicking, for a cluster type this manager does
// not manage.
//
// NOTE(review): unlike the connect paths, this does not hold the list's
// connectMutex while getServerAddress advances curIndex for builtin/config clusters.
func (c *connectionManager) GetHashExpectedInstance(clusterType config.ClusterType,
	hash []byte) (string, model.Instance, error) {
	serverList, ok := c.serverServices[clusterType]
	if !ok {
		return "", nil, fmt.Errorf("connectionManager has no clusterType %v", clusterType)
	}
	return serverList.getServerAddress(hash)
}
// ConnectByAddr opens a dedicated connection to addr for clusterType's service,
// attributed to instance. It goes through ConnectServerByAddrOnly, so the
// cluster's current connection is left untouched.
//
// It returns an error, instead of panicking, for a cluster type this manager does
// not manage.
func (c *connectionManager) ConnectByAddr(clusterType config.ClusterType, addr string,
	instance model.Instance) (*Connection, error) {
	// Fixed dial timeout for address-pinned connections (independent of c.connectTimeout).
	const connectByAddrTimeout = 500 * time.Millisecond
	serverList, ok := c.serverServices[clusterType]
	if !ok {
		return nil, fmt.Errorf("connectionManager has no clusterType %v", clusterType)
	}
	return serverList.ConnectServerByAddrOnly(addr, connectByAddrTimeout, serverList.service, instance)
}
// ReportSuccess records a successful call on the connection identified by connID.
//
// When the connection is tied to an instance and the flow engine is available, a
// RetSuccess call result (with delay and retCode) is pushed through
// SyncUpdateServiceCallResult. A failure of that update is logged, not returned.
func (c *connectionManager) ReportSuccess(connID ConnID, retCode int32, timeout time.Duration) {
	log.GetBaseLogger().Debugf("service %s: reported success", connID.Service)
	if reflect2.IsNil(connID.instance) {
		return
	}
	engineValue, found := c.valueCtx.GetValue(model.ContextKeyEngine)
	if !found {
		return
	}
	callResult := &model.ServiceCallResult{
		CalledInstance: connID.instance,
		RetStatus:      model.RetSuccess,
	}
	callResult.SetDelay(timeout)
	callResult.SetRetCode(retCode)
	if updateErr := engineValue.(model.Engine).SyncUpdateServiceCallResult(callResult); updateErr != nil {
		log.GetBaseLogger().Errorf(
			"error to update success call result for connection %s, %s", connID.String(), updateErr)
	}
}
// ReportFail records a failed call on the connection identified by connID.
//
// Builtin-cluster connections and connections without an instance are only
// logged. Otherwise, when the flow engine is available, a RetFail call result
// (with delay and retCode) is pushed through SyncUpdateServiceCallResult. A
// failure of that update is logged, not returned.
func (c *connectionManager) ReportFail(connID ConnID, retCode int32, timeout time.Duration) {
	log.GetBaseLogger().Warnf("connection %s: reported fail", connID)
	if reflect2.IsNil(connID.instance) || connID.Service.ClusterType == config.BuiltinCluster {
		return
	}
	engineValue, found := c.valueCtx.GetValue(model.ContextKeyEngine)
	if !found {
		return
	}
	callResult := &model.ServiceCallResult{
		CalledInstance: connID.instance,
		RetStatus:      model.RetFail,
	}
	callResult.SetDelay(timeout)
	callResult.SetRetCode(retCode)
	if updateErr := engineValue.(model.Engine).SyncUpdateServiceCallResult(callResult); updateErr != nil {
		log.GetBaseLogger().Errorf(
			"error to update fail call result for connection %s, %s", connID.String(), updateErr)
	}
}
// ReportConnectionDown handles a report that the connection identified by connID
// went down. If that connection is still its cluster's current connection and is
// still available, it is closed via lazyClose(false). Reports for unknown
// clusters, or for connections already replaced, are ignored.
func (c *connectionManager) ReportConnectionDown(connID ConnID) {
	log.GetBaseLogger().Tracef("connection %s: reported down", connID)
	svc := connID.Service
	serverList, known := c.serverServices[svc.ClusterType]
	if !known {
		log.GetBaseLogger().Warnf("connection %s down received from unknown service %s", connID, svc)
		return
	}
	log.GetBaseLogger().Infof("connection %s down received from service %s", connID, svc.String())
	active := serverList.loadCurrentConnection()
	switch {
	case active == nil, active.ConnID.ID != connID.ID:
		// No current connection, or it has already been switched to a new one.
		return
	case IsAvailableConnection(active):
		active.lazyClose(false)
	}
}
// Destroy shuts the manager down by cancelling its context. When
// doSwitchRoutine is running, it then exits and force-closes the current
// connections.
func (c *connectionManager) Destroy() {
	stop := c.cancel
	stop()
}
// doSwitchRoutine is the background maintenance loop. NewConnectionManager starts
// it unconditionally, and it runs until c.ctx is cancelled by Destroy.
//
//   - On every DefaultBuiltInServerConnectionCloseTimeout tick, it closes the
//     builtin cluster's current connection via closeCurrentConnection(false).
//   - On every switchInterval tick, it handles each non-builtin cluster configured
//     as ConnectionLong that currently has an available connection: it re-selects
//     a server and reconnects (force=false). connectServer reuses the existing
//     connection when the selected address is unchanged. Clusters without an
//     available connection are skipped.
//
// On exit, it stops both tickers and force-closes every cluster's current connection.
func (c *connectionManager) doSwitchRoutine() {
	// Periodic server switching.
	switchTicker := time.NewTicker(c.switchInterval)
	buildInCloseTicker := time.NewTicker(config.DefaultBuiltInServerConnectionCloseTimeout)
	defer func() {
		buildInCloseTicker.Stop()
		switchTicker.Stop()
		// Clean up connections on exit.
		for _, serverList := range c.serverServices {
			// The manager is being destroyed, so close forcibly.
			serverList.closeCurrentConnection(true)
		}
	}()
	for {
		select {
		case <-c.ctx.Done():
			log.GetBaseLogger().Infof("doSwitchRoutine of connection manager has been terminated")
			return
		case <-buildInCloseTicker.C:
			serverList := c.serverServices[config.BuiltinCluster]
			serverList.closeCurrentConnection(false)
		case <-switchTicker.C:
			for clusterType, serverList := range c.serverServices {
				if clusterType == config.BuiltinCluster {
					continue
				}
				if ctrl, ok := DefaultServerServiceToConnectionControl[clusterType]; ok && ctrl == ConnectionLong {
					// Only long-connection clusters switch servers.
					curConn := serverList.loadCurrentConnection()
					if IsAvailableConnection(curConn) {
						// Switch only once a connection has been established successfully.
						log.GetNetworkLogger().Infof("start switch for %s", serverList.service.ServiceKey)
						conn := serverList.getAndConnectServer(false, serverList.service, c.connectTimeout)
						if nil != conn {
							log.GetNetworkLogger().Infof("discover server switched to %s", conn.Address)
						}
						continue
					}
					log.GetNetworkLogger().Infof("skip switch for %s", serverList.service.ServiceKey)
				}
			}
		}
	}
}
// UpdateServers handles an update event for a system service. Only events for the
// discover service count: each event type seen for the first time increments
// c.ready once and is logged.
func (c *connectionManager) UpdateServers(svcEventKey model.ServiceEventKey) {
	updated := model.ServiceKey{Namespace: svcEventKey.Namespace, Service: svcEventKey.Service}
	if updated != c.discoverService || !c.isAvailableUpdate(svcEventKey.Type) {
		// Not the discover service, or this event type was already counted.
		return
	}
	readyCount := atomic.AddUint32(&c.ready, 1)
	log.GetBaseLogger().Infof("discover server updated to ready %v, event is %s", readyCount, svcEventKey)
}
// isAvailableUpdate reports whether event is seen for the first time and records
// it, so later duplicates return false. Guarded by discoverEventMutex.
func (c *connectionManager) isAvailableUpdate(event model.EventType) bool {
	c.discoverEventMutex.Lock()
	defer c.discoverEventMutex.Unlock()
	_, seen := c.discoverEventSet[event]
	if !seen {
		c.discoverEventSet[event] = true
	}
	return !seen
}
// GetClientInfo returns a pointer to the manager's embedded client information.
// Writes through it are visible to the manager.
func (c *connectionManager) GetClientInfo() *ClientInfo {
	info := &c.ClientInfo
	return info
}
// IsReady reports whether the discover service is ready, i.e. c.ready has reached
// serviceReadyStatus.
//
// It uses >=, which matches the `< serviceReadyStatus` not-ready check in
// getServerAddress. UpdateServers keeps incrementing ready for every new distinct
// event type. With ==, a third event type would push ready past the threshold
// and make the manager look unready forever.
func (c *connectionManager) IsReady() bool {
	return atomic.LoadUint32(&c.ready) >= serviceReadyStatus
}
1
https://gitee.com/meng_mengs_boys/polaris-go.git
git@gitee.com:meng_mengs_boys/polaris-go.git
meng_mengs_boys
polaris-go
polaris-go
v1.5.4

搜索帮助