1 Star 0 Fork 0

h79/goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
loader.go 5.28 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2023-12-15 02:07 . 协程安全退出
package loader
import (
"encoding/json"
"encoding/xml"
"gitee.com/h79/goutils/common/bus"
"gitee.com/h79/goutils/common/coder"
cJSON "gitee.com/h79/goutils/common/json"
"gitee.com/h79/goutils/common/system"
cXML "gitee.com/h79/goutils/common/xml"
cYAML "gitee.com/h79/goutils/common/yaml"
"gopkg.in/yaml.v3"
"sync"
"sync/atomic"
"time"
)
// LoadFunc produces the value a Loader stores. Loader.Read calls it and keeps
// the returned value only when the returned error is nil.
type LoadFunc func() (interface{}, error)
// Loader holds a value obtained from a LoadFunc, optionally refreshes it from a
// background goroutine (watchLoad), and can publish each refresh on a bus.
//
// The struct contains a sync.RWMutex, so it should be used through a pointer
// once it is shared.
type Loader struct {
// stop is closed by Stop; watchLoad returns once it becomes readable.
stop chan bool
// update receives a value from Update(true) to make watchLoad reload immediately.
update chan bool
// reading is set by Read while loadFunc is executing; a Read that observes it
// set returns without loading. Reset clears it.
reading bool
// watch is accessed with sync/atomic: 0 = not watching, 1 = set by Watch,
// 2 = set by WatchChan (successful reloads are also signalled on upChan).
watch int32
// upChan is created by WatchChan and closed by Stop when watch == 2.
upChan chan bool
// second is the ticker interval used by watchLoad (a duration, not a count of seconds).
second time.Duration
format string // xml, json, yaml
// topic and cmd are used by publish; empty values fall back to defTopic / defCmd.
topic string
cmd string
// data is the most recently loaded value; rm guards it.
data interface{}
rm sync.RWMutex
// loadFunc is the user-supplied loader; Read is a no-op while it is nil.
loadFunc LoadFunc
unmarshal coder.Unmarshal // defaults to json
// publisher is optional; publish does nothing while it is nil.
publisher bus.Publisher
// watchRunning launches watchLoad via GoRunning.
// NOTE(review): presumably it keeps at most one watchLoad running — confirm in system.RunningCheck.
watchRunning system.RunningCheck
}
// Fallback event routing values used by publish when the Loader's cmd or topic
// is empty; CreateLoader also installs them as the initial values.
const (
// defCmd is the default event command.
defCmd = "load"
// defTopic is the default bus topic.
defTopic = "load.data"
)
// CreateLoader returns a Loader value carrying the package defaults: the
// default command and topic, the "json" format with the JSON coder, a
// five-second refresh interval, and freshly allocated stop/update channels.
// Every other field is left at its zero value (no load function, no publisher,
// not watching).
//
// The result is returned by value; use New to obtain a pointer.
func CreateLoader() Loader {
	return Loader{
		// Event routing.
		topic: defTopic,
		cmd:   defCmd,

		// Decoding.
		format:    "json",
		unmarshal: cJSON.DefaultCoder,

		// Background refresh.
		second: 5 * time.Second,
		stop:   make(chan bool),
		update: make(chan bool),
	}
}
// New allocates a Loader initialised with the CreateLoader defaults and
// returns a pointer to it.
func New() *Loader {
	loader := new(Loader)
	*loader = CreateLoader()
	return loader
}
// Get returns the currently stored value.
//
// Deprecated: this function simply calls [Loader.Data]; call that instead.
func (load *Loader) Get() interface{} {
	current := load.Data()
	return current
}
// Data returns the most recently loaded value, or nil when nothing has been
// loaded yet. The value is read while holding the read lock.
func (load *Loader) Data() interface{} {
	load.rm.RLock()
	current := load.data
	load.rm.RUnlock()
	return current
}
// JSON encodes the stored value with encoding/json while holding the read
// lock. On failure it returns an empty string together with the marshal error.
func (load *Loader) JSON() (string, error) {
	load.rm.RLock()
	defer load.rm.RUnlock()

	var text string
	raw, err := json.Marshal(load.data)
	if err == nil {
		text = string(raw)
	}
	return text, err
}
// XML encodes the stored value with encoding/xml while holding the read lock.
// On failure it returns an empty string together with the marshal error.
func (load *Loader) XML() (string, error) {
	load.rm.RLock()
	defer load.rm.RUnlock()

	var text string
	raw, err := xml.Marshal(load.data)
	if err == nil {
		text = string(raw)
	}
	return text, err
}
// YAML encodes the stored value with yaml.Marshal while holding the read lock.
// On failure it returns an empty string together with the marshal error.
func (load *Loader) YAML() (string, error) {
	load.rm.RLock()
	defer load.rm.RUnlock()

	var text string
	raw, err := yaml.Marshal(load.data)
	if err == nil {
		text = string(raw)
	}
	return text, err
}
// WithFormat records the format name and installs the matching decoder: the
// JSON, XML or YAML default coder for JsonFormat, XmlFormat and YamlFormat.
// Any other format installs a decoder that always fails with ErrNotDefined.
// It returns the receiver so calls can be chained.
func (load *Loader) WithFormat(format string) *Loader {
	load.format = format
	switch format {
	case JsonFormat:
		load.unmarshal = cJSON.DefaultCoder
	case XmlFormat:
		load.unmarshal = cXML.DefaultCoder
	case YamlFormat:
		load.unmarshal = cYAML.DefaultCoder
	default:
		// Unknown format: keep a non-nil decoder that reports the problem on use.
		load.unmarshal = coder.UnmarshalFunc(func(_ []byte, _ interface{}) error {
			return ErrNotDefined
		})
	}
	return load
}
// WithCmd sets the command name that publish places in the emitted event and
// returns the receiver so calls can be chained. An empty cmd makes publish
// fall back to defCmd.
func (load *Loader) WithCmd(cmd string) *Loader {
load.cmd = cmd
return load
}
// WithTopic sets the bus topic used by publish and returns the receiver so
// calls can be chained. An empty topic makes publish fall back to defTopic.
func (load *Loader) WithTopic(topic string) *Loader {
load.topic = topic
return load
}
// WithSecond sets the refresh interval and returns the receiver so calls can
// be chained. watchLoad passes the value to time.NewTicker when it starts, so
// a change only affects a watch goroutine started afterwards.
// NOTE(review): time.NewTicker panics on a non-positive duration and nothing
// here validates the argument.
func (load *Loader) WithSecond(second time.Duration) *Loader {
load.second = second
return load
}
// WithLoadFunc sets the function Read calls to obtain fresh data and returns
// the receiver so calls can be chained. While it is nil, Read does nothing.
func (load *Loader) WithLoadFunc(l LoadFunc) *Loader {
load.loadFunc = l
return load
}
// WithUnmarshal installs a custom decoder and returns the receiver so calls
// can be chained. A later WithFormat call replaces it again.
func (load *Loader) WithUnmarshal(l coder.Unmarshal) *Loader {
load.unmarshal = l
return load
}
// WithPublisher sets the publisher that receives an event after each
// successful reload and returns the receiver so calls can be chained. With a
// nil publisher, publish is a no-op.
func (load *Loader) WithPublisher(p bus.Publisher) *Loader {
load.publisher = p
return load
}
// Update triggers a reload.
//
// With async == false the reload runs on the calling goroutine. With
// async == true and no active watch, the reload is handed to
// system.ChildRunning. With async == true and an active watch, the watch
// goroutine is (re)started through watchRunning and asked to reload via the
// update channel.
//
// The update channel is unbuffered, so the hand-off waits until the watch
// goroutine picks it up. The wait is abandoned when the loader is stopped or
// the process-wide close signal fires; otherwise a caller racing with Stop
// would block forever on a channel nobody reads any more.
func (load *Loader) Update(async bool) {
	if !async {
		load.read()
		return
	}
	if atomic.LoadInt32(&load.watch) <= 0 {
		system.ChildRunning(load.read)
		return
	}
	load.watchRunning.GoRunning(load.watchLoad)
	select {
	case load.update <- true:
	case <-load.stop:
		// Loader stopped: the watch goroutine is gone or about to exit.
	case <-system.Closed():
		// Process is shutting down.
	}
}
// Reset clears the "read in progress" flag so that the next Read is allowed to
// run even if a previous load never finished.
//
// The flag is shared with goroutines executing Read, so it is written while
// holding rm rather than with a bare assignment.
func (load *Loader) Reset() {
	load.rm.Lock()
	load.reading = false
	load.rm.Unlock()
}
// Watch switches the loader from "not watching" (0) to "watching" (1) and
// starts the background refresh goroutine. It does nothing when a watch of
// either kind is already active.
//
// The transition uses compare-and-swap so that concurrent callers cannot both
// observe 0 and both start the goroutine, and so that a state of 2 set by a
// concurrent WatchChan is never overwritten with 1.
func (load *Loader) Watch() {
	if atomic.CompareAndSwapInt32(&load.watch, 0, 1) {
		load.watchRunning.GoRunning(load.watchLoad)
	}
}
// WatchChan puts the loader into channel-notification mode (watch == 2),
// makes sure the background refresh goroutine is running, and returns the
// channel on which a value is delivered after every successful reload.
//
// When the loader is already in mode 2 the existing channel is returned.
// Stop closes the channel.
func (load *Loader) WatchChan() <-chan bool {
	if state := atomic.LoadInt32(&load.watch); state == 0 || state == 1 {
		// Allocate the channel BEFORE publishing state 2. A watch goroutine
		// started earlier by Watch sends on upChan as soon as it observes 2;
		// if the state were published first it could send on a nil channel
		// and block forever.
		load.upChan = make(chan bool)
		atomic.StoreInt32(&load.watch, 2)
	}
	load.watchRunning.GoRunning(load.watchLoad)
	return load.upChan
}
// Stop ends watching: it resets the watch state to 0, closes the notification
// channel if the loader was in channel mode, and closes the stop channel so
// the background goroutine returns.
//
// Calling Stop again is safe: the state swap guarantees upChan is closed at
// most once per WatchChan, and the stop channel is only closed if it has not
// been closed already (the original panicked with "close of closed channel").
//
// NOTE(review): upChan is closed here while the watch goroutine may still be
// about to send on it; eliminating that window requires letting the sender
// close the channel, which is a wider change than this method.
func (load *Loader) Stop() {
	if atomic.SwapInt32(&load.watch, 0) == 2 {
		close(load.upChan)
	}
	select {
	case <-load.stop:
		// Already closed by an earlier Stop. Nothing ever sends on stop, so a
		// successful receive can only mean "closed".
	default:
		close(load.stop)
	}
}
// Read runs the configured load function once and stores its result.
//
// Returns (note the non-idiomatic order, kept for compatibility):
//   - (nil, false) when no load function is configured or another Read is
//     already in progress;
//   - (err, false) when the load function fails; the stored data is untouched;
//   - (nil, true) when new data was stored.
//
// The in-progress flag is tested and set under rm so that two goroutines
// cannot both pass the check, and it is cleared in a defer so that a panicking
// load function does not leave the loader permanently "busy". The lock is NOT
// held while the load function runs, so the function may call Data or a nested
// Read (which reports "in progress").
func (load *Loader) Read() (error, bool) {
	if load.loadFunc == nil {
		return nil, false
	}

	load.rm.Lock()
	if load.reading {
		load.rm.Unlock()
		return nil, false
	}
	load.reading = true
	load.rm.Unlock()

	defer func() {
		load.rm.Lock()
		load.reading = false
		load.rm.Unlock()
	}()

	data, err := load.loadFunc()
	if err != nil {
		return err, false
	}

	load.rm.Lock()
	load.data = data
	load.rm.Unlock()
	return nil, true
}
// watchLoad is the background refresh loop. It reloads on every tick of a
// ticker with period load.second and whenever Update signals the update
// channel. In channel mode (watch == 2) each successful reload is announced on
// upChan. The loop ends when the stop channel is closed or the process-wide
// close signal fires; on exit the watch state is reset to 0.
//
// NOTE(review): Stop closes upChan from another goroutine, so a send that
// races with Stop can still hit a closed channel; fixing that needs a change
// to who closes upChan.
func (load *Loader) watchLoad() {
	defer atomic.StoreInt32(&load.watch, 0)

	ticker := time.NewTicker(load.second)
	// Release the ticker when the loop ends; the original never stopped it.
	defer ticker.Stop()

	// refresh reloads and, in channel mode, notifies the consumer. It reports
	// false when shutdown was requested while waiting for the consumer, so the
	// goroutine is not left blocked on a send nobody will receive.
	refresh := func() bool {
		if !load.readV2() || atomic.LoadInt32(&load.watch) != 2 {
			return true
		}
		select {
		case load.upChan <- true:
			return true
		case <-load.stop:
			return false
		case <-system.Closed():
			return false
		}
	}

	for {
		select {
		case <-load.stop:
			return
		case <-load.update:
			if !refresh() {
				return
			}
		case <-ticker.C:
			if !refresh() {
				return
			}
		case <-system.Closed():
			return
		}
	}
}
// readV2 performs one reload via Read and, when new data was stored, publishes
// it. It reports whether new data was stored. A load error is not propagated:
// it simply yields false, as does a skipped read.
func (load *Loader) readV2() bool {
	err, refreshed := load.Read()
	if err != nil || !refreshed {
		return false
	}
	load.publish()
	return true
}
// read performs one reload-and-publish cycle and discards the outcome. It
// exists so the cycle can be passed around as a plain func().
func (load *Loader) read() {
	_ = load.readV2()
}
// publish sends the current data to the configured publisher as a bus.Event.
// It is a no-op when no publisher is set. An empty topic or command falls back
// to defTopic / defCmd.
//
// The data is snapshotted under the read lock, because Read replaces it under
// the write lock from other goroutines; the original read it unguarded. The
// lock is released before Publish is called so that subscribers are free to
// call back into the loader.
func (load *Loader) publish() {
	if load.publisher == nil {
		return
	}

	topic := load.topic
	if len(topic) == 0 {
		topic = defTopic
	}
	cmd := load.cmd
	if len(cmd) == 0 {
		cmd = defCmd
	}

	load.rm.RLock()
	payload := load.data
	load.rm.RUnlock()

	load.publisher.Publish(topic, bus.Event{Cmd: cmd, Data: payload})
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.20.98

搜索帮助