1 Star 0 Fork 0

h79/goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
job.go 3.89 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2024-05-29 15:53 . job trace id
package scheduler
import (
"fmt"
"gitee.com/h79/goutils/common/result"
"gitee.com/h79/goutils/common/system"
"gitee.com/h79/goutils/common/timer"
"time"
)
// Sentinel errors returned by Job.Execute.
var (
	// ErrJobFuncNil is returned by Execute when the job has no handler.
	ErrJobFuncNil = result.Error(result.ErrParam, "job func is nil")
	// ErrJobQuited is returned by Execute when the job's state reports IsQuit.
	ErrJobQuited = result.Error(result.ErrParam, "job quited")
)
// Handler is implemented by anything that can process a Job.
// Job.Execute calls ProcessJob on the job's handler and returns its results unchanged.
type Handler interface {
// ProcessJob processes job and returns a result value and an error.
ProcessJob(job *Job) (interface{}, error)
}
// HandlerFunc adapts a plain function to the Handler interface.
type HandlerFunc func(job *Job) (interface{}, error)

// ProcessJob invokes the underlying function with job and forwards its
// result value and error to the caller.
func (fn HandlerFunc) ProcessJob(job *Job) (interface{}, error) {
	data, err := fn(job)
	return data, err
}
// ResultHandler receives the Result of a job.
// Job.done calls ResultJob when the job has a result handler set.
type ResultHandler interface {
// ResultJob is called with the job's result.
ResultJob(res *Result)
}
// ResultFunc adapts a plain function to the ResultHandler interface.
type ResultFunc func(res *Result)

