4 Star 5 Fork 4

Plato/Service-Box-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
descriptor.go 9.01 KB
一键复制 编辑 原始数据 按行查看 历史
package descriptor
import (
"errors"
"path/filepath"
"strconv"
"sync"
"gitee.com/dennis-kk/service-box-go/util/slog"
"gitee.com/dennis-kk/service-box-go/util/tools"
"github.com/fsnotify/fsnotify"
)
var (
InvalidIdlPath = errors.New("invalid idl file path")
InvalidIteratorFunc = errors.New("no effective iterator function ")
)
const (
ServiceAdded = iota + 1
ServiceModify
ServiceRemove
)
type (
//MethodDescriptor 方法描述结构
MethodDescriptor struct {
Index uint32 `json:"index"` //方法序号
Name string `json:"name"` //方法名
Oneway bool `json:"oneway"` //是否oneway
Public bool `json:"public"` // 是否public 方法 与 protected 互斥
Protected bool `json:"protected"` // 是否protect 方法, 与 public 互斥
TimeOut uint32 `json:"timeout"` // 超时时间
}
//ServiceDescriptor 服务描述结构
ServiceDescriptor struct {
Uuid string `json:"uuid"` // 服务UUID
Name string `json:"name"` // 服务名
Methods []*MethodDescriptor `json:"methods"` // 服务方法名
}
//FileDescriptor 文件描述结构体
FileDescriptor struct {
BaseName string `json:"-"` //配置文件名
IdlName string `json:"idlname"` //idl 文件名
Services []*ServiceDescriptor `json:"services"` //idl 文件中包含的服务
}
//IteratorFunc 迭代方法,参数为服务数字uid ,服务描述,是否有错误; 如果希望终端迭代,返回非空错误
IteratorFunc func(uint64, *ServiceDescriptor, error) error
//WatcherFunc 监听器,用于监听文件变化
WatcherFunc func(int, []*ServiceDescriptor) error
DataBase struct {
cfg *Options //外部可调整设置参数
services map[uint64]*ServiceDescriptor //已加载的服务记录
logger slog.BoxLogger //日志
watcher *fsnotify.Watcher //文件监控,用于监听文件夹里面的东西变化
handle WatcherFunc //文件变化引起变化导致的服务变化通知函数
rw sync.RWMutex //读写锁
wg sync.WaitGroup
}
)
func MakeDataBase() *DataBase {
return &DataBase{
cfg: &Options{
Suffix: ".json",
},
services: make(map[uint64]*ServiceDescriptor),
rw: sync.RWMutex{},
wg: sync.WaitGroup{},
}
}
func (db *DataBase) Init(opts ...Option) error {
for _, opt := range opts {
opt(db.cfg)
}
db.logger = db.cfg.logger
// 检查日志模块
if db.logger == nil {
//从静态空间分配
db.logger = slog.Children(slog.WithAppName("descriptor"))
}
// 检查配置路径
if len(db.cfg.DescriptionPath) <= 0 {
db.logger.Error("no effective idl config file path !")
return InvalidIdlPath
}
// 路径存在判断
if !tools.IsEffectiveDir(db.cfg.DescriptionPath) {
db.logger.Error(" %q path is not exits or not directory ", db.cfg.DescriptionPath)
return InvalidIdlPath
}
// 递归扫描目录下所有json文件
fds, err := db.loadServiceFromDir(db.cfg.DescriptionPath, true)
if err != nil {
db.logger.Error("load services from dir %q error %q", db.cfg.DescriptionPath, err.Error())
}
// 缓存已经加载的服务
for _, fd := range fds {
for _, service := range fd.Services {
uuid, err := strconv.ParseInt(service.Uuid, 10, 64)
if err != nil {
db.logger.Warn("convert service's uuid %s to int failed ", service.Uuid)
}
db.services[uint64(uuid)] = service
}
}
return err
}
func (db *DataBase) Start() (err error) {
// 初始化 文件监控, 文件变化自动注册和热更代码
db.watcher, err = fsnotify.NewWatcher()
if err != nil {
db.logger.Error("init file watcher error %v", err)
return err
}
// 添加监控路径
err = db.watcher.Add(db.cfg.DescriptionPath)
if err != nil {
db.logger.Error("add path %q to watcher error %v", db.cfg.DescriptionPath, err)
return err
}
db.wg.Add(1)
go db.tick()
return nil
}
func (db *DataBase) RangeServices(iterator IteratorFunc) error {
if iterator == nil {
return InvalidIteratorFunc
}
for key, value := range db.services {
err := iterator(key, value, nil)
if err != nil {
return err
}
}
return nil
}
func (db *DataBase) GetServiceType(uuid uint64) *ServiceDescriptor {
db.rw.RLock()
defer db.rw.RUnlock()
if value, ok := db.services[uuid]; ok {
return value
}
return nil
}
func (db *DataBase) GetMethodType(uuid uint64, methodId uint32) *MethodDescriptor {
db.rw.RLock()
defer db.rw.RUnlock()
if value, ok := db.services[uuid]; ok {
if methodId < 1 || (methodId-1) >= uint32(len(value.Methods)) {
return nil
}
return value.Methods[methodId-1]
}
return nil
}
func (db *DataBase) AddWatcher(handle WatcherFunc) {
db.handle = handle
}
func (db *DataBase) ShutDown() {
if err := db.watcher.Close(); err != nil {
db.logger.Error("close file watcher with error %v ! ", err)
}
db.wg.Wait()
}
// tick 主循环,主要用于监听配置目录下文件的变化
func (db *DataBase) tick() {
defer db.wg.Done()
for {
select {
case event, ok := <-db.watcher.Events:
// 处理文件变动事件
if !ok {
// watcher has been closed
return
}
// 创建文件
if event.Op&fsnotify.Create == fsnotify.Create {
db.onFileAdd(event.Name)
}
// 文件修改
if event.Op&fsnotify.Write == fsnotify.Write {
db.onFileChanged(event.Name)
}
// 文件删除
if event.Op&fsnotify.Remove == fsnotify.Remove {
}
case err := <-db.watcher.Errors:
if err != nil {
db.logger.Error("unexpected error %q from %q watcher system ", db.cfg.DescriptionPath, err.Error())
}
return
}
}
}
func (db *DataBase) onFileAdd(name string) {
//判断是否 普通文件
if tools.IsRegularFile(name) {
// 增加 单个文件与服务
fd, err := db.loadServiceFromFile(name)
if err != nil {
db.logger.Warn("load new service config from %q error %q", name, err.Error())
return
}
if db.handle != nil {
_ = db.handle(ServiceAdded, fd.Services)
}
}
if tools.IsEffectiveDir(name) {
// 新增的是个路径
fds, err := db.loadServiceFromDir(name, true)
if err != nil {
db.logger.Warn("load new service config from %q error %q", name, err.Error())
return
}
// 合并所有的service
var services []*ServiceDescriptor
for _, fd := range fds {
services = append(services, fd.Services...)
}
// 添加内存缓存
db.rw.Lock()
for _, service := range services {
uuid, err := strconv.ParseInt(service.Uuid, 10, 64)
if err != nil {
db.logger.Warn("convert service's uuid %s to int failed ", service.Uuid)
}
db.services[uint64(uuid)] = service
}
db.rw.Unlock()
if db.handle != nil {
_ = db.handle(ServiceAdded, services)
}
}
}
func (db *DataBase) onFileChanged(name string) {
//判断是否 普通文件
if tools.IsRegularFile(name) {
// 增加 单个文件与服务
fd, err := db.loadServiceFromFile(name)
if err != nil {
db.logger.Warn("load new service config from %q error %q", name, err.Error())
return
}
// 添加内存缓存
db.rw.Lock()
for _, service := range fd.Services {
uuid, err := strconv.ParseInt(service.Uuid, 10, 64)
if err != nil {
db.logger.Warn("convert service's uuid %s to int failed ", service.Uuid)
}
db.services[uint64(uuid)] = service
}
db.rw.Unlock()
if db.handle != nil {
_ = db.handle(ServiceModify, fd.Services)
}
}
if tools.IsEffectiveDir(name) {
// 新增的是个路径
fds, err := db.loadServiceFromDir(name, true)
if err != nil {
db.logger.Warn("load new service config from %q error %q", name, err.Error())
return
}
// 合并所有的service
var services []*ServiceDescriptor
for _, fd := range fds {
services = append(services, fd.Services...)
}
// 添加内存缓存
db.rw.Lock()
for _, service := range services {
uuid, err := strconv.ParseInt(service.Uuid, 10, 64)
if err != nil {
db.logger.Warn("convert service's uuid %s to int failed ", service.Uuid)
}
db.services[uint64(uuid)] = service
}
db.rw.Unlock()
if db.handle != nil {
_ = db.handle(ServiceModify, services)
}
}
}
func (db *DataBase) loadServiceFromDir(dir string, ignoreErr bool) ([]*FileDescriptor, error) {
// 递归扫描目录下所有json文件
files, err := tools.ListFileInDirWithSuffix(dir, db.cfg.Suffix)
if err != nil {
db.logger.Error("scan config file error !")
return nil, err
}
var fds []*FileDescriptor
// 遍历文件,反射到file description
for _, file := range files {
// make file descriptor 跳过出错的文件并记录
if fd, err := db.loadServiceFromFile(file); err != nil {
if ignoreErr {
continue
} else {
return nil, err
}
} else {
fds = append(fds, fd)
}
}
return fds, nil
}
func (db *DataBase) loadServiceFromFile(file string) (*FileDescriptor, error) {
descriptor := &FileDescriptor{}
err := tools.JsonFileUnmarshalHelper(file, descriptor)
if err != nil {
db.logger.Warn("load service config file %q error %v", file, err)
return nil, err
}
basename := filepath.Base(file)
descriptor.BaseName = basename
return descriptor, nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/dennis-kk/service-box-go.git
git@gitee.com:dennis-kk/service-box-go.git
dennis-kk
service-box-go
Service-Box-go
v0.5.14

搜索帮助