1 Star 0 Fork 0

kzangv/gsf-fof

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
cron.go 3.22 KB
一键复制 编辑 原始数据 按行查看 历史
kzangv 提交于 2023-02-23 10:22 . fixed
package component
import (
"fmt"
"gitee.com/kzangv/gsf-fof/component/cron"
"gitee.com/kzangv/gsf-fof/component/define"
"gitee.com/kzangv/gsf-fof/gsf"
"gitee.com/kzangv/gsf-fof/logger"
"github.com/urfave/cli/v2"
"sync"
"time"
)
type Resize struct {
s *Cron
t int
}
func (r *Resize) Check(cap, len int) bool {
if cap-len > r.s.Cfg.ResizeThresholdLimit || (len > 0 && cap/len > r.s.Cfg.ResizeThresholdRatio) {
if r.t++; r.s.Cfg.ResizeThresholdTimes <= r.t {
r.t = 0
return true
}
} else {
r.t = 0
}
return false
}
func (r *Resize) NewCap(len int) int {
return len + 100
}
type Cron struct {
Cfg define.ComponentCron
lock sync.RWMutex
data map[string]*cron.Cron
stop chan struct{}
}
func (s *Cron) CliFlags() []cli.Flag {
return []cli.Flag{
&cli.IntFlag{Name: "cron-resize-enable", Usage: "cron resize enable", Value: 1, Destination: &s.Cfg.ResizeEnable},
&cli.IntFlag{Name: "cron-channel-buffer", Usage: "cron change event channel buffer size", Value: 100, Destination: &s.Cfg.ChannelBuffer},
&cli.IntFlag{Name: "cron-resize-duration", Usage: "cron resize duration", Value: 1800, Destination: &s.Cfg.ResizeDuration},
&cli.IntFlag{Name: "cron-resize-threshold-ratio", Usage: "cron resize threshold ratio(cap/len > ratio)", Value: 3, Destination: &s.Cfg.ResizeThresholdRatio},
&cli.IntFlag{Name: "cron-resize-threshold-limit", Usage: "cron resize threshold max size(cap-len > max size)", Value: 3000, Destination: &s.Cfg.ResizeThresholdLimit},
&cli.IntFlag{Name: "cron-resize-threshold-times", Usage: "cron resize threshold hold times", Value: 3, Destination: &s.Cfg.ResizeThresholdTimes},
}
}
func (s *Cron) Init(_ logger.Interface, _ *gsf.AppConfig, _ *cli.Context) error {
s.data = map[string]*cron.Cron{}
return nil
}
func (s *Cron) Run() error {
s.stop = make(chan struct{})
if s.Cfg.ResizeEnable != 0 {
ticker := time.NewTicker(time.Second * time.Duration(s.Cfg.ResizeDuration))
go func() {
defer func() { s.stop <- struct{}{} }()
for {
select {
case <-ticker.C:
s.lock.RLock()
for _, v := range s.data {
go v.Resize()
}
s.lock.RUnlock()
case <-s.stop:
return
}
}
}()
}
return nil
}
func (s *Cron) Close() error {
s.stop <- struct{}{}
<-s.stop
s.lock.RLock()
defer s.lock.RUnlock()
wg := sync.WaitGroup{}
wg.Add(len(s.data))
for k := range s.data {
go func(v *cron.Cron) {
defer wg.Done()
v.Stop()
}(s.data[k])
}
wg.Wait()
return nil
}
func (s *Cron) NewCronEx(name string, bSize int) (*cron.Cron, error) {
if s.data == nil {
return nil, fmt.Errorf("cron is not init")
}
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.data[name]; ok {
return nil, fmt.Errorf("cron %s is exist", name)
}
var r cron.Resize
if s.Cfg.ResizeEnable != 0 {
r = &Resize{s, 0}
}
s.data[name] = cron.NewCron(r, bSize)
return s.data[name], nil
}
func (s *Cron) NewCron(name string) (*cron.Cron, error) {
return s.NewCronEx(name, s.Cfg.ChannelBuffer)
}
func (s *Cron) FreeCron(name string) error {
if s.data == nil {
return fmt.Errorf("cron is not init")
}
s.lock.Lock()
defer s.lock.Unlock()
if v, ok := s.data[name]; !ok {
return fmt.Errorf("cron %s is not exist", name)
} else {
delete(s.data, name)
go v.Stop()
}
return nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/kzangv/gsf-fof.git
git@gitee.com:kzangv/gsf-fof.git
kzangv
gsf-fof
gsf-fof
v0.2.0

搜索帮助