代码拉取完成,页面将自动刷新
package common_engine
import (
"context"
"gitee.com/coder_vector/common_engine/common/status"
"strings"
"sync"
)
// syncEngine is the Engine value built by NewEngineWithOption. Exec wraps
// incoming tasks and pushes them onto queue; Running pulls them off and submits
// them to pool for execution.
type syncEngine struct {
// ctx is passed through to the filter, the transfer and processByCommand.
ctx context.Context
// lock is held by BindCommand and Exec, and around the taskNum decrement that
// runs when a task's worker function finishes.
lock sync.Mutex
// queue carries tasks from Exec to the Running loop.
queue chan CommandTask
// filterFunc, when non-nil, is consulted by Exec before a task is queued.
filterFunc Filter
// commandHandlers maps the space-trimmed command text to the Command
// registered through BindCommand.
commandHandlers map[string]Command
// taskNum is incremented by Exec for every queued task and decremented when
// the task's worker function returns.
taskNum int64
// transfer supplies DoParse and DoMatch, which Running calls for each task
// before its commands are executed.
transfer Transfer
// pool runs the task functions submitted by Running and is closed by Release.
pool Pool
// err is the engine's recorded error, accessed through Err, SetErr and ResetErr.
err error
}
// NewEngine builds an Engine bound to ctx with no extra options; it is
// shorthand for NewEngineWithOption(ctx).
func NewEngine(ctx context.Context) Engine {
	eng := NewEngineWithOption(ctx)
	return eng
}
// NewEngineWithOption builds an Engine bound to ctx and applies options to it.
//
// The engine starts with a pool created by NewPool(100), a task queue buffered
// to 10 entries, an empty handler table and the default transfer. It panics if
// the pool cannot be created. An error from applying the options does not
// abort construction: it is recorded on the engine and is available via Err.
func NewEngineWithOption(ctx context.Context, options ...EngineOption) Engine {
	workers, err := NewPool(100)
	if err != nil {
		panic(err)
	}

	// lock, taskNum and err are left at their zero values.
	eng := new(syncEngine)
	eng.queue = make(chan CommandTask, 10)
	eng.ctx = ctx
	eng.commandHandlers = make(map[string]Command)
	eng.transfer = NewDefaultTransfer()
	eng.pool = workers

	if err = eng.WithEngineOptions(options...); err != nil {
		eng.SetErr(err)
	}
	return eng
}
// BindCommand registers command under its command text with leading and
// trailing spaces removed, replacing any handler already bound to that text.
//
// It does nothing when the engine already holds an error. A command whose
// trimmed text is empty is not registered; status.CommandTextIsEmpty is
// recorded on the engine instead. The engine itself is returned in every case
// so calls can be chained.
func (e *syncEngine) BindCommand(command Command) Engine {
	e.lock.Lock()
	defer e.lock.Unlock()

	if e.err != nil {
		return e
	}

	switch text := strings.Trim(command.GetCommandText(), " "); text {
	case "":
		e.SetErr(status.NewErr(status.CommandTextIsEmpty, nil))
	default:
		e.commandHandlers[text] = command
	}
	return e
}
// Prepare is called by Running before it starts receiving from the queue. It
// currently performs no work and always returns nil.
func (e *syncEngine) Prepare() error {
return nil
}
// Release closes the worker pool once no task is outstanding. While taskNum is
// still positive the pool is left open so that in-flight tasks can finish. It
// always returns nil.
//
// The engine lock is taken because taskNum is only ever modified under that
// lock (by Exec and by finishing workers); reading it unguarded would race
// with those writers.
func (e *syncEngine) Release() error {
	e.lock.Lock()
	defer e.lock.Unlock()

	// check task num
	if e.taskNum > 0 {
		return nil
	}
	e.pool.Close()
	return nil
}
// Running is the engine's dispatch loop. It calls Prepare, then repeatedly
// receives a task from the queue and submits it to the worker pool. Inside the
// pool the task's commands are parsed and matched through the transfer and
// then executed by processByCommand, all within the task's call.
//
// The loop stops only on failure:
//   - the queue has been closed (status.MustCommandNotFound),
//   - a queued value is not a *innerTask of type SyncTask
//     (status.TaskTypeIllegal),
//   - the pool's Submit returns an error.
//
// That error is recorded with SetErr and the engine's current error is
// returned. An error from Prepare is returned directly without being recorded.
// Release is attempted on every exit path.
func (e *syncEngine) Running() error {
	defer func() {
		// The return value has already been evaluated when this runs, so a
		// Release failure is only observable through Err.
		if releaseErr := e.Release(); releaseErr != nil {
			e.SetErr(releaseErr)
		}
	}()

	if err := e.Prepare(); err != nil {
		return err
	}

	var loopErr error
	for {
		ta, ok := <-e.queue
		if !ok {
			loopErr = status.NewErr(status.MustCommandNotFound, nil)
			break
		}

		// A checked assertion turns an unexpected task implementation into
		// the same error as a wrong task type instead of a panic.
		innerTa, isInner := ta.(*innerTask)
		if !isInner || innerTa.taskType != SyncTask {
			loopErr = status.NewErr(status.TaskTypeIllegal, nil)
			break
		}

		loopErr = e.pool.Submit(func() {
			innerTa.call.Start(func() (interface{}, error) {
				defer func() {
					e.lock.Lock()
					defer e.lock.Unlock()
					e.taskNum--
				}()

				// Task-level errors stay in variables local to this worker.
				// Writing them to the dispatch loop's error would race with
				// the loop and could stop the engine because one task failed.
				var taskErr error

				// parse task commands
				var taskPart []*TaskPart
				if taskPart, taskErr = e.transfer.DoParse(e.ctx, innerTa); taskErr != nil {
					return nil, taskErr
				}

				// do match
				var taskCommand []*TaskCommand
				if taskCommand, taskErr = e.transfer.DoMatch(e.ctx, e.commandHandlers, taskPart); taskErr != nil {
					return nil, taskErr
				}
				innerTa.commands = taskCommand

				// TODO add running by command priority process
				return e.processByCommand(e.ctx, innerTa)
			})
		})
		if loopErr != nil {
			break
		}
	}

	e.SetErr(loopErr)
	return e.Err()
}
// processByCommand runs the task's matched commands in order and returns the
// value produced by the last command that has a bound handler.
//
// Commands whose text has no entry in commandHandlers are skipped. Each
// handler receives the results of earlier commands that its option lists in
// ReferResult, keyed by the earlier handler's command text. Every handler's
// outcome is stored on the command and in the per-task result table. The first
// handler error stops processing and is returned with a nil value.
//
// task must be a *innerTask; ctx is not used by the current implementation.
func (e *syncEngine) processByCommand(ctx context.Context, task CommandTask) (interface{}, error) {
	inner := task.(*innerTask)
	results := make(map[string]CommandResult)

	var last interface{}
	for _, taskCmd := range inner.commands {
		handler, bound := e.commandHandlers[taskCmd.Text]
		if !bound {
			continue
		}

		// Collect only the earlier results this handler asked to see.
		referred := make(map[string]CommandResult)
		if opt := handler.GetCommandOption(); opt != nil {
			for _, key := range opt.ReferResult {
				if prev, has := results[key]; has {
					referred[key] = prev
				}
			}
		}

		taskCmd.Val, taskCmd.Err = handler.Handler(taskCmd, referred)
		results[handler.GetCommandText()] = &cmdResult{
			val:  taskCmd.Val,
			err:  taskCmd.Err,
			text: handler.GetCommandText(),
		}
		if taskCmd.Err != nil {
			return nil, taskCmd.Err
		}
		last = taskCmd.Val
	}
	return last, nil
}
// Exec validates task, wraps it in a sync inner task and queues it for the
// Running loop.
//
// The returned Response carries an error and no call when:
//   - the task has no command text (status.CommandTextIsEmpty),
//   - the configured filter returns an error (that error is reported),
//   - the configured filter asks to skip the task (status.UnSupportTask).
//
// Otherwise the Response carries the call that the queued task will run in.
// Exec blocks while the queue is full.
func (e *syncEngine) Exec(task CommandTask) Response {
	if len(task.GetCommandsText()) == 0 {
		return &defaultResp{
			err:  status.NewErr(status.CommandTextIsEmpty, nil),
			call: nil,
		}
	}

	resp := &defaultResp{}

	// admit runs the filter and accounts for the task under the engine lock.
	// The counter is raised before the task is queued so that Release never
	// sees zero outstanding tasks while one is on its way to the queue.
	admit := func() error {
		e.lock.Lock()
		defer e.lock.Unlock()

		// do filter
		if e.filterFunc != nil {
			undo, err := e.filterFunc.Filter(e.ctx, task)
			if err != nil {
				return err
			}
			if undo {
				return status.NewErr(status.UnSupportTask, nil)
			}
		}
		e.taskNum++
		return nil
	}
	if err := admit(); err != nil {
		resp.err = err
		return resp
	}

	call := NewSyncCall()
	// pack task
	t := &innerTask{
		taskType: SyncTask,
		task:     task,
		call:     call,
		status:   0,
		val:      nil,
		err:      nil,
		commands: []*TaskCommand{},
	}

	// send to queue
	//
	// The send is done without holding the lock. A send on a full queue
	// blocks, and holding the lock meanwhile would stall BindCommand, other
	// Exec callers and every worker waiting to decrement taskNum.
	e.queue <- t

	resp.call = call
	return resp
}
// Err returns the error currently recorded on the engine, or nil if none is
// set. The field is read without taking the engine lock.
func (e *syncEngine) Err() error {
return e.err
}
// getAllCommands returns every bound Command in a freshly allocated slice.
// The order follows map iteration and is therefore unspecified. The handler
// table is read without taking the engine lock.
func (e *syncEngine) getAllCommands() []Command {
	handlers := make([]Command, len(e.commandHandlers))
	idx := 0
	for _, handler := range e.commandHandlers {
		handlers[idx] = handler
		idx++
	}
	return handlers
}
// SetErr records err as the engine's current error, overwriting any previous
// value. It does not take the engine lock itself; BindCommand calls it while
// already holding that lock.
func (e *syncEngine) SetErr(err error) {
e.err = err
}
// ResetErr clears the engine's recorded error so that Err returns nil again.
// It does not take the engine lock.
func (e *syncEngine) ResetErr() {
e.err = nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。