1 Star 0 Fork 0

nggs / micro

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
super.go 9.92 KB
一键复制 编辑 原始数据 按行查看 历史
李文建 提交于 2020-04-17 15:39 . 1.持续完善actor,app
package actor
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/mailbox"
mdebug "gitee.com/nggs/micro/debug"
mlog "gitee.com/nggs/micro/log"
)
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ISuper is the interface returned by NewSuper. It embeds actor.Actor and adds
// identity accessors, logger access, lifecycle control, timer management and
// formatted logging helpers.
type ISuper interface {
	actor.Actor

	// Identity.
	PID() *actor.PID
	ParentPID() *actor.PID
	Sign() string
	SetSign(sign string)

	// Logger access.
	Logger() mlog.ILogger
	SetLogger(logger mlog.ILogger)

	// Lifecycle: starting, stopping and waiting for either.
	Start(ctx actor.Context, name string) error
	StartWithPrefix(ctx actor.Context, prefix string) error
	WaitForStarted()
	IsStop() bool
	Stop() error
	WaitForStopped()
	StopAndWait() error
	Poison() error
	PoisonAndWait() error

	// Timers.
	NewTimer(dur time.Duration, cb TimerCallback) TimerID
	NewTimerWithTag(dur time.Duration, tag TimerID, cb TimerCallback) TimerID
	NewLoopTimer(interval time.Duration, cb TimerCallback) TimerID
	NewLoopTimerWithTag(dur time.Duration, tag TimerID, cb TimerCallback) TimerID
	StopTimer(id TimerID) error

	// Formatted logging.
	Debug(format string, args ...interface{})
	Info(format string, args ...interface{})
	Warn(format string, args ...interface{})
	Error(format string, args ...interface{})
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Callback signatures that Super.Receive dispatches to.
type (
	// VFOnStarted is called on the first *actor.Started message.
	VFOnStarted func(ctx actor.Context)
	// VFOnStopping is called on *actor.Stopping.
	VFOnStopping func(ctx actor.Context)
	// VFOnStopped is called on *actor.Stopped.
	VFOnStopped func(ctx actor.Context)
	// VFOnReceiveMessage is called for every message Receive does not handle itself.
	VFOnReceiveMessage func(ctx actor.Context)
	// VFOnActorTerminated is called on *actor.Terminated with the terminated PID.
	VFOnActorTerminated func(who *actor.PID, ctx actor.Context)
	// VFOnRestarting is called on *actor.Restarting.
	VFOnRestarting func(ctx actor.Context)
	// VFOnRestarted is called on *actor.Started once a restart has been recorded.
	VFOnRestarted func(ctx actor.Context)
	// VFOnTimeout is called on *Timeout after the timer manager triggered the timer.
	VFOnTimeout func(ctx actor.Context, id TimerID, tag TimerID)
)
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// option collects everything that can be configured through OptionFunc values
// passed to NewSuper.
type option struct {
	// Lifecycle and message callbacks invoked from Super.Receive.
	vfOnStarted         VFOnStarted
	vfOnStopping        VFOnStopping
	vfOnStopped         VFOnStopped
	vfOnReceiveMessage  VFOnReceiveMessage
	vfOnActorTerminated VFOnActorTerminated
	vfOnRestarting      VFOnRestarting
	vfOnRestarted       VFOnRestarted
	vfOnTimeout         VFOnTimeout

	// logger is used by Debug/Info/Warn/Error; NewSuper falls back to gLogger when nil.
	logger mlog.ILogger

	// Optional wait groups signalled from Receive on start and stop.
	startedWg *sync.WaitGroup
	stoppedWg *sync.WaitGroup

	// mailBoxSize selects a bounded mailbox in Start when greater than zero.
	mailBoxSize int

	// decider, when non-nil, is wrapped in a one-for-one supervisor strategy in Start.
	decider actor.DeciderFunc
}
// OptionFunc mutates the option set of a Super while it is being constructed.
type OptionFunc func(o *option)
// newOption returns the default option set: every callback is a no-op so that
// Receive can invoke them without nil checks, and the decider is DefaultDecider.
func newOption() option {
	var o option

	o.vfOnStarted = func(actor.Context) {}
	o.vfOnStopping = func(actor.Context) {}
	o.vfOnStopped = func(actor.Context) {}
	o.vfOnReceiveMessage = func(actor.Context) {}
	o.vfOnActorTerminated = func(*actor.PID, actor.Context) {}
	o.vfOnRestarting = func(actor.Context) {}
	o.vfOnRestarted = func(actor.Context) {}
	o.vfOnTimeout = func(actor.Context, TimerID, TimerID) {}

	o.decider = DefaultDecider
	return o
}
// WithOnReceiveMessageHandler returns an option that installs vf as the handler
// Receive calls for messages it does not handle itself.
//
// A nil vf is ignored so the no-op default from newOption stays in place;
// Receive calls the handler unconditionally and a nil func would panic there.
func WithOnReceiveMessageHandler(vf VFOnReceiveMessage) OptionFunc {
	return func(o *option) {
		if vf != nil {
			o.vfOnReceiveMessage = vf
		}
	}
}
// WithOnStartedHandler returns an option that installs vf as the handler
// Receive calls on the first *actor.Started message.
//
// A nil vf is ignored so the no-op default from newOption stays in place;
// Receive calls the handler unconditionally and a nil func would panic there.
func WithOnStartedHandler(vf VFOnStarted) OptionFunc {
	return func(o *option) {
		if vf != nil {
			o.vfOnStarted = vf
		}
	}
}
// WithOnStoppingHandler returns an option that installs vf as the handler
// Receive calls on *actor.Stopping.
//
// A nil vf is ignored so the no-op default from newOption stays in place;
// Receive calls the handler unconditionally and a nil func would panic there.
func WithOnStoppingHandler(vf VFOnStopping) OptionFunc {
	return func(o *option) {
		if vf != nil {
			o.vfOnStopping = vf
		}
	}
}
// WithOnStoppedHandler returns an option that installs vf as the handler
// Receive calls on *actor.Stopped.
//
// A nil vf is ignored so the no-op default from newOption stays in place;
// Receive calls the handler unconditionally and a nil func would panic there.
func WithOnStoppedHandler(vf VFOnStopped) OptionFunc {
	return func(o *option) {
		if vf != nil {
			o.vfOnStopped = vf
		}
	}
}
// WithOnActorTerminateHandler returns an option that installs vf as the handler
// Receive calls on *actor.Terminated, passing the terminated actor's PID.
//
// A nil vf is ignored so the no-op default from newOption stays in place;
// Receive calls the handler unconditionally and a nil func would panic there.
func WithOnActorTerminateHandler(vf VFOnActorTerminated) OptionFunc {
	return func(o *option) {
		if vf != nil {
			o.vfOnActorTerminated = vf
		}
	}
}
// WithOnRestartingHandler returns an option that installs vf as the handler
// Receive calls on *actor.Restarting.
//
// A nil vf is ignored so the no-op default from newOption stays in place;
// Receive calls the handler unconditionally and a nil func would panic there.
func WithOnRestartingHandler(vf VFOnRestarting) OptionFunc {
	return func(o *option) {
		if vf != nil {
			o.vfOnRestarting = vf
		}
	}
}
// WithOnRestartedHandler returns an option that installs vf as the handler
// Receive calls on *actor.Started after a restart has been recorded.
//
// A nil vf is ignored so the no-op default from newOption stays in place;
// Receive calls the handler unconditionally and a nil func would panic there.
func WithOnRestartedHandler(vf VFOnRestarted) OptionFunc {
	return func(o *option) {
		if vf != nil {
			o.vfOnRestarted = vf
		}
	}
}
// WithOnTimeoutHandler returns an option that installs vf as the handler
// Receive calls on *Timeout once the timer manager has triggered the timer.
//
// A nil vf is ignored so the no-op default from newOption stays in place;
// Receive calls the handler unconditionally and a nil func would panic there.
func WithOnTimeoutHandler(vf VFOnTimeout) OptionFunc {
	return func(o *option) {
		if vf != nil {
			o.vfOnTimeout = vf
		}
	}
}
// WithLogger returns an option that sets the logger used by the formatted
// logging helpers. NewSuper substitutes gLogger when the logger ends up nil.
func WithLogger(logger mlog.ILogger) OptionFunc {
	apply := func(o *option) {
		o.logger = logger
	}
	return apply
}
// WithStartedWaitGroup returns an option that registers the wait group used to
// signal start-up: NewSuper adds one to it and Receive marks it done on the
// first *actor.Started message.
func WithStartedWaitGroup(startedWg *sync.WaitGroup) OptionFunc {
	apply := func(o *option) {
		o.startedWg = startedWg
	}
	return apply
}
// WithStoppedWaitGroup returns an option that registers the wait group used to
// signal shutdown: Receive adds one to it on the first *actor.Started message
// and marks it done on *actor.Stopped.
func WithStoppedWaitGroup(stoppedWg *sync.WaitGroup) OptionFunc {
	apply := func(o *option) {
		o.stoppedWg = stoppedWg
	}
	return apply
}
// WithMailBoxSize returns an option that records the mailbox size. Start and
// StartWithPrefix use a bounded mailbox of this size only when it is greater
// than zero.
func WithMailBoxSize(mailBoxSize int) OptionFunc {
	apply := func(o *option) {
		o.mailBoxSize = mailBoxSize
	}
	return apply
}
// WithDecider returns an option that replaces the supervision decider
// (DefaultDecider by default). Start and StartWithPrefix skip installing a
// supervisor strategy when the decider is nil.
func WithDecider(decider actor.DeciderFunc) OptionFunc {
	apply := func(o *option) {
		o.decider = decider
	}
	return apply
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Super is the ISuper implementation returned by NewSuper. It embeds the
// configured option set and tracks the actor's identity, timers and
// lifecycle flags.
type Super struct {
	option

	// sign is the tag prepended to log lines; Start sets it to the PID's Id.
	sign string

	// pid and parentPID are captured from the context on the first *actor.Started.
	pid       *actor.PID
	parentPID *actor.PID

	// timerMgr is created on the first *actor.Started.
	timerMgr *TimerManager

	// stopFlag is accessed through sync/atomic; 1 means a stop was requested or is in progress.
	stopFlag int32

	// isRestart is set on *actor.Restarting so the next *actor.Started is treated as a restart.
	isRestart bool
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// NewSuper builds a Super from the default options, applies the given option
// functions in order, falls back to gLogger when no logger was supplied, and
// adds one to the started wait group if one was configured.
func NewSuper(options ...OptionFunc) ISuper {
	s := new(Super)
	s.option = newOption()

	for i := range options {
		options[i](&s.option)
	}

	if s.logger == nil {
		s.logger = gLogger
	}

	// Balanced by the Done call Receive makes on the first *actor.Started.
	if wg := s.startedWg; wg != nil {
		wg.Add(1)
	}

	return s
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// PID returns a clone of the actor's PID.
//
// The receiver is a pointer so that the call does not copy the whole Super
// value (including the atomically updated stopFlag) with a plain read.
// NOTE(review): the result before the actor has been started depends on how
// (*actor.PID).Clone treats a nil receiver — confirm.
func (s *Super) PID() *actor.PID {
	return s.pid.Clone()
}
// ParentPID returns a clone of the parent PID captured on the first
// *actor.Started message.
//
// The receiver is a pointer so that the call does not copy the whole Super
// value (including the atomically updated stopFlag) with a plain read.
// NOTE(review): the result when no parent was recorded depends on how
// (*actor.PID).Clone treats a nil receiver — confirm.
func (s *Super) ParentPID() *actor.PID {
	return s.parentPID.Clone()
}
// Sign returns the tag that the logging helpers prepend to every line.
//
// The receiver is a pointer so that the call does not copy the whole Super
// value (including the atomically updated stopFlag) with a plain read.
func (s *Super) Sign() string {
	return s.sign
}
// SetSign replaces the tag that the logging helpers prepend to every line.
// Start and StartWithPrefix overwrite it with the spawned PID's Id.
func (s *Super) SetSign(sign string) { s.sign = sign }
// Logger returns the logger currently used by the logging helpers.
//
// The receiver is a pointer so that the call does not copy the whole Super
// value (including the atomically updated stopFlag) with a plain read.
func (s *Super) Logger() mlog.ILogger {
	return s.logger
}
// SetLogger replaces the logger used by the logging helpers. A nil logger is
// ignored so the current one stays in place.
func (s *Super) SetLogger(logger mlog.ILogger) {
	if logger == nil {
		return
	}
	s.logger = logger
}
// newProps builds the actor props shared by Start and StartWithPrefix: the
// producer always returns s itself, a bounded mailbox is used when mailBoxSize
// is positive, and a one-for-one supervisor strategy wraps the decider when
// one is configured.
func (s *Super) newProps() *actor.Props {
	props := actor.PropsFromProducer(func() actor.Actor { return s })
	if s.mailBoxSize > 0 {
		props = props.WithMailbox(mailbox.Bounded(s.mailBoxSize))
	}
	if s.decider != nil {
		props = props.WithSupervisor(actor.NewOneForOneStrategy(0, 0, s.decider))
	}
	return props
}

// Start spawns the actor under the given name, as a child of ctx when ctx is
// non-nil and from RootContext otherwise. On success it stores a clone of the
// spawned PID and sets the sign to the PID's Id; on failure it returns the
// spawn error and leaves pid and sign untouched.
//
// NOTE(review): when spawning fails, the Add(1) that NewSuper performed on the
// started wait group is never balanced, so WaitForStarted would block —
// confirm how callers are expected to handle this.
func (s *Super) Start(ctx actor.Context, name string) (err error) {
	props := s.newProps()

	var pid *actor.PID
	if ctx != nil {
		pid, err = ctx.SpawnNamed(props, name)
	} else {
		pid, err = RootContext.SpawnNamed(props, name)
	}
	if err != nil {
		return err
	}

	s.pid = pid.Clone()
	s.sign = s.pid.Id
	return nil
}

// StartWithPrefix spawns the actor with a name derived from prefix, as a child
// of ctx when ctx is non-nil and from RootContext otherwise. It stores a clone
// of the spawned PID, sets the sign to the PID's Id and always returns nil.
func (s *Super) StartWithPrefix(ctx actor.Context, prefix string) (err error) {
	props := s.newProps()

	if ctx != nil {
		s.pid = ctx.SpawnPrefix(props, prefix).Clone()
	} else {
		s.pid = RootContext.SpawnPrefix(props, prefix).Clone()
	}

	s.sign = s.pid.Id
	return nil
}
// WaitForStarted blocks on the started wait group, if one was configured;
// otherwise it returns immediately.
func (s *Super) WaitForStarted() {
	wg := s.startedWg
	if wg == nil {
		return
	}
	wg.Wait()
}
// IsStop reports whether the stop flag has been set, either by one of the
// Stop/Poison methods or by Receive handling *actor.Stopping.
//
// The receiver must be a pointer: with a value receiver the struct is copied
// with a plain read before the atomic load runs on the copy, which defeats the
// atomic access and races with the atomic writers.
func (s *Super) IsStop() bool {
	return atomic.LoadInt32(&s.stopFlag) == 1
}
// Stop sets the stop flag and asks RootContext to stop the actor without
// waiting for it. It returns ErrAlreadyStop when the flag was already set, and
// does nothing further when the actor has no PID yet.
func (s *Super) Stop() error {
	if swapped := atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1); !swapped {
		return ErrAlreadyStop
	}
	if s.pid == nil {
		return nil
	}
	RootContext.Stop(s.pid)
	return nil
}
// WaitForStopped blocks on the stopped wait group, if one was configured;
// otherwise it returns immediately.
func (s *Super) WaitForStopped() {
	wg := s.stoppedWg
	if wg == nil {
		return
	}
	wg.Wait()
}
// StopAndWait sets the stop flag, requests a stop through
// RootContext.StopFuture and returns the result of waiting on that future.
// It returns ErrAlreadyStop when the flag was already set, and nil when the
// actor has no PID yet.
func (s *Super) StopAndWait() error {
	if swapped := atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1); !swapped {
		return ErrAlreadyStop
	}
	if s.pid == nil {
		return nil
	}
	future := RootContext.StopFuture(s.pid)
	return future.Wait()
}
// Poison sets the stop flag and asks RootContext to poison the actor without
// waiting for it. It returns ErrAlreadyStop when the flag was already set, and
// does nothing further when the actor has no PID yet.
func (s *Super) Poison() error {
	if swapped := atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1); !swapped {
		return ErrAlreadyStop
	}
	if s.pid == nil {
		return nil
	}
	RootContext.Poison(s.pid)
	return nil
}
// PoisonAndWait sets the stop flag, poisons the actor through
// RootContext.PoisonFuture and returns the result of waiting on that future.
// It returns ErrAlreadyStop when the flag was already set, and nil when the
// actor has no PID yet.
func (s *Super) PoisonAndWait() error {
	if swapped := atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1); !swapped {
		return ErrAlreadyStop
	}
	if s.pid == nil {
		return nil
	}
	future := RootContext.PoisonFuture(s.pid)
	return future.Wait()
}
// NewTimer forwards to the timer manager's NewTimer and returns the resulting
// timer ID. The timer manager only exists after the first *actor.Started.
func (s *Super) NewTimer(dur time.Duration, cb TimerCallback) TimerID {
	id := s.timerMgr.NewTimer(dur, cb)
	return id
}
// NewTimerWithTag forwards to the timer manager's NewTimerWithTag and returns
// the resulting timer ID. The timer manager only exists after the first
// *actor.Started.
func (s *Super) NewTimerWithTag(dur time.Duration, tag TimerID, cb TimerCallback) TimerID {
	id := s.timerMgr.NewTimerWithTag(dur, tag, cb)
	return id
}
// NewLoopTimer forwards to the timer manager's NewLoopTimer and returns the
// resulting timer ID. The timer manager only exists after the first
// *actor.Started.
func (s *Super) NewLoopTimer(interval time.Duration, cb TimerCallback) TimerID {
	id := s.timerMgr.NewLoopTimer(interval, cb)
	return id
}
// NewLoopTimerWithTag forwards to the timer manager's NewLoopTimerWithTag and
// returns the resulting timer ID. The timer manager only exists after the
// first *actor.Started.
func (s *Super) NewLoopTimerWithTag(dur time.Duration, tag TimerID, cb TimerCallback) TimerID {
	id := s.timerMgr.NewLoopTimerWithTag(dur, tag, cb)
	return id
}
// StopTimer forwards to the timer manager's Stop for the given timer ID and
// returns its error. The timer manager only exists after the first
// *actor.Started.
func (s *Super) StopTimer(id TimerID) error {
	err := s.timerMgr.Stop(id)
	return err
}
// Debug formats the message and logs it at debug level, prefixed with the
// actor's sign in square brackets.
func (s *Super) Debug(format string, args ...interface{}) {
	text := fmt.Sprintf(format, args...)
	s.logger.Debug("[%s] %s", s.sign, text)
}
// Info formats the message and logs it at info level, prefixed with the
// actor's sign in square brackets.
func (s *Super) Info(format string, args ...interface{}) {
	text := fmt.Sprintf(format, args...)
	s.logger.Info("[%s] %s", s.sign, text)
}
// Warn formats the message and logs it at warn level, prefixed with the
// actor's sign in square brackets.
func (s *Super) Warn(format string, args ...interface{}) {
	text := fmt.Sprintf(format, args...)
	s.logger.Warn("[%s] %s", s.sign, text)
}
// Error formats the message and logs it at error level, prefixed with the
// actor's sign in square brackets.
func (s *Super) Error(format string, args ...interface{}) {
	text := fmt.Sprintf(format, args...)
	s.logger.Error("[%s] %s", s.sign, text)
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Receive implements actor.Actor. It handles timer and lifecycle messages
// itself and passes every other message to the configured receive handler.
func (s *Super) Receive(ctx actor.Context) {
	switch m := ctx.Message().(type) {
	case *Timeout:
		// The user callback only runs when the timer manager accepted the trigger.
		err := s.timerMgr.Trigger(m.ID)
		if err != nil {
			s.Error("trigger time fail, %s", err)
			return
		}
		s.vfOnTimeout(ctx, m.ID, m.Tag)

	case *actor.Started:
		// A Started that follows a Restarting is reported as a restart and
		// skips the one-time initialisation below.
		if s.isRestart {
			s.vfOnRestarted(ctx)
			return
		}

		s.pid = ctx.Self().Clone()
		s.parentPID = ctx.Parent().Clone()
		s.timerMgr = NewTimerManager(s.pid)
		s.vfOnStarted(ctx)

		if s.startedWg != nil {
			s.startedWg.Done()
		}
		if s.stoppedWg != nil {
			// Balanced by the Done call in the Stopped case.
			s.stoppedWg.Add(1)
		}

	case *actor.Stopping:
		// Mark the actor as stopping even when no Stop/Poison method was called.
		atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1)
		s.vfOnStopping(ctx)

	case *actor.Stopped:
		s.vfOnStopped(ctx)
		s.timerMgr.StopAll()
		if s.stoppedWg != nil {
			s.stoppedWg.Done()
		}

	case *actor.Restarting:
		s.Error("restarting")
		s.vfOnRestarting(ctx)
		s.isRestart = true

	case *actor.Terminated:
		s.vfOnActorTerminated(m.Who, ctx)

	default:
		// Log a panic from the user handler with its stack, then re-panic so
		// the original panic value keeps propagating.
		defer func() {
			reason := recover()
			if reason == nil {
				return
			}
			s.Error("crashed, reason: %#v, stack: %s", reason, mdebug.Stack())
			panic(reason)
		}()
		s.vfOnReceiveMessage(ctx)
	}
}
Go
1
https://gitee.com/nggs/micro.git
git@gitee.com:nggs/micro.git
nggs
micro
micro
bac99dff65eb

搜索帮助