// ResultJob calls fn(res).
//
// A nil ResultFunc stored in a ResultHandler is a non-nil interface value,
// so the resultHandler != nil check in Job.done cannot detect it. Treat a
// nil function as "no handler" here instead of panicking on the call.
func (fn ResultFunc) ResultJob(res *Result) {
	if fn == nil {
		return
	}
	fn(res)
}
// Compile-time check that *Job satisfies the Task interface.
var _ Task = (*Job)(nil)
// Job is a unit of work: identifying metadata, a lifecycle State, timing
// settings, an optional payload, and the handlers used to run it and to
// receive its result.
type Job struct {
// Type is the job category; returned by GetType.
Type string
// Id identifies the job; returned by GetId.
Id string
// TraceId is a tracing identifier; set by WithTraceId.
TraceId string
// State is the current lifecycle state; changed by Execute, Cancel, Pause and Start.
State State
// StartAt is the timestamp taken from timer.CurrentS when the job is built
// (or lazily by CheckTimeout if unset). Presumably Unix seconds — confirm against timer.CurrentS.
StartAt int64
Delay time.Duration // delays execution: on the first run Execute blocks for this long before calling the handler
// Timeout is the limit checked by CheckTimeout; values <= 0 disable the check.
Timeout time.Duration
// handler is invoked by Execute.
handler Handler
// resultHandler, when non-nil, is invoked by done.
resultHandler ResultHandler
payload interface{} // custom user-defined data
}
// Result is the value handed to a ResultHandler.
// TraceId, Type and Id presumably mirror the originating Job's fields;
// the code that fills them in is not in this file — confirm at the call site.
type Result struct {
TraceId string `json:"traceId"`
Type string `json:"type"`
Id string `json:"id"`
// Err is omitted from JSON when nil.
Err error `json:"err,omitempty"`
Data interface{} `json:"data,omitempty"` // processing result
}
// shellData is the payload attached to jobs created by NewShellJob.
type shellData struct {
// Cmd is the command string passed to system.SyncExec.
Cmd string
}
// NewShellJob builds a job of type "shell" whose handler runs cmd through
// system.SyncExec and returns that call's result as the job's data.
//
// The job id is "ShellJob:Cmd:<cmd>" and the initial timeout is 0; callers
// can adjust it afterwards with WithTimeOut.
func NewShellJob(cmd string) *Job {
	job := NewJob("shell", fmt.Sprintf("ShellJob:Cmd:%s", cmd), 0).
		WithPayload(&shellData{Cmd: cmd})
	job.WithHandlerFunc(func(j *Job) (interface{}, error) {
		shell, ok := j.GetPayload().(*shellData)
		if !ok || shell == nil {
			return nil, fmt.Errorf("payload not shelldata")
		}
		// Use the timeout of the job actually being executed (j), not the
		// captured constructor variable: if the Job value was copied and the
		// copy's Timeout changed, the captured pointer would be stale.
		return system.SyncExec(shell.Cmd, j.Timeout), nil
	})
	return job
}
// BuildJob returns a Job value (not a pointer) with the given type, id and
// timeout. The job starts in InitState and StartAt is stamped with
// timer.CurrentS() at construction time. Handlers, payload, delay and trace
// id are left at their zero values.
func BuildJob(jobType, jobId string, timeout time.Duration) Job {
	var built Job
	built.Type = jobType
	built.Id = jobId
	built.State = InitState
	built.StartAt = timer.CurrentS()
	built.Timeout = timeout
	return built
}
// NewJob is the pointer-returning counterpart of BuildJob: it allocates a
// Job and initializes it with the given type, id and timeout.
func NewJob(jobType, jobId string, timeout time.Duration) *Job {
	job := new(Job)
	*job = BuildJob(jobType, jobId, timeout)
	return job
}
// NewEmptyJob returns a job with empty type and id, no timeout, and fn as
// its handler.
//
// A nil fn leaves the job without a handler. Converting a nil HandlerFunc to
// the Handler interface would produce a non-nil interface value, which
// Execute's nil check cannot detect and which panics when called; skipping
// the assignment makes Execute return ErrJobFuncNil instead.
func NewEmptyJob(fn HandlerFunc) *Job {
	job := BuildJob("", "", 0)
	if fn == nil {
		return &job
	}
	return job.SetHandler(fn)
}
// SetHandler stores fn as the handler that Execute calls and returns the
// job so calls can be chained.
// NOTE: a typed nil (e.g. HandlerFunc(nil)) becomes a non-nil Handler, so
// Execute's handler == nil check will not catch it.
func (job *Job) SetHandler(fn Handler) *Job {
job.handler = fn
return job
}
// WithHandlerFunc installs a plain function as the job's handler and returns
// the job for chaining.
//
// A nil fn clears the handler. Wrapping nil in HandlerFunc would yield a
// non-nil Handler that slips past Execute's nil check and panics when
// invoked; clearing it makes Execute return ErrJobFuncNil instead.
func (job *Job) WithHandlerFunc(fn func(job *Job) (interface{}, error)) *Job {
	if fn == nil {
		return job.SetHandler(nil)
	}
	return job.SetHandler(HandlerFunc(fn))
}
// SetResultHandler stores fn as the handler that done calls with the job's
// Result, and returns the job so calls can be chained.
func (job *Job) SetResultHandler(fn ResultHandler) *Job {
job.resultHandler = fn
return job
}
// WithResultFunc installs a plain function as the job's result handler and
// returns the job for chaining.
//
// A nil fn clears the result handler. Wrapping nil in ResultFunc would yield
// a non-nil ResultHandler that passes the nil check in done and panics when
// invoked.
func (job *Job) WithResultFunc(fn func(res *Result)) *Job {
	if fn == nil {
		return job.SetResultHandler(nil)
	}
	return job.SetResultHandler(ResultFunc(fn))
}
// WithTimeOut sets the job's Timeout and returns the job for chaining.
// CheckTimeout treats values <= 0 as "no timeout".
func (job *Job) WithTimeOut(timeout time.Duration) *Job {
job.Timeout = timeout
return job
}
// WithTraceId sets the job's TraceId and returns the job for chaining.
func (job *Job) WithTraceId(traceId string) *Job {
job.TraceId = traceId
return job
}
// WithDelay sets the job's Delay and returns the job for chaining.
// Execute waits for this duration on the first run when it is > 0.
func (job *Job) WithDelay(delay time.Duration) *Job {
job.Delay = delay
return job
}
// WithPayload attaches arbitrary user data to the job and returns the job
// for chaining. The value is retrieved with GetPayload.
func (job *Job) WithPayload(payload interface{}) *Job {
job.payload = payload
return job
}
// GetType returns the job's Type.
func (job *Job) GetType() string {
return job.Type
}
// GetId returns the job's Id.
func (job *Job) GetId() string {
return job.Id
}
// GetState returns the job's current State.
func (job *Job) GetState() State {
return job.State
}
// GetPayload returns the data attached with WithPayload, or nil if none was set.
func (job *Job) GetPayload() interface{} {
return job.payload
}
// Execute runs the job's handler and returns whatever it returns.
//
// It returns ErrJobQuited without running anything when the state reports
// IsQuit. A job still in InitState is moved to RunningState and, if Delay is
// positive, waits via system.Wait(Delay) before continuing. When no handler
// is set, the job is marked CompletedState and ErrJobFuncNil is returned.
// The state is not modified after the handler returns.
func (job *Job) Execute() (interface{}, error) {
	if job.State.IsQuit() {
		return nil, ErrJobQuited
	}
	// Only the first execution leaves InitState and honors the delay.
	if job.State == InitState {
		job.State = RunningState
		if wait := job.Delay; wait > 0 {
			// Blocks here until the delay has passed.
			system.Wait(wait)
		}
	}
	if h := job.handler; h != nil {
		return h.ProcessJob(job)
	}
	job.State = CompletedState
	return nil, ErrJobFuncNil
}
// Cancel sets the job's State to CancelState.
func (job *Job) Cancel() {
job.State = CancelState
}
// Pause sets the job's State to PauseState.
func (job *Job) Pause() {
job.State = PauseState
}
// Start sets the job's State to RunningState. It does not invoke the
// handler; that is done by Execute.
func (job *Job) Start() {
job.State = RunningState
}
// IsQuit reports whether the job's current State is a quit state, as
// defined by State.IsQuit.
func (job *Job) IsQuit() bool {
return job.State.IsQuit()
}
// CheckTimeout reports whether more than Timeout (truncated to whole
// seconds) has elapsed since StartAt, measured with timer.CurrentS.
//
// A Timeout <= 0 disables the check and always yields false. As a side
// effect, a non-positive StartAt is initialized to the current time, so the
// first call on such a job never reports a timeout.
func (job *Job) CheckTimeout() bool {
	if job.Timeout <= 0 {
		return false
	}
	current := timer.CurrentS()
	if job.StartAt <= 0 {
		job.StartAt = current
	}
	limit := int64(job.Timeout.Seconds())
	elapsed := current - job.StartAt
	return elapsed > limit
}
// done forwards res to the job's result handler; it is a no-op when no
// result handler has been set.
func (job *Job) done(res *Result) {
	rh := job.resultHandler
	if rh == nil {
		return
	}
	rh.ResultJob(res)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.20.68

搜索帮助

344bd9b3 5694891 D2dac590 5694891