代码拉取完成,页面将自动刷新
package actor
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/mailbox"
mdebug "gitee.com/nggs/micro/debug"
mlog "gitee.com/nggs/micro/log"
)
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type ISuper interface {
actor.Actor
PID() *actor.PID
ParentPID() *actor.PID
Sign() string
SetSign(sign string)
Logger() mlog.ILogger
SetLogger(logger mlog.ILogger)
Start(ctx actor.Context, name string) error
StartWithPrefix(ctx actor.Context, prefix string) error
WaitForStarted()
IsStop() bool
Stop() error
WaitForStopped()
StopAndWait() error
Poison() error
PoisonAndWait() error
NewTimer(dur time.Duration, cb TimerCallback) TimerID
NewTimerWithTag(dur time.Duration, tag TimerID, cb TimerCallback) TimerID
NewLoopTimer(interval time.Duration, cb TimerCallback) TimerID
NewLoopTimerWithTag(dur time.Duration, tag TimerID, cb TimerCallback) TimerID
StopTimer(id TimerID) error
Debug(format string, args ...interface{})
Info(format string, args ...interface{})
Warn(format string, args ...interface{})
Error(format string, args ...interface{})
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type VFOnStarted func(ctx actor.Context)
type VFOnStopping func(ctx actor.Context)
type VFOnStopped func(ctx actor.Context)
type VFOnReceiveMessage func(ctx actor.Context)
type VFOnActorTerminated func(who *actor.PID, ctx actor.Context)
type VFOnRestarting func(ctx actor.Context)
type VFOnRestarted func(ctx actor.Context)
type VFOnTimeout func(ctx actor.Context, id TimerID, tag TimerID)
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type option struct {
vfOnStarted VFOnStarted
vfOnStopping VFOnStopping
vfOnStopped VFOnStopped
vfOnReceiveMessage VFOnReceiveMessage
vfOnActorTerminated VFOnActorTerminated
vfOnRestarting VFOnRestarting
vfOnRestarted VFOnRestarted
vfOnTimeout VFOnTimeout
logger mlog.ILogger
startedWg *sync.WaitGroup
stoppedWg *sync.WaitGroup
mailBoxSize int
decider actor.DeciderFunc
}
type OptionFunc func(option *option)
func newOption() option {
return option{
vfOnReceiveMessage: func(ctx actor.Context) {},
vfOnStarted: func(ctx actor.Context) {},
vfOnStopping: func(ctx actor.Context) {},
vfOnStopped: func(ctx actor.Context) {},
vfOnActorTerminated: func(who *actor.PID, ctx actor.Context) {},
vfOnRestarting: func(ctx actor.Context) {},
vfOnRestarted: func(ctx actor.Context) {},
vfOnTimeout: func(ctx actor.Context, id TimerID, tag TimerID) {},
decider: DefaultDecider,
}
}
func WithOnReceiveMessageHandler(vf VFOnReceiveMessage) OptionFunc {
return func(option *option) {
option.vfOnReceiveMessage = vf
}
}
func WithOnStartedHandler(vf VFOnStarted) OptionFunc {
return func(option *option) {
option.vfOnStarted = vf
}
}
func WithOnStoppingHandler(vf VFOnStopping) OptionFunc {
return func(option *option) {
option.vfOnStopping = vf
}
}
func WithOnStoppedHandler(vf VFOnStopped) OptionFunc {
return func(option *option) {
option.vfOnStopped = vf
}
}
func WithOnActorTerminateHandler(vf VFOnActorTerminated) OptionFunc {
return func(option *option) {
option.vfOnActorTerminated = vf
}
}
func WithOnRestartingHandler(vf VFOnRestarting) OptionFunc {
return func(option *option) {
option.vfOnRestarting = vf
}
}
func WithOnRestartedHandler(vf VFOnRestarted) OptionFunc {
return func(option *option) {
option.vfOnRestarted = vf
}
}
func WithOnTimeoutHandler(vf VFOnTimeout) OptionFunc {
return func(option *option) {
option.vfOnTimeout = vf
}
}
func WithLogger(logger mlog.ILogger) OptionFunc {
return func(option *option) {
option.logger = logger
}
}
func WithStartedWaitGroup(startedWg *sync.WaitGroup) OptionFunc {
return func(option *option) {
option.startedWg = startedWg
}
}
func WithStoppedWaitGroup(stoppedWg *sync.WaitGroup) OptionFunc {
return func(option *option) {
option.stoppedWg = stoppedWg
}
}
func WithMailBoxSize(mailBoxSize int) OptionFunc {
return func(option *option) {
option.mailBoxSize = mailBoxSize
}
}
func WithDecider(decider actor.DeciderFunc) OptionFunc {
return func(option *option) {
option.decider = decider
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type Super struct {
option
sign string
pid *actor.PID
parentPID *actor.PID
timerMgr *TimerManager
stopFlag int32
isRestart bool
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func NewSuper(options ...OptionFunc) ISuper {
s := &Super{
option: newOption(),
}
for _, optionFunc := range options {
optionFunc(&s.option)
}
if s.logger == nil {
s.logger = gLogger
}
if s.startedWg != nil {
s.startedWg.Add(1)
}
return s
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (s Super) PID() *actor.PID {
return s.pid.Clone()
}
func (s Super) ParentPID() *actor.PID {
return s.parentPID.Clone()
}
func (s Super) Sign() string {
return s.sign
}
func (s *Super) SetSign(sign string) {
s.sign = sign
}
func (s Super) Logger() mlog.ILogger {
return s.logger
}
func (s *Super) SetLogger(logger mlog.ILogger) {
if logger != nil {
s.logger = logger
}
}
func (s *Super) Start(ctx actor.Context, name string) (err error) {
props := actor.PropsFromProducer(func() actor.Actor { return s })
if s.mailBoxSize > 0 {
props = props.WithMailbox(mailbox.Bounded(s.mailBoxSize))
}
if s.decider != nil {
props = props.WithSupervisor(actor.NewOneForOneStrategy(0, 0, s.decider))
}
if ctx != nil {
pid, e := ctx.SpawnNamed(props, name)
if e != nil {
return e
}
s.pid = pid.Clone()
} else {
pid, e := RootContext.SpawnNamed(props, name)
if e != nil {
return e
}
s.pid = pid.Clone()
}
s.sign = s.pid.Id
return
}
func (s *Super) StartWithPrefix(ctx actor.Context, prefix string) (err error) {
props := actor.PropsFromProducer(func() actor.Actor { return s })
if s.mailBoxSize > 0 {
props = props.WithMailbox(mailbox.Bounded(s.mailBoxSize))
}
if s.decider != nil {
props = props.WithSupervisor(actor.NewOneForOneStrategy(0, 0, s.decider))
}
if ctx != nil {
s.pid = ctx.SpawnPrefix(props, prefix).Clone()
} else {
s.pid = RootContext.SpawnPrefix(props, prefix).Clone()
}
s.sign = s.pid.Id
return
}
func (s *Super) WaitForStarted() {
if s.startedWg != nil {
s.startedWg.Wait()
}
}
func (s Super) IsStop() bool {
return atomic.LoadInt32(&s.stopFlag) == 1
}
func (s *Super) Stop() error {
if !atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1) {
return ErrAlreadyStop
}
if s.pid != nil {
RootContext.Stop(s.pid)
}
return nil
}
func (s *Super) WaitForStopped() {
if s.stoppedWg != nil {
s.stoppedWg.Wait()
}
}
func (s *Super) StopAndWait() error {
if !atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1) {
return ErrAlreadyStop
}
if s.pid != nil {
return RootContext.StopFuture(s.pid).Wait()
}
return nil
}
func (s *Super) Poison() error {
if !atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1) {
return ErrAlreadyStop
}
if s.pid != nil {
RootContext.Poison(s.pid)
}
return nil
}
func (s *Super) PoisonAndWait() error {
if !atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1) {
return ErrAlreadyStop
}
if s.pid != nil {
return RootContext.PoisonFuture(s.pid).Wait()
}
return nil
}
func (s *Super) NewTimer(dur time.Duration, cb TimerCallback) TimerID {
return s.timerMgr.NewTimer(dur, cb)
}
func (s *Super) NewTimerWithTag(dur time.Duration, tag TimerID, cb TimerCallback) TimerID {
return s.timerMgr.NewTimerWithTag(dur, tag, cb)
}
func (s *Super) NewLoopTimer(interval time.Duration, cb TimerCallback) TimerID {
return s.timerMgr.NewLoopTimer(interval, cb)
}
func (s *Super) NewLoopTimerWithTag(dur time.Duration, tag TimerID, cb TimerCallback) TimerID {
return s.timerMgr.NewLoopTimerWithTag(dur, tag, cb)
}
func (s *Super) StopTimer(id TimerID) error {
return s.timerMgr.Stop(id)
}
func (s *Super) Debug(format string, args ...interface{}) {
s.logger.Debug("[%s] %s", s.sign, fmt.Sprintf(format, args...))
}
func (s *Super) Info(format string, args ...interface{}) {
s.logger.Info("[%s] %s", s.sign, fmt.Sprintf(format, args...))
}
func (s *Super) Warn(format string, args ...interface{}) {
s.logger.Warn("[%s] %s", s.sign, fmt.Sprintf(format, args...))
}
func (s *Super) Error(format string, args ...interface{}) {
s.logger.Error("[%s] %s", s.sign, fmt.Sprintf(format, args...))
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (s *Super) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *Timeout:
if err := s.timerMgr.Trigger(msg.ID); err != nil {
s.Error("trigger time fail, %s", err)
} else {
s.vfOnTimeout(ctx, msg.ID, msg.Tag)
}
case *actor.Started:
if !s.isRestart {
s.pid = ctx.Self().Clone()
s.parentPID = ctx.Parent().Clone()
s.timerMgr = NewTimerManager(s.pid)
s.vfOnStarted(ctx)
if s.startedWg != nil {
s.startedWg.Done()
}
if s.stoppedWg != nil {
s.stoppedWg.Add(1)
}
} else {
s.vfOnRestarted(ctx)
}
case *actor.Stopping:
atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1)
s.vfOnStopping(ctx)
case *actor.Stopped:
s.vfOnStopped(ctx)
s.timerMgr.StopAll()
if s.stoppedWg != nil {
s.stoppedWg.Done()
}
case *actor.Restarting:
s.Error("restarting")
s.vfOnRestarting(ctx)
s.isRestart = true
case *actor.Terminated:
s.vfOnActorTerminated(msg.Who, ctx)
default:
defer func() {
if reason := recover(); reason != nil {
s.Error("crashed, reason: %#v, stack: %s", reason, mdebug.Stack())
panic(reason)
}
}()
s.vfOnReceiveMessage(ctx)
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。