4 Star 5 Fork 4

Plato / Service-Box-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
box.go 27.79 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144
package sbox
import (
"encoding/hex"
"errors"
"fmt"
"reflect"
"time"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/protocol"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/transport"
"gitee.com/dennis-kk/service-box-go/util/scheduler"
"gitee.com/dennis-kk/service-box-go/util/http_proxy"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc"
_ "gitee.com/dennis-kk/service-box-go/internal/jsonpb/jsonpbcpp"
"gitee.com/dennis-kk/service-box-go/internal/net"
_ "gitee.com/dennis-kk/service-box-go/internal/zookeeper"
"gitee.com/dennis-kk/service-box-go/util/config"
berrors "gitee.com/dennis-kk/service-box-go/util/errors"
"gitee.com/dennis-kk/service-box-go/util/mongodb"
"gitee.com/dennis-kk/service-box-go/util/redis"
"gitee.com/dennis-kk/service-box-go/util/scli"
"gitee.com/dennis-kk/service-box-go/util/slog"
"gitee.com/dennis-kk/service-box-go/util/slog/zap"
"gitee.com/dennis-kk/service-box-go/util/uuid"
"gitee.com/dennis-kk/service-box-go/util/uuid/custom"
)
type (
// SBoxKey is an empty marker type.
// NOTE(review): presumably used as a context key for the box; confirm against callers.
SBoxKey struct{}
// packCache maps a service UUID to the package info registered for it.
packCache map[uint64]*idlrpc.PackageInfo
// ServiceBox aggregates every sub-module of a box process (network, rpc
// framework, service layer, config, logger, optional redis/mongo/scheduler
// clients, proxy handler and extra components) behind a single object.
ServiceBox struct {
boxState state // service box state
network *BoxNetwork // network
srvLayer *serviceLayer // service host manager
opts *Options // options
directHandle *BoxChannel // direct connect channel
cmd scli.Cmd // command line parser
rpc idlrpc.IRpc // rpc framework
logger slog.BoxLogger // logger
cfg config.Config // Box's Config Module
serviceCfg *ServiceCfg // service custom config
rpcCfg *RpcConfig // service rpc config
waitLoadPack packCache // package wait to load
proxy *ProxyHandler // proxy handle
msgHandler *ServiceMessageHandle // rpc message handler
idGen uuid.IUuidGenerator // uuid framework
redisProxy redis.IClient // redis client
mongoClient *mongodb.MongoDB // mongo db client
scheduler scheduler.ITimer // scheduler
identityID string // identity id
componentMgr *ComponentsManager // extra component manager
handlers [][]TransHandlerFunc // handlers cache
}
)
// MakeServiceBox allocates a ServiceBox in the closed state. The options,
// package cache, config holders, message handler, component manager and the
// handler table are created here; the network and the rpc framework stay
// unset until Init builds them.
func MakeServiceBox() *ServiceBox {
	box := new(ServiceBox)
	box.boxState = stateClosed
	box.waitLoadPack = make(packCache) // packages waiting to be loaded
	box.opts = NewBoxOptions()         // default options
	box.rpcCfg = &RpcConfig{}
	box.serviceCfg = &ServiceCfg{CustomName: make(map[uint64]string)}
	box.msgHandler = &ServiceMessageHandle{}
	box.componentMgr = makeComponentsManager()
	box.handlers = make([][]TransHandlerFunc, TransHandlerMax)
	return box
}
// Init applies the given options and then initializes every module of the
// box in a fixed order. Failures of the command-line parser and the message
// handler are returned; a failure of any later mandatory module panics with
// a descriptive message; a failure of the extra components is only logged.
// On success the box state becomes stateInited and nil is returned.
func (sb *ServiceBox) Init(opts ...Option) error {
for _, opt := range opts {
opt(sb.opts)
}
// init cmd module
if err := sb.initCmd(); err != nil {
return err
}
// init message handler
if err := sb.initMsgHandler(); err != nil {
return err
}
// init config
if err := sb.initCfg(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init config with file error %v \n", err))
}
// init logger (needs the config loaded above)
if err := sb.initLogger(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init box's logger error %v \n", err))
}
// logs the banner, so it has to run after the logger exists
sb.initVersionInfo()
//sb.network.Init()
if err := sb.initNetwork(); err != nil {
panic(fmt.Sprintf("[ServiceBox] The box's network module init error: %s", err.Error()))
}
if err := sb.initScheduler(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init scheduler module error %v \n", err))
}
// the rpc framework must exist before the service finder, which takes
// sb.rpc.GetServiceProxy as a callback
if err := sb.initRpc(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init rpc framework error %v \n", err))
}
if err := sb.initServiceFinder(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init service infra error %v \n", err))
}
if err := sb.initRedisClient(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init redis client error %v \n", err))
}
if err := sb.initMongoDB(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init mongo client error %v \n", err))
}
if err := sb.initUuidGen(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init uuid error %v \n", err))
}
if err := sb.initProxyHandle(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init proxy handler error %v \n", err))
}
if err := sb.initHttpProxy(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init http proxy handler error %q \n", err.Error()))
}
// extra components are best-effort: an error is logged, not propagated
if err := sb.componentMgr.init(sb); err != nil {
sb.logger.Error("[ServiceBox] init extra component error %s", err.Error())
}
sb.setState(stateInited)
sb.logger.Info("[Service Box] Box init finished !")
return nil
}
// Start brings up the modules prepared by Init, in order: network, service
// finder, scheduler, rpc framework, redis, mongo, rpc proxy, http proxy and
// the direct connection. Outside proxy mode it then loads and registers all
// services. The first failing mandatory step is logged and its error is
// returned; a failure while starting the extra components is only logged.
// On success the box state becomes stateRunning.
func (sb *ServiceBox) Start() error {
	// network start
	if err := sb.startNetwork(); err != nil {
		sb.logger.Error("[ServiceBox] The network module failed to start. The error cause is %v", err)
		return err
	}
	sb.logger.Info("[ServiceBox] The network module has been started successfully !")

	if err := sb.srvLayer.start(); err != nil {
		sb.logger.Warn("[ServiceBox] start service finder error %v !", err)
		return err
	}
	sb.logger.Info("[ServiceBox] The service finder module has been started successfully !")

	if err := sb.startScheduler(); err != nil {
		// The original format string had no verb for err, so the cause was
		// never rendered properly.
		sb.logger.Warn("[ServiceBox] The Scheduler manager failed to start. The error cause is %v !", err)
		return err
	}

	// rpc framework
	if err := sb.rpc.Start(); err != nil {
		sb.logger.Warn("[ServiceBox] The RPC module failed to start. The error cause is %v !", err)
		return err
	}
	sb.logger.Info("[ServiceBox] The RPC framework has been successfully running.!")

	if err := sb.startRedisClient(); err != nil {
		sb.logger.Warn("[ServiceBox] The REDIS module failed to start. The error cause is %v !", err)
		return err
	}
	if err := sb.startMongoClient(); err != nil {
		sb.logger.Warn("[ServiceBox] The MongoDB module failed to start. The error cause is %v !", err)
		return err
	}

	// proxy handler
	if err := sb.startProxyHandler(); err != nil {
		sb.logger.Warn("[ServiceBox] The RPC proxy module failed to start. The error cause is %v !", err)
		return err
	}
	// http handler
	if err := sb.startHttpHandle(); err != nil {
		sb.logger.Warn("[ServiceBox] The HTTP proxy module failed to start. The error cause is %s", err.Error())
		return err
	}
	// direct connect module
	if err := sb.startConnHandle(); err != nil {
		sb.logger.Warn("[ServiceBox] Failed to start direct connection due to %v !", err)
		return err
	}

	// register services to remote; skipped in proxy mode
	if !sb.opts.proxyMode {
		if err := sb.loadAllService(); err != nil {
			return err
		}
	}

	// extra components are best-effort: an error is logged, not propagated
	if err := sb.componentMgr.start(sb); err != nil {
		sb.logger.Warn("[ServiceBox] start extra component error %s", err.Error())
	}

	sb.setState(stateRunning)
	sb.logger.Info(" ============ Service Box Go Running ============ ")
	return nil
}
// Tick drives one iteration of the box: network, service layer, proxy (when
// present), rpc framework and finally the extra components (when present).
// An error reported by the rpc framework is logged and otherwise ignored.
func (sb *ServiceBox) Tick() {
	sb.network.Tick()
	sb.srvLayer.tick()

	if proxy := sb.proxy; proxy != nil {
		proxy.onTick()
	}

	rpcErr := sb.rpc.Tick()
	if rpcErr != nil {
		sb.logger.Warn("[ServiceBox] The RPC backend has encountered an exception during execution: %v !", rpcErr)
	}

	if mgr := sb.componentMgr; mgr != nil {
		mgr.tick()
	}
}
// ShutDown stops the box's modules: components, scheduler, proxy handlers,
// rpc framework, message handler, service layer and network, then destroys
// the components and closes the config. Errors from individual steps are
// logged or discarded; the method always returns nil.
// NOTE(review): sb.rpc, sb.srvLayer, sb.network and sb.logger are used
// without nil checks, so this assumes Init completed successfully.
func (sb *ServiceBox) ShutDown() error {
// stop all components.
if sb.componentMgr != nil {
if err := sb.componentMgr.stop(sb); err != nil {
sb.logger.Warn("[ServiceBox] Component downtime error occurred %s ", err.Error())
}
}
// stop scheduler
if sb.scheduler != nil {
sb.logger.Info("The shutdown of the scheduling manager is about to take place with %d tasks remaining", sb.scheduler.Count())
// the shutdown error is intentionally discarded
_ = sb.scheduler.Shutdown()
}
// stop proxy handle
if sb.proxy != nil {
sb.proxy.shutdownProxyHandle()
sb.proxy.shutdownHttpHandle()
}
// stop rpc framework
if err := sb.rpc.ShutDown(); err != nil {
sb.logger.Warn("[ServiceBox] The RPC framework shutdown operation failed. The error cause is %v ", err.Error())
}
// clear message handler cache memory
sb.msgHandler.UnInit()
// stop service layer
sb.srvLayer.stop()
// stop network framework
sb.network.Stop()
// destroy components
if sb.componentMgr != nil {
_ = sb.componentMgr.destroy(sb)
}
// close the config module last; its error is discarded
if sb.cfg != nil {
_ = sb.cfg.Close()
}
sb.logger.Info("========================== Service Box Stopped ==========================")
return nil
}
// UseTransHandler appends handler to the list of extension handlers stored
// for handlerType. The handlers registered under TransHandlerOnConnect and
// TransHandlerOnClose are invoked by onConnect and onClosed respectively.
//
// It returns an error when handlerType is outside [0, TransHandlerMax) or
// when handler is nil; a nil handler would otherwise panic at the moment it
// is invoked.
func (sb *ServiceBox) UseTransHandler(handlerType int, handler TransHandlerFunc) error {
	// Reject negative values too: they would index the table out of range.
	if handlerType < 0 || handlerType >= TransHandlerMax {
		return fmt.Errorf("invalid handler type %d", handlerType)
	}
	if handler == nil {
		return fmt.Errorf("nil handler for handler type %d", handlerType)
	}
	sb.handlers[handlerType] = append(sb.handlers[handlerType], handler)
	return nil
}
// GetLogger returns the box's logger; it is nil until initLogger has run.
func (sb *ServiceBox) GetLogger() slog.BoxLogger {
return sb.logger
}
// GetMessageHandle returns the rpc message handler owned by the box.
func (sb *ServiceBox) GetMessageHandle() *ServiceMessageHandle {
return sb.msgHandler
}
// GetProxy resolves a proxy for the service identified by uuid, using the
// decimal form of the uuid as the service name.
func (sb *ServiceBox) GetProxy(uuid uint64) (idlrpc.IProxy, error) {
	return sb.GetProxyWithNickName(uuid, fmt.Sprintf("%d", uuid))
}
// GetTransport returns the channel connected to the service identified by
// uuid (looked up under the decimal form of the uuid). A cached transport is
// returned immediately; otherwise a lookup is requested from the service
// layer and the call waits up to five seconds for its callback.
//
// It returns the error reported by the service layer or by the callback, or
// berrors.FindServiceTimeOut when no result arrives in time.
func (sb *ServiceBox) GetTransport(uuid uint64) (*BoxChannel, error) {
	name := fmt.Sprintf("%d", uuid)
	if cached := sb.srvLayer.tryGetTransport(name); cached != nil {
		return cached, nil
	}

	type Result struct {
		trans *BoxChannel
		err   error
	}
	// One slot is enough: only the first result matters. The channel is never
	// closed, so a late or repeated callback can neither panic on a closed
	// channel nor hand a zero value to the waiter.
	notify := make(chan Result, 1)
	cb := func(name string, trans *BoxChannel, err error) {
		select {
		case notify <- Result{trans: trans, err: err}:
		default:
			// a result was already delivered; drop the duplicate
		}
	}

	if err := sb.srvLayer.getTransport(name, cb); err != nil {
		return nil, err
	}

	timer := time.NewTimer(time.Second * 5)
	defer timer.Stop()

	select {
	case result := <-notify:
		return result.trans, result.err
	case <-timer.C:
		sb.logger.Warn("[ServiceBox] get proxy %d timeout ", uuid)
		return nil, berrors.FindServiceTimeOut
	}
}
// GetProxyWithNickName resolves a proxy for service uuid, looking the
// service up under name.
//
// In direct-connect mode (a direct channel exists) the proxy is built on that
// channel. Otherwise a cached transport is used when available; failing
// that, a lookup is requested from the service layer and the call waits up
// to five seconds for its callback.
//
// It returns the error from the service layer, from the callback, or from
// proxy creation; a panic raised while creating the proxy inside the callback
// is converted into an error; berrors.FindServiceTimeOut is returned when no
// result arrives in time.
func (sb *ServiceBox) GetProxyWithNickName(uuid uint64, name string) (idlrpc.IProxy, error) {
	// direct-connect mode
	if sb.directHandle != nil {
		return sb.rpc.GetServiceProxy(uuid, sb.directHandle)
	}
	if cached := sb.srvLayer.tryGetTransport(name); cached != nil {
		return sb.rpc.GetServiceProxy(uuid, cached)
	}

	type Result struct {
		proxy idlrpc.IProxy
		err   error
	}
	// One slot is enough: only the first result matters. The channel is never
	// closed, so the waiter can never receive a zero value and a repeated
	// callback cannot panic on a closed channel.
	notify := make(chan Result, 1)
	deliver := func(r Result) {
		select {
		case notify <- r:
		default:
			// a result was already delivered; drop the duplicate
		}
	}
	cb := func(name string, trans *BoxChannel, err error) {
		defer func() {
			if r := recover(); r != nil {
				sb.logger.Warn("[ServiceBox] An exception %v was encountered while obtaining the proxy %d", r, uuid)
				// Report the failure to the waiter instead of leaving it to
				// run into the timeout.
				deliver(Result{err: fmt.Errorf("get proxy %d: callback panicked: %v", uuid, r)})
			}
		}()
		if err != nil {
			deliver(Result{err: err})
			return
		}
		proxy, proxyErr := sb.rpc.GetServiceProxy(uuid, trans)
		deliver(Result{proxy: proxy, err: proxyErr})
	}

	timer := time.NewTimer(time.Second * 5)
	defer timer.Stop()

	if err := sb.srvLayer.getTransport(name, cb); err != nil {
		return nil, err
	}

	select {
	case result := <-notify:
		return result.proxy, result.err
	case <-timer.C:
		sb.logger.Warn("[ServiceBox] get proxy %d timeout ", uuid)
		return nil, berrors.FindServiceTimeOut
	}
}
// GetScheduler returns the scheduler module; it is nil when the config has
// no "scheduler" section (initScheduler skips creation in that case).
func (sb *ServiceBox) GetScheduler() scheduler.ITimer {
return sb.scheduler
}
// GetProxyFromPeer forwards to the rpc framework's GetProxyFromPeer with the
// given service context and service uuid.
func (sb *ServiceBox) GetProxyFromPeer(ctx idlrpc.IServiceContext, uuid uint64) (idlrpc.IProxy, error) {
return sb.rpc.GetProxyFromPeer(ctx, uuid)
}
// GetExtraProxy forwards to the rpc framework's GetExtraProxy with the given
// service uuid, global index and transport.
func (sb *ServiceBox) GetExtraProxy(uuid uint64, globalIndex protocol.GlobalIndexType, trans transport.ITransport) (idlrpc.IProxy, error) {
return sb.rpc.GetExtraProxy(uuid, globalIndex, trans)
}
// connectToService makes sure a channel to host exists for service name.
// When the network layer already holds a channel for host, onConnect is
// invoked right away with it; otherwise a new connection is requested from
// the network layer. It does nothing when the box or its network is nil.
func (sb *ServiceBox) connectToService(name, network, host string) {
	if sb == nil || sb.network == nil {
		//TODO add log error
		return
	}
	// try the channel cache first
	if cached := sb.network.TryGetChannel(host); cached != nil {
		sb.onConnect(host, cached, nil)
		return
	}
	sb.network.ConnectToService(name, network, host)
}
// AddServicePackage queues pack, keyed by its service UUID, in the set of
// packages waiting to be loaded. A nil pack is ignored; a pack without a
// Creator is ignored after a warning. It panics on a nil receiver and
// otherwise always returns nil.
func (sb *ServiceBox) AddServicePackage(pack *idlrpc.PackageInfo) error {
	if sb == nil {
		panic("Service Box not init yet!")
	}
	switch {
	case pack == nil:
		//TODO add error
		return nil
	case pack.Creator == nil:
		//TODO add errors and log
		sb.logger.Warn("%d invalid service package ", pack.ServiceUUID)
		return nil
	}
	sb.waitLoadPack[pack.ServiceUUID] = pack
	return nil
}
// GetConfig returns the box's config module; it is nil until initCfg has run.
func (sb *ServiceBox) GetConfig() config.Config {
return sb.cfg
}
// GetRedisClient returns the redis client; it is nil when the config has no
// "redis" section (initRedisClient skips creation in that case).
func (sb *ServiceBox) GetRedisClient() redis.IClient {
return sb.redisProxy
}
// GetMongoClient returns the mongo client; it is nil when the config has no
// "mongo" section (initMongoDB skips creation in that case).
func (sb *ServiceBox) GetMongoClient() *mongodb.MongoDB {
return sb.mongoClient
}
// GetOptions returns the custom option string stored in the box options
// (the destination of the "options" command-line flag).
func (sb *ServiceBox) GetOptions() string {
return sb.opts.option
}
// AddComponent registers component with the component manager and returns
// the manager's result. It is a no-op returning nil when the box has no
// component manager.
func (sb *ServiceBox) AddComponent(component IComponent) error {
	if sb.componentMgr == nil {
		return nil
	}
	return sb.componentMgr.RegisterComponent(component)
}
// GetHttp returns the http module. The module exists only when it is enabled
// in the config and the application was started in gateway (proxy) mode;
// otherwise nil is returned.
func (sb *ServiceBox) GetHttp() *http_proxy.HttpProxy {
	if sb.proxy == nil || sb.proxy.httpHandle == nil {
		return nil
	}
	return sb.proxy.httpHandle.HttpProxy
}
// GetProxyHandler returns the proxy module. It requires the proxy module to
// be enabled in the config and the application to be started in gateway
// (proxy) mode; otherwise the returned handler is nil.
func (sb *ServiceBox) GetProxyHandler() *ProxyHandler {
return sb.proxy
}
// GetDirectHande returns the transport used in direct-connect mode, or nil
// when no direct connection has been established.
func (sb *ServiceBox) GetDirectHande() transport.ITransport {
	// Return an untyped nil explicitly: wrapping a nil *BoxChannel in the
	// interface would yield a value that compares unequal to nil.
	if sb.directHandle == nil {
		return nil
	}
	return sb.directHandle
}
// WatchService adds watcher to the service layer for service uuid, using the
// decimal form of the uuid as the service name.
func (sb *ServiceBox) WatchService(uuid uint64, watcher ServiceWatcher) error {
return sb.srvLayer.addWatcher(uuid, fmt.Sprintf("%d", uuid), watcher)
}
// WatchServiceWithNickName adds watcher to the service layer for service
// uuid under the given name.
func (sb *ServiceBox) WatchServiceWithNickName(uuid uint64, name string, watcher ServiceWatcher) error {
return sb.srvLayer.addWatcher(uuid, name, watcher)
}
// UnWatchService removes watcher from the service layer for the service
// registered under name.
func (sb *ServiceBox) UnWatchService(name string, watcher ServiceWatcher) error {
return sb.srvLayer.removeWatcher(name, watcher)
}
// KickOutUser forces the user behind proxy offline by sending a logged-out
// message over the proxy's transport.
//
// It returns an error when proxy is nil, is no longer connected, belongs to
// an internal service connection (invalid global index), has no transport,
// when the message could not be packed, or when sending fails.
func (sb *ServiceBox) KickOutUser(proxy idlrpc.IProxy) error {
	if proxy == nil {
		return errors.New("kick out user: nil proxy")
	}
	// check the proxy state
	if !proxy.IsConnected() {
		return fmt.Errorf("The proxy has been closed ")
	}
	// only external connections can be kicked
	if proxy.GetGlobalIndex() == idlrpc.InvalidGlobalIndex {
		return fmt.Errorf("Do not go offline for internal service connections ")
	}
	trans := proxy.GetTransport()
	if trans == nil {
		return errors.New("kick out user: proxy has no transport")
	}
	// build the logged-out message
	proto := &protocol.RpcLoggedOutPackage{
		Header: &protocol.RpcLoggedOutHeader{
			RpcMsgHeader: protocol.RpcMsgHeader{
				Length: uint32(protocol.LoggedOutHeaderSize),
				Type:   protocol.RpcLoggedOut,
			},
			GlobalIndexId: proxy.GetGlobalIndex(),
		},
	}
	// pack && send; an empty buffer means packing failed, so do not send it
	buffer, _ := protocol.PackLoggedOutMsg(proto)
	if len(buffer) == 0 {
		return errors.New("kick out user: pack logged-out message failed")
	}
	return trans.Send(buffer)
}
// setState records s as the current box state.
func (sb *ServiceBox) setState(s state) {
sb.boxState = s
}
// startNetwork starts the network module. Outside proxy mode it also reads
// the "listener" config into the host options, pre-processes the hosts and
// listens on every configured inner host over tcp. Finally the network event
// callbacks are registered. The first error encountered is returned.
func (sb *ServiceBox) startNetwork() error {
	sb.network.Start()

	// proxy mode skips the listener setup
	if !sb.opts.proxyMode {
		if err := sb.cfg.Get("listener").Scan(&sb.opts.host); err != nil {
			return err
		}
		// pre-process addresses so that hostnames can be configured
		if err := sb.opts.PreHosts(); err != nil {
			return err
		}
		for _, listen := range sb.opts.host {
			if err := sb.network.ListenAt("tcp", listen.InnerHost); err != nil {
				return err
			}
		}
	}

	sb.registerNetworkCallback()
	return nil
}
// startProxyHandler starts the rpc proxy handler and listens on its
// configured address. It is a no-op outside proxy mode, and skips listening
// when the configured address is empty.
func (sb *ServiceBox) startProxyHandler() error {
	if !sb.opts.proxyMode {
		return nil
	}

	if startErr := sb.proxy.startProxyHandle(); startErr != nil {
		return startErr
	}

	addr := sb.proxy.cfg.Address
	if len(addr) == 0 {
		sb.logger.Info("[ServiceBox] skip rpc-proxy initialization !")
		return nil
	}

	if listenErr := sb.network.ListenAt("tcp", addr); listenErr != nil {
		return listenErr
	}
	sb.logger.Info("[ServiceBox] start proxy handle successful !")
	return nil
}
// startHttpHandle starts the http handler of the proxy module in proxy mode
// and returns its result; outside proxy mode it does nothing.
func (sb *ServiceBox) startHttpHandle() error {
	if sb.opts.proxyMode {
		return sb.proxy.startHttpHandle()
	}
	return nil
}
// registerNetworkCallback wires the box's onAccept, onConnect, onReceive and
// onClosed methods to the matching network events.
func (sb *ServiceBox) registerNetworkCallback() {
sb.network.RegisterEventHandle(net.EventAccept, sb.onAccept)
sb.network.RegisterEventHandle(net.EventRespConnect, sb.onConnect)
sb.network.RegisterEventHandle(net.EventReceive, sb.onReceive)
sb.network.RegisterEventHandle(net.EventClose, sb.onClosed)
}
// initCmd declares the command-line flags understood by the box (config
// path, proxy mode, custom options and apollo config), builds the command
// parser from them and parses the arguments stored in the options. Parsed
// values are written straight into sb.opts. It returns the parser's error.
func (sb *ServiceBox) initCmd() error {
	flags := []*scli.FlagCfg{
		{
			BaseCfg: &scli.BaseCfg{
				Name:  "config",
				Short: "c",
				Usage: "config path, default is default.yaml",
			},
			Dest: &sb.opts.cfgPath,
		},
		{
			BaseCfg: &scli.BaseCfg{
				Name:  "proxy",
				Short: "p",
				Usage: "start service with proxy model",
			},
			Dest: &sb.opts.proxyMode,
		},
		{
			BaseCfg: &scli.BaseCfg{
				Name:  "options",
				Short: "option",
				Usage: "custom option",
			},
			Dest: &sb.opts.option,
		},
		{
			BaseCfg: &scli.BaseCfg{
				Name:  "apollo",
				Usage: "init config system with apollo config [url appid namespace cluster]",
			},
			Dest: sb.opts.apolloCfg,
		},
	}

	// one parser option per flag
	cmdOpts := make([]scli.Option, 0, len(flags))
	for i := range flags {
		cmdOpts = append(cmdOpts, scli.WithFlag(flags[i]))
	}

	sb.cmd = scli.NewCmd(cmdOpts...)
	return sb.cmd.Parse(sb.opts.args)
}
// initMsgHandler initializes the rpc message handler; it always returns nil.
func (sb *ServiceBox) initMsgHandler() error {
sb.msgHandler.Init()
return nil
}
// initCfg loads the box configuration and derives the options that depend on
// it.
//
// When both an apollo host and app id are set, the config is loaded from
// apollo and started; otherwise it is loaded from the config file, whose
// path defaults to "default.yaml". Afterwards the optional "rpc" and
// "services" sections are scanned into their holders, the optional "logo" is
// read, and the identity name is set: always "rpc-proxy" in proxy mode,
// otherwise the "identity-name" entry with "service-box" as fallback.
//
// It returns the first load or scan error.
func (sb *ServiceBox) initCfg() (err error) {
	if sb.opts.cfgPath == "" {
		sb.opts.cfgPath = "default.yaml"
	}

	// apollo mode: when apollo is configured the config file is not read
	if len(sb.opts.apolloCfg.Host) > 0 && len(sb.opts.apolloCfg.AppId) > 0 {
		sb.cfg, err = config.LoadApollo(sb.opts.apolloCfg)
		// Check the load result before touching sb.cfg: calling Start on the
		// result of a failed load would dereference an unusable config.
		if err != nil {
			return err
		}
		//sb.cfg.Watch()
		// NOTE(review): the Start error is still discarded, as before; the
		// logger is not available yet at this point. Confirm whether a failed
		// Start should abort initialization.
		_ = sb.cfg.Start()
	} else {
		sb.cfg, err = config.LoadFile(sb.opts.cfgPath)
		if err != nil {
			return err
		}
	}

	if rpcCfg := sb.cfg.Get("rpc"); !rpcCfg.IsNil() {
		if err = rpcCfg.Scan(sb.rpcCfg); err != nil {
			return err
		}
	}

	if customCfg := sb.cfg.Get("services"); !customCfg.IsNil() {
		if err = customCfg.Scan(sb.serviceCfg); err != nil {
			return err
		}
	}

	if logoCfg := sb.cfg.Get("logo"); !logoCfg.IsNil() {
		sb.opts.logo = logoCfg.String(boxLogo)
	}

	// gateway mode forces the identity name to rpc-proxy
	if sb.opts.proxyMode {
		sb.opts.identityName = "rpc-proxy"
	} else {
		sb.opts.identityName = sb.cfg.Get("identity-name").String("service-box")
	}
	return nil
}
// initLogger builds the box logger from the "logger" config section, using
// the "name" config entry as application name when it is present, and
// installs the logger as the package default. The option that scans the
// config panics if the scan fails. It returns the logger constructor's error.
func (sb *ServiceBox) initLogger() error {
	scanLoggerCfg := func(o *slog.Options) {
		if scanErr := sb.cfg.Get("logger").Scan(o); scanErr != nil {
			panic(scanErr)
		}
	}
	logOpts := []slog.Option{scanLoggerCfg}

	if appName := sb.cfg.Get("name"); !appName.IsNil() {
		logOpts = append(logOpts, slog.WithAppName(appName.String("service-box")))
	}

	var err error
	if sb.logger, err = zap.NewLogger(logOpts...); err != nil {
		return err
	}
	slog.SetDefaultLog(sb.logger)
	return nil
}
// initNetwork creates and initializes the network module from the "network"
// config section. A non-positive receive buffer length is replaced by the
// ReceiveBufferLen default. It returns the config scan error, if any.
func (sb *ServiceBox) initNetwork() error {
	options := NetworkOptions{}
	if err := sb.cfg.Get("network").Scan(&options); err != nil {
		return err
	}
	if options.RecvBufferLen <= 0 {
		options.RecvBufferLen = ReceiveBufferLen
	}
	sb.network = NewBoxNetWork(&options)
	sb.network.Init()
	// The value logged is the receive buffer length; the message used to
	// call it the send length.
	sb.logger.Info("The network is initialized with %d bytes of receive buffer length", options.RecvBufferLen)
	return nil
}
// initProxyHandle creates and initializes the proxy handler from the "proxy"
// config section. It is a no-op outside proxy mode. The config defaults to
// binding mode and must carry a non-empty address, otherwise
// berrors.ProxyConfigError is returned. Scan, pre-processing and handler
// initialization errors are returned as they are.
func (sb *ServiceBox) initProxyHandle() error {
	if !sb.opts.proxyMode {
		return nil
	}

	// binding mode is the default
	proxyCfg := &proxyModeCfg{Binding: true}
	if scanErr := sb.cfg.Get("proxy").Scan(proxyCfg); scanErr != nil {
		return scanErr
	}

	// misconfiguration: no address
	if len(proxyCfg.Address) == 0 {
		sb.logger.Warn("[ServiceBox] proxy model config error, key host is nil !")
		return berrors.ProxyConfigError
	}

	if prepErr := proxyCfg.preProcess(); prepErr != nil {
		return prepErr
	}

	sb.proxy = newProxyHandler(proxyCfg, sb)
	if initErr := sb.proxy.initProxyHandle(); initErr != nil {
		return initErr
	}
	sb.logger.Info("init proxy module successful at %s", proxyCfg.Address)
	return nil
}
// initHttpProxy initializes the http handler of the proxy module from the
// "http_proxy" config section. It is a no-op outside proxy mode and is
// skipped (with an info log) when that section is missing.
func (sb *ServiceBox) initHttpProxy() error {
	if !sb.opts.proxyMode {
		return nil
	}

	// the module is enabled only when its config section exists
	httpCfg := sb.cfg.Get("http_proxy")
	if !httpCfg.IsNil() {
		return sb.proxy.initHttpHandle(httpCfg)
	}

	sb.logger.Info("skip http_proxy module")
	return nil
}
// initRedisClient creates and initializes the redis client from the "redis"
// config section. The module is skipped when the section is missing. A
// missing "addr_hosts" entry yields berrors.RedisConfigError. More than one
// host selects the cluster client, otherwise the plain client is used. The
// option that scans the config panics if the scan fails.
func (sb *ServiceBox) initRedisClient() error {
	redisCfg := sb.cfg.Get("redis")
	if redisCfg.IsNil() {
		sb.logger.Info("skip redis module")
		return nil
	}

	hosts := redisCfg.Get("addr_hosts")
	if hosts.IsNil() {
		sb.logger.Error("invalid redis address")
		return berrors.RedisConfigError
	}

	scanOpts := func(o *redis.Options) {
		if scanErr := redisCfg.Scan(o); scanErr != nil {
			panic(scanErr)
		}
	}

	// the number of hosts decides between cluster and plain client
	if len(hosts.StringSlice(nil)) <= 1 {
		sb.redisProxy = redis.NewClient(scanOpts)
	} else {
		sb.redisProxy = redis.NewClusterClient(scanOpts)
	}

	return sb.redisProxy.Init()
}
// initMongoDB creates and initializes the mongo client from the "mongo"
// config section; the module is skipped when the section is missing. The
// option that scans the config panics if the scan fails.
func (sb *ServiceBox) initMongoDB() error {
	mongoCfg := sb.cfg.Get("mongo")
	if mongoCfg.IsNil() {
		// mongo is not configured
		sb.logger.Info("skip mongo db module")
		return nil
	}

	// the client options are filled from the config section
	scanOpts := func(o *mongodb.Options) {
		if scanErr := mongoCfg.Scan(o); scanErr != nil {
			panic(scanErr)
		}
	}

	sb.mongoClient = mongodb.NewMongoDB(scanOpts)
	return sb.mongoClient.Init()
}
// initScheduler creates and initializes the timer scheduler module from the
// "scheduler" config section; the module is skipped when the section is
// missing. MaxConcurrentJobs defaults to 1024 when the config leaves it at
// zero. The option that scans the config panics if the scan fails.
func (sb *ServiceBox) initScheduler() error {
	schedCfg := sb.cfg.Get("scheduler")
	if schedCfg.IsNil() {
		// the scheduler is not configured
		sb.logger.Info("skip scheduler module")
		return nil
	}

	// option that fills the scheduler options from the config
	applyCfg := func(options *scheduler.Options) {
		if scanErr := schedCfg.Scan(options); scanErr != nil {
			panic(scanErr)
		}
		// default value
		if options.MaxConcurrentJobs == 0 {
			options.MaxConcurrentJobs = 1024
		}
	}

	var err error
	if sb.scheduler, err = scheduler.NewScheduler(); err != nil {
		return err
	}
	return sb.scheduler.Init(applyCfg)
}
// startConnHandle establishes the direct connection when the config entry
// connect.host is set: it connects to that host over tcp and wraps the
// connection in a box channel stored as the direct handle. Without that
// entry it does nothing. Connection and channel creation errors are logged
// and returned; the direct handle is set only on success.
func (sb *ServiceBox) startConnHandle() error {
	host := sb.cfg.Get("connect", "host").String("")
	if len(host) == 0 {
		return nil
	}

	// a connect host is configured: try to connect directly
	conn, err := sb.network.ConnectTo("tcp", host)
	if err != nil {
		// The original logged an empty message here.
		sb.logger.Warn("[ServiceBox] direct connect to %s error %v !", host, err)
		return err
	}

	// Keep the result local until it is known to be valid, so that a failed
	// creation never switches the box into direct-connect mode.
	channel, err := sb.network.createBoxChannel(conn.RemoteAddr().String(), conn)
	if err != nil {
		sb.logger.Warn("create box channel error %v !", err)
		return err
	}
	sb.directHandle = channel
	return nil
}
// startRedisClient starts the redis client when one was created and returns
// its result; without a client it does nothing.
func (sb *ServiceBox) startRedisClient() error {
	if sb.redisProxy != nil {
		return sb.redisProxy.Start()
	}
	// redis module is not configured
	return nil
}
// startScheduler starts the scheduler module when one was created, logging
// on success and returning the start error otherwise; without a scheduler it
// does nothing.
func (sb *ServiceBox) startScheduler() error {
	// nothing to do without a scheduler module
	if sb.scheduler == nil {
		return nil
	}
	startErr := sb.scheduler.Start()
	if startErr != nil {
		return startErr
	}
	sb.logger.Info("start scheduler module successfully !")
	return nil
}
// startMongoClient starts the mongo client when one was created and returns
// its result; without a client it does nothing.
func (sb *ServiceBox) startMongoClient() error {
	if sb.mongoClient != nil {
		return sb.mongoClient.Start()
	}
	return nil
}
// initUuidGen creates the custom uuid generator, configured from the
// uuid.custom config entry. The option that scans the config panics if the
// scan fails. It always returns nil.
func (sb *ServiceBox) initUuidGen() error {
	scanOpts := func(o *custom.Options) {
		scanErr := sb.cfg.Get("uuid", "custom").Scan(o)
		if scanErr != nil {
			panic(scanErr)
		}
	}
	sb.idGen = custom.NewCustomUUID(scanOpts)
	sb.logger.Info("init uuid generator")
	return nil
}
// initServiceFinder creates and initializes the service layer from the
// "service_finder" config section. A middleware type must be configured; an
// empty prefix is replaced by "/". The rpc framework's GetServiceProxy is
// handed to the service layer, so the rpc framework has to exist already.
func (sb *ServiceBox) initServiceFinder() error {
	// load from config
	finderCfg := new(serviceLayerConfig)
	if scanErr := sb.cfg.Get("service_finder").Scan(finderCfg); scanErr != nil {
		return scanErr
	}
	if len(finderCfg.MiddlewareType) == 0 {
		return errors.New("invalid service finder type")
	}

	// normalize the config
	if finderCfg.Prefix == "" {
		finderCfg.Prefix = "/"
	}

	if sb.srvLayer = makeServiceLayer(sb.connectToService, sb.logger); sb.srvLayer == nil {
		return errors.New("[ServiceBox] create service layer failed ")
	}
	return sb.srvLayer.init(finderCfg, sb.rpc.GetServiceProxy)
}
// initRpc creates the rpc framework and initializes it with a service
// context bound to this box, the box logger, the stack-trace and call-trace
// switches from the rpc config, the box itself as uuid factory and the box's
// message handler. It returns an error when the framework cannot be created
// or initialized.
func (sb *ServiceBox) initRpc() error {
	if sb.rpc = idlrpc.CreateRpcFramework(); sb.rpc == nil {
		return errors.New("[ServiceBox] create rpc framework failed")
	}

	//TODO switch the rpc logger on and off from the config
	rpcOpts := []idlrpc.Option{
		// the service context comes first
		idlrpc.WithServiceContext(NewServiceBoxContext(sb)),
		idlrpc.WithLogger(sb.logger),
		idlrpc.WithStackTrace(sb.rpcCfg.StackTrace),
		idlrpc.WithCallTrace(sb.rpcCfg.CallTrace),
		idlrpc.WithUuidFactory(sb),
		idlrpc.WithMessageHandler(sb.msgHandler),
	}

	if initErr := sb.rpc.Init(rpcOpts...); initErr != nil {
		return initErr
	}
	return nil
}
// initVersionInfo initializes the box's version and identity information: it
// generates an RFC 4122 uuid, stores its hex encoding as the identity id and
// logs the logo, the version and that identity. It panics when the uuid
// generator cannot be created.
func (sb *ServiceBox) initVersionInfo() {
	gen := uuid.NewRFC4122UUID()
	if gen == nil {
		panic("Description Failed to initialize the Service-box version.")
	}

	raw := gen.New()
	sb.identityID = hex.EncodeToString(raw[:])

	sb.logger.Info("%s\n service-box version: v0.5.0\n service-box identify uuid: %s\n", sb.opts.logo, sb.identityID)
}
// =========================== service package tool function ===========================

