1 Star 0 Fork 0

h79/goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
daemon.go 5.45 KB
一键复制 编辑 原始数据 按行查看 历史
package system
import (
"errors"
"fmt"
"os/exec"
"path/filepath"
"sync"
"time"
)
var (
DefaultDaemonConfig = DaemonConfig{Config: Config{EnableEnv: true, EnableStd: false}}
DefaultGroupRun *GroupRun = nil
CmdNilErr = errors.New("cmd obj is nil")
MaxCountErr = errors.New("max count")
MaxErr = errors.New("max error")
)
type DaemonConfig struct {
Config
MaxCount int `json:"maxCount" yaml:"maxCount" xml:"maxCount"`
MaxError int `json:"maxError" yaml:"maxError" xml:"maxError"`
MinExitTime int64 `json:"minExitTime" yaml:"minExitTime" xml:"minExitTime"`
}
type ArgOption struct {
Id int
AppExe string // 启动的程序
Args []string //参数
Env []string //环境变量
Data interface{} // 用户自己定义
}
type ArgOptionFunc func(arg *ArgOption)
type Status int
const (
KStatusRunning = Status(1)
KStatusClose = Status(2)
KStatusExit = Status(3)
)
type ProcessCallback func(cmd *Cmd, status Status, time int64)
func callback(cb ProcessCallback, call func(cb ProcessCallback)) {
if cb != nil {
call(cb)
}
}
type Daemon struct {
}
func (*Daemon) Background(id int, args []string, conf DaemonConfig, opts ...ArgOptionFunc) (*Cmd, error) {
var _ = &DefaultDaemonConfig
var opt = ArgOption{Id: id, Args: args}
for i := range opts {
opts[i](&opt)
}
if len(opt.AppExe) == 0 {
opt.AppExe = opt.Args[0]
}
appExe, _ := filepath.Abs(opt.AppExe)
cmd := exec.Command(appExe, opt.Args[1:]...)
c := NewCmd(cmd, &conf.Config, opt.Env...).WithData(opt.Data)
cmd, err := c.Start()
return c, err
}
// Run 守护进程启动一个子进程, 并循环监视
func (d *Daemon) Run(groupId int, args []string, cb ProcessCallback, config DaemonConfig, opts ...ArgOptionFunc) int {
var (
count = 0
errNum = 0
exitCode = 0
cmd *Cmd
err error
)
d.initConfig(&config)
for {
//daemon 信息描述
if errNum > config.MaxError {
exitCode = 1
callback(cb, func(cb ProcessCallback) {
cb(cmd.WithErr(MaxErr), KStatusExit, time.Now().Unix())
})
break
}
if config.MaxCount > 0 && count > config.MaxCount {
exitCode = 2
callback(cb, func(cb ProcessCallback) {
cb(cmd.WithErr(MaxCountErr), KStatusExit, time.Now().Unix())
})
break
}
//启动时间戳
startTime := time.Now().Unix()
cmd, err = d.Background(groupId, args, config, opts...)
if err != nil { //启动失败
errNum++
continue
}
// 子进程,
if cmd == nil || cmd.Cmd == nil {
callback(cb, func(cb ProcessCallback) {
cb(cmd.WithErr(CmdNilErr), KStatusExit, time.Now().Unix())
})
break
}
callback(cb, func(cb ProcessCallback) {
cb(cmd, KStatusRunning, startTime)
})
count++
//父进程: 等待子进程退出
err = cmd.Cmd.Wait()
end := time.Now().Unix()
dat := end - startTime
if dat < config.MinExitTime {
//异常退出
errNum++
} else {
errNum = 0
}
callback(cb, func(cb ProcessCallback) {
cb(cmd, KStatusClose, end)
})
}
return exitCode
}
const KMaxGroupCount = 3000
// Group 守护进程启动一个N[1,2000]子进程, 并循环监视
func (*Daemon) Group(group int, args []string, cb ProcessCallback, config DaemonConfig, opts ...ArgOptionFunc) {
if DefaultGroupRun == nil {
DefaultGroupRun = NewGroup()
}
if group <= 0 {
group = 1
}
if group > KMaxGroupCount {
group = KMaxGroupCount
}
DefaultGroupRun.Exec(cb, config, opts...)
for i := 0; i < group; i++ {
_ = DefaultGroupRun.Add(i, NewChild(i, args, nil))
}
DefaultGroupRun.Wait()
}
func (*Daemon) initConfig(conf *DaemonConfig) {
if conf.MaxError <= 0 {
conf.MaxError = 10
}
if conf.MinExitTime <= 0 {
conf.MinExitTime = 20
}
if conf.MaxError <= 0 {
conf.MaxError = 5
}
}
type Child struct {
id int
args []string
Data interface{}
}
func NewChild(id int, args []string, data interface{}) *Child {
return &Child{
id: id,
args: append([]string{}, args...),
Data: data,
}
}
func (it *Child) GetId() int {
return it.id
}
type GroupRun struct {
exec bool
wg sync.WaitGroup
cc map[int]*Child
rm sync.Mutex
itCh chan *Child
exCh chan struct{}
quitCh chan struct{}
}
func NewGroup() *GroupRun {
return &GroupRun{
exec: false,
cc: map[int]*Child{},
itCh: make(chan *Child),
exCh: make(chan struct{}),
quitCh: make(chan struct{}),
}
}
func (g *GroupRun) Add(id int, it *Child) error {
g.rm.Lock()
_, ok := g.cc[id]
if ok {
g.rm.Unlock()
return fmt.Errorf("%d is existed", id)
}
if len(g.cc) > KMaxGroupCount {
g.rm.Unlock()
return MaxCountErr
}
g.cc[id] = it
g.rm.Unlock()
g.wg.Add(1)
g.itCh <- it
return nil
}
func (g *GroupRun) Get(id int) (*Child, bool) {
g.rm.Lock()
defer g.rm.Unlock()
it, ok := g.cc[id]
return it, ok
}
func (g *GroupRun) Exec(cb ProcessCallback, config DaemonConfig, opts ...ArgOptionFunc) {
if g.exec {
return
}
g.exec = true
ChildRunning(func() {
g.execRun(cb, config, opts...)
})
}
func (g *GroupRun) execRun(cb ProcessCallback, config DaemonConfig, opts ...ArgOptionFunc) {
for {
select {
case it := <-g.itCh:
g.run(it, cb, config, opts...)
case <-g.exCh:
g.wg.Done()
case <-g.quitCh:
g.quitCh <- struct{}{}
return
}
}
}
func (g *GroupRun) Wait() {
g.wg.Wait()
// wait execRun runtime quit
g.quitCh <- struct{}{}
<-g.quitCh
}
func (g *GroupRun) run(it *Child, cb ProcessCallback, config DaemonConfig, opts ...ArgOptionFunc) {
ChildRunning(func() {
var d = Daemon{}
d.Run(it.id, it.args, cb, config, opts...)
g.exCh <- struct{}{}
})
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.8.25

搜索帮助

344bd9b3 5694891 D2dac590 5694891