1 Star 0 Fork 0

蒙蒙的男孩 / eosc

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
process-controller.go 5.16 KB
一键复制 编辑 原始数据 按行查看 历史
蒙蒙的男孩 提交于 2024-01-10 13:49 . 重定义项目地址
package process
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"sync"
"sync/atomic"
"time"
"gitee.com/meng_mengs_boys/eosc/log"
"gitee.com/meng_mengs_boys/eosc/utils"
)
type IProcessUpdates []IProcessUpdate
func (I IProcessUpdates) Update(cmd *exec.Cmd) {
for _, i := range I {
i.Update(cmd)
}
}
type IProcessUpdate interface {
Update(cmd *exec.Cmd)
}
type IConfigBuild interface {
Config() StartArgs
}
type StartArgs struct {
Data []byte
ExtraFiles []*os.File
}
type ProcessController struct {
ctx context.Context
cancel context.CancelFunc
name string
current *ProcessCmd
callback IProcessUpdates
restartChan chan *StartArgs
locker sync.Mutex
isStop bool
isShutDown int32
logWriter io.Writer
}
func NewProcessController(ctx context.Context, name string, logWriter io.Writer, callback ...IProcessUpdate) *ProcessController {
newCtx, cancel := context.WithCancel(ctx)
c := &ProcessController{
callback: IProcessUpdates(callback),
name: name,
ctx: newCtx,
cancel: cancel,
restartChan: make(chan *StartArgs),
logWriter: logWriter,
}
atomic.StoreInt32(&c.isShutDown, 1)
go c.doControl()
return c
}
func (pc *ProcessController) Shutdown() {
pc.locker.Lock()
defer pc.locker.Unlock()
atomic.StoreInt32(&pc.isShutDown, 1)
if pc.current != nil {
pc.current.Close()
pc.current = nil
}
}
func (pc *ProcessController) Stop() {
pc.locker.Lock()
defer pc.locker.Unlock()
if pc.isStop {
return
}
if pc.cancel != nil {
pc.cancel()
pc.cancel = nil
}
pc.isStop = true
}
func newProcess(name string, data []byte, logWriter io.Writer, extraFiles []*os.File) (*ProcessCmd, error) {
cmd, err := Cmd(name, nil)
if err != nil {
return nil, err
}
reader, writer, err := os.Pipe()
if err != nil {
return nil, err
}
cmd.Stdin = bytes.NewReader(utils.EncodeFrame(data))
cmd.Stdout = writer
cmd.Stderr = logWriter
cmd.ExtraFiles = extraFiles
err = cmd.Start()
if err != nil {
return nil, err
}
pc := NewProcessCmd(name, cmd, reader)
go pc.Read()
return pc, nil
}
func (pc *ProcessController) getClient() *ProcessCmd {
return pc.current
}
func (pc *ProcessController) check(w *ProcessCmd, configData []byte, extraFiles []*os.File) {
err := w.Wait()
if err != nil {
log.Warn("worker exit:", err)
}
if atomic.LoadInt32(&pc.isShutDown) == 1 {
return
}
pc.locker.Lock()
defer pc.locker.Unlock()
if pc.current == w {
// 连接断开
err = pc.create(configData, extraFiles)
if err != nil {
log.Error("worker create:", err)
}
}
}
func (pc *ProcessController) run(configData []byte, extraFiles []*os.File) error {
log.DebugF("create %s process start...\n", pc.name)
//cfg := pc.configBuild.Config()
p, err := newProcess(pc.name, configData, pc.logWriter, extraFiles)
if err != nil {
log.Warnf("new %s process: %w", pc.name, err)
return err
}
old := pc.current
pc.current = p
go pc.check(pc.current, configData, extraFiles)
if old != nil {
old.Close()
}
return nil
}
func (pc *ProcessController) create(configData []byte, extraFiles []*os.File) error {
err := pc.run(configData, extraFiles)
if err != nil {
log.Warn("new process[", pc.name, "]:", err)
return err
}
ticker := time.NewTimer(time.Millisecond * 5)
defer utils.TimeSpend(fmt.Sprint("wait [", pc.name, "] process start:"))()
for {
<-ticker.C
log.Debug(pc.name, " controller ping...")
if pc.current == nil {
pc.callback.Update(nil)
return errors.New("process not exist")
}
switch pc.current.Status() {
case StatusRunning:
pc.callback.Update(pc.current.Cmd())
return nil
case StatusExit, StatusError:
pc.callback.Update(nil)
return errors.New("fail to start process " + pc.name + " " + strconv.Itoa(pc.current.Status()))
}
ticker.Reset(5 * time.Millisecond)
}
}
func (pc *ProcessController) Start(configData []byte, extraFiles []*os.File) error {
pc.locker.Lock()
defer pc.locker.Unlock()
atomic.StoreInt32(&pc.isShutDown, 0)
return pc.create(configData, extraFiles)
}
func (pc *ProcessController) TryRestart(configData []byte, extraFiles []*os.File) {
pc.restartChan <- &StartArgs{
Data: configData,
ExtraFiles: extraFiles,
}
}
func (pc *ProcessController) restart(configData []byte, extraFiles []*os.File) {
pc.locker.Lock()
defer pc.locker.Unlock()
err := pc.create(configData, extraFiles)
if err != nil {
log.Error("restart error: ", err)
}
return
}
func (pc *ProcessController) doControl() {
t := time.NewTimer(time.Second)
t.Stop()
defer t.Stop()
var lastConfig *StartArgs = new(StartArgs)
for {
select {
case <-pc.ctx.Done():
pc.Shutdown()
return
case arg, ok := <-pc.restartChan:
if ok {
lastConfig = arg
t.Reset(time.Second)
}
case <-t.C:
if atomic.LoadInt32(&pc.isShutDown) == 0 {
pc.restart(lastConfig.Data, lastConfig.ExtraFiles)
}
}
}
}
//func (pc *ProcessController) Sleep() error {
// client := pc.getClient()
// if client != nil {
// err := client.Sleep()
// if err != nil {
// pc.callback.Update(nil)
// return err
// }
// pc.callback.Update(client.Cmd())
// return nil
// }
// pc.callback.Update(nil)
// return errors.Start("process not exist")
//}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/meng_mengs_boys/eosc.git
git@gitee.com:meng_mengs_boys/eosc.git
meng_mengs_boys
eosc
eosc
v1.15.6

搜索帮助

344bd9b3 5694891 D2dac590 5694891