1 Star 0 Fork 0

h79/goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
daemon.go 5.25 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2023-08-28 14:28 . cmd
package system
import (
"errors"
"fmt"
"sync"
"time"
)
var (
DefaultDaemonConfig = DaemonConfig{Config: Config{EnableEnv: true, EnableStd: false}}
DefaultGroupRun *GroupRun = nil
CmdNilErr = errors.New("cmd obj is nil")
MaxCountErr = errors.New("max count")
MaxErr = errors.New("max error")
)
type DaemonConfig struct {
Config
MaxCount int `json:"maxCount" yaml:"maxCount" xml:"maxCount"`
MaxError int `json:"maxError" yaml:"maxError" xml:"maxError"`
MinExitTime int64 `json:"minExitTime" yaml:"minExitTime" xml:"minExitTime"`
}
type ArgOption struct {
Id int
AppExe string // 启动的程序
Args []string //参数
Env []string //环境变量
Data interface{} // 用户自己定义
}
type ArgOptionFunc func(arg *ArgOption)
type Status int
const (
KStatusRunning = Status(1)
KStatusClose = Status(2)
KStatusExit = Status(3)
)
type ProcessCallback func(cmd *Cmd, status Status, time int64) int
func callback(cb ProcessCallback, call func(cb ProcessCallback) int) int {
if cb != nil {
return call(cb)
}
return 0
}
type Daemon struct {
}
func (*Daemon) Background(id int, args []string, conf DaemonConfig, opts ...ArgOptionFunc) (*Cmd, error) {
var _ = &DefaultDaemonConfig
c, err := NewArgCmd(id, args, &conf.Config, opts...)
if err != nil {
return nil, err
}
if err = c.Start(); err != nil {
return nil, err
}
return c, nil
}
// Run 守护进程启动一个子进程, 并循环监视
func (d *Daemon) Run(groupId int, args []string, cb ProcessCallback, config DaemonConfig, opts ...ArgOptionFunc) int {
var (
count = 0
errNum = 0
exitCode = 0
cmd *Cmd
err error
)
d.initConfig(&config)
for {
//daemon 信息描述
if errNum > config.MaxError {
exitCode = 1
callback(cb, func(cb ProcessCallback) int {
return cb(cmd.WithErr(MaxErr), KStatusExit, time.Now().Unix())
})
break
}
if config.MaxCount > 0 && count > config.MaxCount {
exitCode = 2
callback(cb, func(cb ProcessCallback) int {
return cb(cmd.WithErr(MaxCountErr), KStatusExit, time.Now().Unix())
})
break
}
//启动时间戳
startTime := time.Now().Unix()
cmd, err = d.Background(groupId, args, config, opts...)
if err != nil { //启动失败
errNum++
continue
}
// 子进程,
if cmd == nil || cmd.Cmd == nil {
callback(cb, func(cb ProcessCallback) int {
return cb(cmd.WithErr(CmdNilErr), KStatusExit, time.Now().Unix())
})
break
}
callback(cb, func(cb ProcessCallback) int {
return cb(cmd, KStatusRunning, startTime)
})
count++
//父进程: 等待子进程退出
err = cmd.Cmd.Wait()
end := time.Now().Unix()
dat := end - startTime
if dat < config.MinExitTime {
//异常退出
errNum++
} else {
errNum = 0
}
if ret := callback(cb, func(cb ProcessCallback) int {
return cb(cmd, KStatusClose, end)
}); ret != 0 {
exitCode = ret
break
}
}
return exitCode
}
const KMaxGroupCount = 3000
// Group 守护进程启动一个N[1,2000]子进程, 并循环监视
func (*Daemon) Group(group int, args []string, cb ProcessCallback, config DaemonConfig, opts ...ArgOptionFunc) {
if DefaultGroupRun == nil {
DefaultGroupRun = NewGroup()
}
if group <= 0 {
group = 1
}
if group > KMaxGroupCount {
group = KMaxGroupCount
}
DefaultGroupRun.Exec(cb, config, opts...)
for i := 0; i < group; i++ {
_ = DefaultGroupRun.Add(i, NewChild(i, args, nil))
}
DefaultGroupRun.Wait()
}
func (*Daemon) initConfig(conf *DaemonConfig) {
if conf.MaxError <= 0 {
conf.MaxError = 10
}
if conf.MinExitTime <= 0 {
conf.MinExitTime = 20
}
if conf.MaxError <= 0 {
conf.MaxError = 5
}
}
type Child struct {
id int
args []string
Data interface{}
}
func NewChild(id int, args []string, data interface{}) *Child {
return &Child{
id: id,
args: append([]string{}, args...),
Data: data,
}
}
func (it *Child) GetId() int {
return it.id
}
type GroupRun struct {
wg sync.WaitGroup
cc map[int]*Child
rm sync.Mutex
itCh chan *Child
quitCh chan struct{}
running RunningCheck
}
func NewGroup() *GroupRun {
return &GroupRun{
cc: map[int]*Child{},
itCh: make(chan *Child),
quitCh: make(chan struct{}),
}
}
func (g *GroupRun) Add(id int, it *Child) error {
g.rm.Lock()
_, ok := g.cc[id]
if ok {
g.rm.Unlock()
return fmt.Errorf("%d is existed", id)
}
if len(g.cc) > KMaxGroupCount {
g.rm.Unlock()
return MaxCountErr
}
g.cc[id] = it
g.rm.Unlock()
g.wg.Add(1)
g.itCh <- it
return nil
}
func (g *GroupRun) Get(id int) (*Child, bool) {
g.rm.Lock()
defer g.rm.Unlock()
it, ok := g.cc[id]
return it, ok
}
func (g *GroupRun) Exec(cb ProcessCallback, config DaemonConfig, opts ...ArgOptionFunc) {
g.running.GoRunning(func() {
exit := Exit()
for {
select {
case <-exit.Done():
return
case it := <-g.itCh:
g.run(it, cb, config, opts...)
case <-g.quitCh:
g.quitCh <- struct{}{}
return
case <-Closed():
return
}
}
})
}
func (g *GroupRun) Wait() {
g.wg.Wait()
// wait execRun runtime quit
g.quitCh <- struct{}{}
<-g.quitCh
}
func (g *GroupRun) run(it *Child, cb ProcessCallback, config DaemonConfig, opts ...ArgOptionFunc) {
ChildRunning(func() {
defer func() {
g.wg.Done()
}()
var d = Daemon{}
d.Run(it.id, it.args, cb, config, opts...)
})
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.10.1

搜索帮助

344bd9b3 5694891 D2dac590 5694891