1 Star 0 Fork 0

simplexyz / simplego

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
woker.go 5.60 KB
一键复制 编辑 原始数据 按行查看 历史
李文建 提交于 2023-10-30 15:28 . 完善actor
package worker
import (
"fmt"
"log"
"sync/atomic"
"time"
aactor "github.com/asynkron/protoactor-go/actor"
sactor "gitee.com/simplexyz/simplego/actor"
sadefine "gitee.com/simplexyz/simplego/actor/define"
satimer "gitee.com/simplexyz/simplego/actor/timer"
sawork "gitee.com/simplexyz/simplego/actor/work"
sdefine "gitee.com/simplexyz/simplego/define"
)
type Worker struct {
option
sign string
pid *sadefine.PID
parentPID *sadefine.PID
hasStart atomic.Bool
started atomic.Bool
hasStop atomic.Bool
isRestart bool
timerMgr *satimer.Manager
}
func Create(impl sdefine.IWorkerImplement, optionFuncs ...OptionFunc) sdefine.IWorker {
w := &Worker{
option: newOption(impl),
}
for _, optionFunc := range optionFuncs {
optionFunc(&w.option)
}
if w.startedWg != nil {
w.startedWg.Add(1)
}
return w
}
func (w *Worker) PID() *sadefine.PID {
return w.pid.Clone()
}
func (w *Worker) ParentPID() *sadefine.PID {
return w.parentPID.Clone()
}
func (w *Worker) Sign() string {
return w.sign
}
func (w *Worker) SetSign(sign string) {
_ = w.Post(sawork.PostFunc(func(ctx sadefine.Context) {
w.sign = sign
}))
}
func (w *Worker) Start(ctx sadefine.Context, name string, startFunc func() error, wait bool) (err error) {
if !w.hasStart.CompareAndSwap(false, true) {
return sactor.ErrAlreadyStart
}
if startFunc != nil {
if err = startFunc(); err != nil {
return
}
}
var propsOptions []sadefine.PropsOption
if w.mailBoxSize > 0 {
propsOptions = append(propsOptions, aactor.WithMailbox(aactor.Bounded(w.mailBoxSize)))
} else {
propsOptions = append(propsOptions, aactor.WithMailbox(aactor.Unbounded()))
}
if w.decider != nil {
propsOptions = append(propsOptions, aactor.WithSupervisor(aactor.NewOneForOneStrategy(0, 0, w.decider)))
}
props := aactor.PropsFromProducer(func() aactor.Actor { return w }, propsOptions...)
if ctx != nil {
pid, e := ctx.SpawnNamed(props, name)
if e != nil {
err = fmt.Errorf("ctx.SpawnNamed fail, %w", e)
return
}
w.sign = pid.Id
} else {
pid, e := sactor.RootContext().SpawnNamed(props, name)
if e != nil {
err = fmt.Errorf("RootContext().SpawnNamed fail, %w", e)
return
}
w.sign = pid.Id
}
if wait && w.startedWg != nil {
w.startedWg.Wait()
}
return
}
func (w *Worker) HasStart() bool {
return w.hasStart.Load()
}
func (w *Worker) Started() bool {
return w.started.Load()
}
func (w *Worker) Stop(stopFunc func(), wait bool) error {
if !w.hasStop.CompareAndSwap(false, true) {
return sactor.ErrAlreadyStop
}
if stopFunc != nil {
stopFunc()
}
if w.pid != nil {
sactor.RootContext().Stop(w.pid)
}
return nil
}
func (w *Worker) HasStop() bool {
return w.hasStop.Load()
}
func (w *Worker) Post(work sawork.IPostWork) error {
if work == nil {
return sawork.ErrDoNotPostOrDispatchNilWork
}
if !w.Started() {
return sactor.ErrNotStart
}
p := sawork.CreateMessagePostWork(work)
sactor.RootContext().Send(w.pid, p)
return nil
}
func (w *Worker) Dispatch(timeout time.Duration, work sawork.IDispatchWork) error {
if work == nil {
return sawork.ErrDoNotPostOrDispatchNilWork
}
if !w.Started() {
return sactor.ErrNotStart
}
if timeout <= 0 {
timeout = sawork.DefaultDispatchTimeout
}
req := sawork.CreateMessageDispatchWork(work)
f := sactor.RootContext().RequestFuture(w.pid, req, timeout)
if e := f.Wait(); e != nil {
return fmt.Errorf("wait dispatch result fail, %w", e)
}
r, err := f.Result()
if err != nil {
return fmt.Errorf("get dispatch result fail, %w", err)
}
resp, ok := r.(*sawork.MessageDispatchWork)
if !ok || resp == nil {
return sawork.ErrDispatchResultNotMatch
}
err = resp.Err
sawork.DestroyMessageDispatchWork(resp)
return err
}
func (w *Worker) NewTimer(dur time.Duration, tag satimer.Tag, cb satimer.Callback) satimer.ID {
return w.timerMgr.NewTimer(dur, tag, cb)
}
func (w *Worker) NewLoopTimer(dur time.Duration, tag satimer.ID, cb satimer.Callback) satimer.ID {
return w.timerMgr.NewLoopTimer(dur, tag, cb)
}
func (w *Worker) StopTimer(id satimer.ID) error {
return w.timerMgr.Stop(id)
}
func (w *Worker) Receive(ctx sadefine.Context) {
switch msg := ctx.Message().(type) {
case *sawork.MessagePostWork:
msg.Work.Execute(ctx)
sawork.DestroyMessagePostWork(msg)
case *sawork.MessageDispatchWork:
msg.Err = msg.Work.Execute(ctx)
ctx.Respond(msg)
case *satimer.MessageTimeout:
ok, err := w.impl.BeforeTriggerTimer(msg.ID, msg.Tag, ctx)
if !ok {
return
}
if err != nil {
log.Printf("[%s] execute BeforeTriggerTimer fail, id=%d, tag=%d, %s", w.sign, msg.ID, msg.Tag, err)
return
}
err = w.timerMgr.Trigger(ctx, msg.ID)
if err != nil {
log.Printf("[%s] trigger timer fail, id=%d, tag=%d, %s", w.sign, msg.ID, msg.Tag, err)
}
w.impl.AfterTriggerTimer(err, msg.ID, msg.Tag, ctx)
case *sadefine.Started:
if !w.isRestart {
w.pid = ctx.Self().Clone()
w.parentPID = ctx.Parent().Clone()
w.timerMgr = satimer.NewManager(sactor.RootContext(), w.pid.Clone())
w.impl.OnStarted(ctx)
w.started.Store(true)
if w.stoppedWg != nil {
w.stoppedWg.Add(1)
}
if w.startedWg != nil {
w.startedWg.Done()
}
} else {
w.impl.OnRestarted(ctx)
}
case *sadefine.Stopping:
w.impl.OnStopping(ctx)
w.hasStop.Store(true)
case *sadefine.Stopped:
w.impl.OnStopped(ctx)
w.impl.Destroy()
w.impl = nil
w.timerMgr.StopAll()
w.timerMgr = nil
if w.stoppedWg != nil {
w.stoppedWg.Done()
}
case *sadefine.Restarting:
log.Printf("[%s] restarting", w.sign)
w.impl.OnRestarting(ctx)
w.isRestart = true
case *sadefine.Terminated:
w.impl.OnActorTerminated(msg.Who, ctx)
default:
w.impl.OnReceiveMessage(ctx)
}
}
Go
1
https://gitee.com/simplexyz/simplego.git
git@gitee.com:simplexyz/simplego.git
simplexyz
simplego
simplego
d62e3dcece80

搜索帮助