// loadServiceByPackInfo creates the SDK described by pack, registers it with
// the rpc framework and records its type name in the service layer.
//
// The SDK is created with the custom name configured for the package's
// service UUID when there is one. A creator that yields a nil SDK without an
// error is treated as "nothing to load". A non-proxy SDK is additionally
// registered with the service layer under its uuid and, when it differs from
// the uuid, under its nickname; a proxy SDK is only logged.
//
// It panics when the box or its rpc framework is not initialized, and
// returns the first creation or registration error.
func (sb *ServiceBox) loadServiceByPackInfo(pack *idlrpc.PackageInfo) error {
	if sb == nil || sb.rpc == nil {
		panic("service box's rpc framework not init yet !")
	}

	var sdk idlrpc.ISDK
	var err error
	if v, ok := sb.serviceCfg.CustomName[pack.ServiceUUID]; ok {
		sdk, err = pack.Creator(v)
	} else {
		sdk, err = pack.Creator()
	}
	if err != nil {
		return err
	}
	if sdk == nil {
		return nil
	}

	if err = sdk.Register(sb.rpc); err != nil {
		// ServiceUUID is a uint64, so it is printed with %d (it was %s).
		sb.logger.Warn("[ServiceBox] register package %d error ! %v", pack.ServiceUUID, err)
		return err
	}

	host := sb.opts.GetOneHostIp()

	// Use the pointed-to type's name for pointer SDKs. Elem is only valid on
	// a few kinds, so a non-pointer SDK keeps its own type name instead of
	// panicking.
	sdkType := reflect.TypeOf(sdk)
	if sdkType.Kind() == reflect.Ptr {
		sdkType = sdkType.Elem()
	}
	typeName := sdkType.Name()
	sb.srvLayer.addLoadedServiceName(sdk.GetUuid(), typeName)

	if sdk.IsProxy() {
		sb.logger.Info("load proxy %s:%d to service box", typeName, sdk.GetUuid())
		return nil
	}

	uuidStr := fmt.Sprintf("%d", sdk.GetUuid())
	if uuidStr != sdk.GetNickName() {
		if err := sb.srvLayer.registerService(sdk.GetNickName(), host); err != nil {
			return err
		}
		sb.logger.Info("register service %s:%d to zookeeper with nickname %s ", typeName, sdk.GetUuid(), sdk.GetNickName())
	}
	sb.logger.Info("register service %s:%d to zookeeper with uuid %s ", typeName, sdk.GetUuid(), uuidStr)
	return sb.srvLayer.registerService(uuidStr, host)
}
// onAccept is the network callback for accepted connections. The event is
// forwarded to the proxy handler when one exists; it is dropped when err is
// set or when the box has no proxy handler.
func (sb *ServiceBox) onAccept(name string, trans *BoxChannel, err error) {
	if err != nil || sb.proxy == nil {
		return
	}
	sb.proxy.onAccept(name, trans)
}
// onConnect is the network callback for outgoing connections; it adds trans
// to the service layer under host.
//
// On failure (err set or trans nil) the service layer is told that the
// connection failed and the call returns. Otherwise the box's identity is
// sent to the peer, the service layer is notified of the connection and,
// when that succeeds, the handlers registered for TransHandlerOnConnect are
// run in registration order; their errors are only logged.
func (sb *ServiceBox) onConnect(host string, trans *BoxChannel, err error) {
if err != nil || trans == nil {
// the result of the failure notification is intentionally discarded
_ = sb.srvLayer.onConnectFailed(host, err)
slog.Warn("[ServiceBox] connect to %s host error %v ", host, err)
return
}
// tell the peer who we are
identityMsg := protocol.BuildIdentityNotifyMessage(sb.opts.identityName, sb.identityID)
if identityMsg == nil {
sb.logger.Warn("Notify the other party that their identity information is incorrect. %s:%s ", sb.opts.identityName, sb.identityID)
} else {
// NOTE(review): the second result of PackIdentityNotifyMsg is discarded
pkg, _ := protocol.PackIdentityNotifyMsg(identityMsg)
if err := trans.Send(pkg); err != nil {
sb.logger.Warn("Send the other party that their identity information is incorrect. %s:%s ", sb.opts.identityName, sb.identityID)
}
}
// a failed identity notification does not stop the registration below
err = sb.srvLayer.onConnect(host, trans)
if err != nil {
return
}
// run the extension handlers
handlers := sb.handlers[TransHandlerOnConnect]
for _, hand := range handlers {
if err := hand(trans); err != nil {
sb.logger.Warn("Error: executing handler function %s", err.Error())
}
}
}
// onReceive is the network callback for incoming data on trans. A receive
// error is logged and ends the call. With a proxy handler the data is handed
// to the rpc framework as a proxy message; otherwise a fresh service context
// bound to trans is created and handed to the rpc framework. Errors returned
// by the rpc framework are discarded.
func (sb *ServiceBox) onReceive(tSid string, trans *BoxChannel, err error) {
	if err != nil {
		sb.logger.Error("[ServiceBox] %s %v while receive message", tSid, err)
		return
	}

	if sb.proxy != nil {
		_ = sb.rpc.OnProxyMessage(trans, sb.proxy)
		return
	}

	// only the non-gateway case needs a service context
	boxCtx := NewServiceBoxContext(sb)
	boxCtx.setBoxChannel(trans)
	_ = sb.rpc.OnMessage(boxCtx)
}
// onClosed is the network callback for closed channels. For a channel
// without a valid global index (not an external connection) the service
// layer is told that the peer host closed. The proxy handler and the message
// handler are notified when present, and finally the handlers registered for
// TransHandlerOnClose are run in registration order; their errors are only
// logged. A nil trans is ignored.
func (sb *ServiceBox) onClosed(name string, trans *BoxChannel, _ error) {
	if trans == nil {
		return
	}

	// not an external connection: let the service layer update its links
	internal := trans.GlobalIndex() == idlrpc.InvalidGlobalIndex
	if internal {
		sb.srvLayer.onClose(trans.peerHost)
	}

	if proxy := sb.proxy; proxy != nil {
		proxy.onClose(name, trans)
	}

	// refresh the connection identity information
	if mh := sb.msgHandler; mh != nil {
		mh.onClose(trans)
	}

	// run the onClose extension handlers
	for _, handle := range sb.handlers[TransHandlerOnClose] {
		if handleErr := handle(trans); handleErr != nil {
			sb.logger.Error("Error: executing handler function %s", handleErr.Error())
		}
	}
}
// NewUuid generates a new id with the box's uuid generator and returns its
// hex encoding. The generator is created by initUuidGen, so this must not be
// called before initialization.
func (sb *ServiceBox) NewUuid() string {
data := sb.idGen.New()
return hex.EncodeToString(data[:])
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/dennis-kk/service-box-go.git
git@gitee.com:dennis-kk/service-box-go.git
dennis-kk
service-box-go
Service-Box-go
v0.5.17

搜索帮助

344bd9b3 5694891 D2dac590 5694891