1 Star 0 Fork 0

jack/protoactor-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
actor_context.go 18.61 KB
一键复制 编辑 原始数据 按行查看 历史
490689386@qq.com 提交于 2025-05-19 14:50 +08:00 . 初始化
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773
package actor
import (
"context"
"errors"
"fmt"
"log/slog"
"runtime/debug"
"sync/atomic"
"time"
"gitee.com/wujianhai/protoactor-go/ctxext"
"gitee.com/wujianhai/protoactor-go/metrics"
"github.com/emirpasic/gods/stacks/linkedliststack"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
// Lifecycle states kept in actorContext.state and read/written via sync/atomic.
// The numeric order matters: handleWatch and handleStop test
// ">= stateStopping" to cover both the stopping and the stopped state.
const (
// stateAlive is stored by incarnateActor.
stateAlive int32 = iota
// stateRestarting is stored by handleRestart.
stateRestarting
// stateStopping is stored by handleStop.
stateStopping
// stateStopped is stored at the end of finalizeStop.
stateStopped
)
// actorContextExtras holds the optional, lazily allocated part of an
// actorContext (see ensureExtras).
type actorContextExtras struct {
// children are the PIDs added by addChild and removed by removeChild.
children PIDSet
// receiveTimeoutTimer is the receive-timeout timer; nil when none is installed.
receiveTimeoutTimer *time.Timer
// rs is created on first use by restartStats.
rs *RestartStatistics
// stash is created on first use by actorContext.Stash and drained by restart.
stash *linkedliststack.Stack
// watchers are the PIDs registered through watch and notified in finalizeStop.
watchers PIDSet
// context is the Context handed to newActorContextExtras (the decorated
// context when the props carry a contextDecoratorChain).
context Context
// extensions backs actorContext.Get and actorContext.Set.
extensions *ctxext.ContextExtensions
}
// newActorContextExtras builds the extras record for the given context with a
// fresh, empty extension set. All other fields start at their zero value.
func newActorContextExtras(context Context) *actorContextExtras {
	return &actorContextExtras{
		extensions: ctxext.NewContextExtensions(),
		context:    context,
	}
}
// restartStats returns the restart statistics of this actor, creating them on
// the first call. Later mutations happen outside this accessor.
func (ctxExt *actorContextExtras) restartStats() *RestartStatistics {
	if ctxExt.rs != nil {
		return ctxExt.rs
	}
	ctxExt.rs = NewRestartStatistics()
	return ctxExt.rs
}
// initReceiveTimeoutTimer stores timer as the receive-timeout timer,
// replacing whatever was stored before without stopping it.
func (ctxExt *actorContextExtras) initReceiveTimeoutTimer(timer *time.Timer) {
ctxExt.receiveTimeoutTimer = timer
}
// resetReceiveTimeoutTimer re-arms the receive-timeout timer so that it fires
// after d. It is a no-op when no timer has been installed.
//
// The parameter used to be called "time", which shadowed the imported time
// package inside the body; it is now named d.
func (ctxExt *actorContextExtras) resetReceiveTimeoutTimer(d time.Duration) {
	if ctxExt.receiveTimeoutTimer == nil {
		return
	}
	ctxExt.receiveTimeoutTimer.Reset(d)
}
// stopReceiveTimeoutTimer stops the receive-timeout timer, if one exists, but
// keeps it around so it can be re-armed with resetReceiveTimeoutTimer.
func (ctxExt *actorContextExtras) stopReceiveTimeoutTimer() {
	if timer := ctxExt.receiveTimeoutTimer; timer != nil {
		timer.Stop()
	}
}
// killReceiveTimeoutTimer stops the receive-timeout timer, if one exists, and
// then drops the reference to it.
func (ctxExt *actorContextExtras) killReceiveTimeoutTimer() {
	if timer := ctxExt.receiveTimeoutTimer; timer != nil {
		timer.Stop()
		ctxExt.receiveTimeoutTimer = nil
	}
}
// addChild adds pid to the children set.
func (ctxExt *actorContextExtras) addChild(pid *PID) {
ctxExt.children.Add(pid)
}
// removeChild removes pid from the children set.
func (ctxExt *actorContextExtras) removeChild(pid *PID) {
ctxExt.children.Remove(pid)
}
// watch adds watcher to the watchers set.
func (ctxExt *actorContextExtras) watch(watcher *PID) {
ctxExt.watchers.Add(watcher)
}
// unwatch removes watcher from the watchers set.
func (ctxExt *actorContextExtras) unwatch(watcher *PID) {
ctxExt.watchers.Remove(watcher)
}
// actorContext is the per-actor context. The var block below asserts that it
// satisfies SenderContext, ReceiverContext, SpawnerContext, basePart and
// stopperPart.
type actorContext struct {
// actor is the current incarnation, produced by props.producer in incarnateActor.
actor Actor
// actorSystem is the system passed to newActorContext.
actorSystem *ActorSystem
// extras is allocated on demand by ensureExtras; nil until first needed.
extras *actorContextExtras
// props are the props passed to newActorContext.
props *Props
// parent is the parent PID passed to newActorContext; nil is treated as
// "no parent" by finalizeStop and EscalateFailure.
parent *PID
// self is this actor's own PID.
// NOTE(review): it is never assigned in this file — presumably set by the spawning code; confirm.
self *PID
// receiveTimeout is the configured receive timeout; a timer is only armed when it is > 0.
receiveTimeout time.Duration
// messageOrEnvelope is the message currently being processed; it is reset to
// nil once processing finishes.
messageOrEnvelope interface{}
// state holds one of the state* constants and is accessed through sync/atomic.
state int32
}
// Compile-time checks that *actorContext implements the interfaces it is used as.
var (
	_ SenderContext   = (*actorContext)(nil)
	_ ReceiverContext = (*actorContext)(nil)
	_ SpawnerContext  = (*actorContext)(nil)
	_ basePart        = (*actorContext)(nil)
	_ stopperPart     = (*actorContext)(nil)
)
// newActorContext creates a context bound to the given system, props and
// parent, and immediately incarnates the actor through incarnateActor.
func newActorContext(actorSystem *ActorSystem, props *Props, parent *PID) *actorContext {
	ctx := &actorContext{
		actorSystem: actorSystem,
		props:       props,
		parent:      parent,
	}
	ctx.incarnateActor()
	return ctx
}
// ensureExtras returns the extras record, allocating it on first use. When the
// props define a contextDecoratorChain, the stored context is the result of
// running this context through that chain; otherwise it is ctx itself.
func (ctx *actorContext) ensureExtras() *actorContextExtras {
	if ctx.extras != nil {
		return ctx.extras
	}

	var decorated Context = ctx
	if props := ctx.props; props != nil && props.contextDecoratorChain != nil {
		decorated = props.contextDecoratorChain(decorated)
	}

	ctx.extras = newActorContextExtras(decorated)
	return ctx.extras
}
//
// Interface: Context
//
// ActorSystem returns the actor system this context belongs to.
func (ctx *actorContext) ActorSystem() *ActorSystem {
return ctx.actorSystem
}
// Logger returns the logger of the owning actor system.
func (ctx *actorContext) Logger() *slog.Logger {
return ctx.actorSystem.Logger()
}
// Parent returns the parent PID given to newActorContext (may be nil).
func (ctx *actorContext) Parent() *PID {
return ctx.parent
}
// Self returns the PID stored in the self field.
func (ctx *actorContext) Self() *PID {
return ctx.self
}
// Sender returns what UnwrapEnvelopeSender extracts from the message
// currently being processed.
func (ctx *actorContext) Sender() *PID {
return UnwrapEnvelopeSender(ctx.messageOrEnvelope)
}
// Actor returns the current actor incarnation.
func (ctx *actorContext) Actor() Actor {
return ctx.actor
}
// ReceiveTimeout returns the currently configured receive timeout.
func (ctx *actorContext) ReceiveTimeout() time.Duration {
return ctx.receiveTimeout
}
// Children returns the PIDs of this actor's children. When no extras have
// been allocated yet the result is an empty, non-nil slice.
func (ctx *actorContext) Children() []*PID {
	if extras := ctx.extras; extras != nil {
		return extras.children.Values()
	}
	return []*PID{}
}
// Respond sends response to the sender of the current message. If the current
// message has no sender, the response goes to the system's dead letter
// process instead.
func (ctx *actorContext) Respond(response interface{}) {
	if sender := ctx.Sender(); sender != nil {
		ctx.Send(sender, response)
		return
	}
	ctx.actorSystem.DeadLetter.SendUserMessage(nil, response)
}
// Stash pushes the current (unwrapped) message onto the stash stack, creating
// the stack on first use. The restart method pops and re-invokes stashed messages.
func (ctx *actorContext) Stash() {
	extras := ctx.ensureExtras()
	stash := extras.stash
	if stash == nil {
		stash = linkedliststack.New()
		extras.stash = stash
	}
	stash.Push(ctx.Message())
}
// Watch sends a Watch system message to who, naming this actor as the watcher.
func (ctx *actorContext) Watch(who *PID) {
	request := &Watch{Watcher: ctx.self}
	who.sendSystemMessage(ctx.actorSystem, request)
}
// Unwatch sends an Unwatch system message to who, naming this actor as the watcher.
func (ctx *actorContext) Unwatch(who *PID) {
	request := &Unwatch{Watcher: ctx.self}
	who.sendSystemMessage(ctx.actorSystem, request)
}
// SetReceiveTimeout configures the receive timeout. Durations below one
// millisecond are treated as zero, which leaves the timer stopped. Setting the
// value that is already active is a no-op. Otherwise the existing timer is
// stopped and, for a positive duration, either created (calling
// receiveTimeoutHandler when it fires) or re-armed.
func (ctx *actorContext) SetReceiveTimeout(d time.Duration) {
	if d < time.Millisecond {
		d = 0
	}
	if ctx.receiveTimeout == d {
		return
	}
	ctx.receiveTimeout = d

	extras := ctx.ensureExtras()
	extras.stopReceiveTimeoutTimer()

	switch {
	case d <= 0:
		// Timeout disabled: the timer, if any, stays stopped.
	case extras.receiveTimeoutTimer == nil:
		extras.initReceiveTimeoutTimer(time.AfterFunc(d, ctx.receiveTimeoutHandler))
	default:
		extras.resetReceiveTimeoutTimer(d)
	}
}
// CancelReceiveTimeout discards the receive-timeout timer and resets the
// configured timeout to zero. It does nothing when no timer is installed.
func (ctx *actorContext) CancelReceiveTimeout() {
	if extras := ctx.extras; extras != nil && extras.receiveTimeoutTimer != nil {
		extras.killReceiveTimeoutTimer()
		ctx.receiveTimeout = 0
	}
}
// receiveTimeoutHandler is the callback SetReceiveTimeout hands to
// time.AfterFunc. If a timer is still installed, it cancels the receive
// timeout and sends receiveTimeoutMessage to this actor.
func (ctx *actorContext) receiveTimeoutHandler() {
	if ctx.extras == nil || ctx.extras.receiveTimeoutTimer == nil {
		return
	}
	ctx.CancelReceiveTimeout()
	ctx.Send(ctx.self, receiveTimeoutMessage)
}
// Forward passes the message currently being processed on to pid, unchanged.
// System messages are never forwarded; an error is logged instead.
func (ctx *actorContext) Forward(pid *PID) {
	current := ctx.messageOrEnvelope
	if sysMsg, isSystem := current.(SystemMessage); isSystem {
		ctx.Logger().Error("SystemMessage cannot be forwarded", slog.Any("message", sysMsg))
		return
	}
	ctx.sendUserMessage(pid, current)
}
// ReenterAfter arranges for cont to run inside this actor's context once f
// completes. The message being processed now is remembered and delivered
// together with the continuation, so InvokeSystemMessage can restore it while
// cont runs. cont receives f.result and f.err as read at the time it runs.
func (ctx *actorContext) ReenterAfter(f *Future, cont func(res interface{}, err error)) {
	pending := ctx.messageOrEnvelope
	resume := func() {
		cont(f.result, f.err)
	}

	// On completion, post the continuation to ourselves as a system message.
	f.continueWith(func(_ interface{}, _ error) {
		ctx.self.sendSystemMessage(ctx.actorSystem, &continuation{
			message: pending,
			f:       resume,
		})
	})
}
//
// Interface: sender
//
// Message returns what UnwrapEnvelopeMessage extracts from the message
// currently being processed.
func (ctx *actorContext) Message() interface{} {
return UnwrapEnvelopeMessage(ctx.messageOrEnvelope)
}
// MessageHeader returns what UnwrapEnvelopeHeader extracts from the message
// currently being processed.
func (ctx *actorContext) MessageHeader() ReadonlyMessageHeader {
return UnwrapEnvelopeHeader(ctx.messageOrEnvelope)
}
// Send delivers message to pid as a user message via sendUserMessage.
func (ctx *actorContext) Send(pid *PID, message interface{}) {
ctx.sendUserMessage(pid, message)
}
// sendUserMessage delivers message to pid. With a sender middleware chain in
// the props, the message is wrapped in an envelope and routed through the
// chain; otherwise it is sent to pid directly.
func (ctx *actorContext) sendUserMessage(pid *PID, message interface{}) {
	chain := ctx.props.senderMiddlewareChain
	if chain == nil {
		pid.sendUserMessage(ctx.actorSystem, message)
		return
	}
	chain(ctx.ensureExtras().context, pid, WrapEnvelope(message))
}
// Request sends message to pid in an envelope with no header and this actor
// as the sender.
func (ctx *actorContext) Request(pid *PID, message interface{}) {
	ctx.sendUserMessage(pid, &MessageEnvelope{
		Message: message,
		Sender:  ctx.Self(),
	})
}
// RequestWithCustomSender sends message to pid in an envelope with no header
// and the given sender in place of this actor.
func (ctx *actorContext) RequestWithCustomSender(pid *PID, message interface{}, sender *PID) {
	ctx.sendUserMessage(pid, &MessageEnvelope{
		Message: message,
		Sender:  sender,
	})
}
// RequestFuture sends message to pid with a newly created future's PID as the
// sender and returns that future. timeout is passed to NewFuture.
func (ctx *actorContext) RequestFuture(pid *PID, message interface{}, timeout time.Duration) *Future {
	reply := NewFuture(ctx.actorSystem, timeout)
	ctx.sendUserMessage(pid, &MessageEnvelope{
		Message: message,
		Sender:  reply.PID(),
	})
	return reply
}
//
// Interface: receiver
//
// Receive makes envelope the current message, runs defaultReceive on it and
// then clears the current message again.
func (ctx *actorContext) Receive(envelope *MessageEnvelope) {
ctx.messageOrEnvelope = envelope
ctx.defaultReceive()
ctx.messageOrEnvelope = nil
}
// defaultReceive dispatches the current message:
//   - *PoisonPill stops this actor and is not delivered to it;
//   - anything else is delivered to the actor, using the decorated context
//     when the props define a contextDecoratorChain;
//   - an AutoRespond message additionally has its auto response sent back via
//     Respond after the actor has handled it.
func (ctx *actorContext) defaultReceive() {
	current := ctx.Message()

	if _, isPoison := current.(*PoisonPill); isPoison {
		ctx.Stop(ctx.self)
		return
	}

	if ctx.props.contextDecoratorChain != nil {
		ctx.actor.Receive(ctx.ensureExtras().context)
	} else {
		ctx.actor.Receive(ctx)
	}

	if auto, ok := current.(AutoRespond); ok {
		ctx.Respond(auto.GetAutoResponse(ctx))
	}
}
//
// Interface: spawner
//
// Spawn starts a child actor under a name taken from the process registry's
// NextId. It panics if SpawnNamed reports an error.
func (ctx *actorContext) Spawn(props *Props) *PID {
	name := ctx.actorSystem.ProcessRegistry.NextId()
	child, err := ctx.SpawnNamed(props, name)
	if err != nil {
		panic(err)
	}
	return child
}
// SpawnPrefix starts a child actor whose name is prefix followed by an id
// from the process registry's NextId. It panics if SpawnNamed reports an error.
func (ctx *actorContext) SpawnPrefix(props *Props, prefix string) *PID {
	name := prefix + ctx.actorSystem.ProcessRegistry.NextId()
	child, err := ctx.SpawnNamed(props, name)
	if err != nil {
		panic(err)
	}
	return child
}
// SpawnNamed starts a child actor with the id "<self id>/<name>". The spawn
// goes through the parent's spawn middleware chain when one is configured,
// otherwise through props.spawn. On success the child is recorded in the
// children set. On failure the PID and error from the spawn call are returned
// as-is and nothing is recorded.
//
// It panics when props carries a guardian strategy.
func (ctx *actorContext) SpawnNamed(props *Props, name string) (*PID, error) {
	if props.guardianStrategy != nil {
		panic(errors.New("props used to spawn child cannot have GuardianStrategy"))
	}

	childID := ctx.self.Id + "/" + name

	var (
		child *PID
		err   error
	)
	if chain := ctx.props.spawnMiddlewareChain; chain != nil {
		child, err = chain(ctx.actorSystem, childID, props, ctx)
	} else {
		child, err = props.spawn(ctx.actorSystem, childID, ctx)
	}

	if err == nil {
		ctx.ensureExtras().addChild(child)
		return child, nil
	}
	return child, err
}
//
// Interface: stopper
//
// Stop stops the actor behind pid immediately, regardless of user messages
// still waiting in its mailbox. When a metrics provider is configured and
// metrics are enabled, the stopped-actor counter is incremented first.
func (ctx *actorContext) Stop(pid *PID) {
	if ctx.actorSystem.Config.MetricsProvider != nil {
		if sysMetrics, ok := ctx.actorSystem.Extensions.Get(extensionId).(*Metrics); ok && sysMetrics.enabled {
			if instruments := sysMetrics.metrics.Get(metrics.InternalActorMetrics); instruments != nil {
				labels := metric.WithAttributes(sysMetrics.CommonLabels(ctx)...)
				instruments.ActorStoppedCount.Add(context.Background(), 1, labels)
			}
		}
	}

	pid.ref(ctx.actorSystem).Stop(pid)
}
// StopFuture stops the actor behind pid like Stop and returns a future with a
// 10 second timeout. The future's PID is registered as a watcher of pid
// before the stop is issued.
func (ctx *actorContext) StopFuture(pid *PID) *Future {
	done := NewFuture(ctx.actorSystem, 10*time.Second)
	watch := &Watch{Watcher: done.pid}
	pid.sendSystemMessage(ctx.actorSystem, watch)
	ctx.Stop(pid)
	return done
}
// Poison tells the actor behind pid to stop after it has processed the user
// messages currently in its mailbox. It does so by sending poisonPillMessage
// as a regular user message.
func (ctx *actorContext) Poison(pid *PID) {
pid.sendUserMessage(ctx.actorSystem, poisonPillMessage)
}
// PoisonFuture poisons the actor behind pid like Poison and returns a future
// with a 10 second timeout. The future's PID is registered as a watcher of
// pid before the poison pill is sent.
func (ctx *actorContext) PoisonFuture(pid *PID) *Future {
	done := NewFuture(ctx.actorSystem, 10*time.Second)
	watch := &Watch{Watcher: done.pid}
	pid.sendSystemMessage(ctx.actorSystem, watch)
	ctx.Poison(pid)
	return done
}
//
// Interface: MessageInvoker
//
// InvokeUserMessage processes one user message. Messages arriving after the
// actor reached stateStopped are dropped.
//
// While a receive timeout is active, the timer is stopped before processing
// and re-armed afterwards, unless md implements NotInfluenceReceiveTimeout.
// When metrics are enabled, the processing time is recorded in the message
// receive histogram, labelled with the dynamic type of md.
func (ctx *actorContext) InvokeUserMessage(md interface{}) {
	if atomic.LoadInt32(&ctx.state) == stateStopped {
		return
	}

	influenceTimeout := true
	if ctx.receiveTimeout > 0 {
		_, exempt := md.(NotInfluenceReceiveTimeout)
		influenceTimeout = !exempt
		if influenceTimeout {
			ctx.extras.stopReceiveTimeoutTimer()
		}
	}

	systemMetrics, ok := ctx.actorSystem.Extensions.Get(extensionId).(*Metrics)
	if !ok || !systemMetrics.enabled {
		ctx.processMessage(md)
	} else {
		started := time.Now()
		ctx.processMessage(md)
		elapsed := time.Since(started)

		if instruments := systemMetrics.metrics.Get(metrics.InternalActorMetrics); instruments != nil {
			labels := append(
				systemMetrics.CommonLabels(ctx),
				attribute.String("messagetype", fmt.Sprintf("%T", md)),
			)
			instruments.ActorMessageReceiveHistogram.Record(
				context.Background(),
				elapsed.Seconds(),
				metric.WithAttributes(labels...),
			)
		}
	}

	// processMessage may have changed the timeout, so check it again here.
	if influenceTimeout && ctx.receiveTimeout > 0 {
		ctx.extras.resetReceiveTimeoutTimer(ctx.receiveTimeout)
	}
}
// processMessage routes m to the actor by the first applicable path:
// the receiver middleware chain, then the decorated context's Receive, and
// finally defaultReceive with m set as the current message.
func (ctx *actorContext) processMessage(m interface{}) {
	switch {
	case ctx.props.receiverMiddlewareChain != nil:
		ctx.props.receiverMiddlewareChain(ctx.ensureExtras().context, WrapEnvelope(m))
	case ctx.props.contextDecoratorChain != nil:
		ctx.ensureExtras().context.Receive(WrapEnvelope(m))
	default:
		ctx.messageOrEnvelope = m
		ctx.defaultReceive()
		// Drop the reference so the message is not retained between invocations.
		ctx.messageOrEnvelope = nil
	}
}
// incarnateActor marks the context alive and creates a new actor instance
// from the props' producer. When metrics are enabled, it also increments the
// spawn counter.
func (ctx *actorContext) incarnateActor() {
	atomic.StoreInt32(&ctx.state, stateAlive)
	ctx.actor = ctx.props.producer(ctx.actorSystem)

	sysMetrics, ok := ctx.actorSystem.Extensions.Get(extensionId).(*Metrics)
	if !ok || !sysMetrics.enabled {
		return
	}
	instruments := sysMetrics.metrics.Get(metrics.InternalActorMetrics)
	if instruments == nil {
		return
	}
	instruments.ActorSpawnCount.Add(context.Background(), 1, metric.WithAttributes(sysMetrics.CommonLabels(ctx)...))
}
// InvokeSystemMessage dispatches a system message by its dynamic type.
// A continuation runs with the message that was current when ReenterAfter was
// called restored as the current message. Started is passed on to
// InvokeUserMessage. Watch, Unwatch, Stop, Terminated, Failure and Restart go
// to their handle* methods. Any other type is logged as an error.
func (ctx *actorContext) InvokeSystemMessage(message interface{}) {
//goland:noinspection GrazieInspection
switch msg := message.(type) {
case *continuation:
ctx.messageOrEnvelope = msg.message // apply the message that was present when we started the await
msg.f() // invoke the continuation in the current actor context
ctx.messageOrEnvelope = nil // release the message
case *Started:
ctx.InvokeUserMessage(msg) // forward
case *Watch:
ctx.handleWatch(msg)
case *Unwatch:
ctx.handleUnwatch(msg)
case *Stop:
ctx.handleStop()
case *Terminated:
ctx.handleTerminated(msg)
case *Failure:
ctx.handleFailure(msg)
case *Restart:
ctx.handleRestart()
default:
ctx.Logger().Error("unknown system message", slog.Any("message", msg))
}
}
// handleRootFailure hands failure to defaultSupervisionStrategy with this
// context as the supervisor. EscalateFailure calls it when there is no parent.
func (ctx *actorContext) handleRootFailure(failure *Failure) {
defaultSupervisionStrategy.HandleFailure(ctx.actorSystem, ctx, failure.Who, failure.RestartStats, failure.Reason, failure.Message)
}
// handleWatch registers msg.Watcher as a watcher of this actor. If the actor
// is already stopping or stopped, the watcher is not registered and is sent a
// Terminated message right away.
func (ctx *actorContext) handleWatch(msg *Watch) {
	if atomic.LoadInt32(&ctx.state) < stateStopping {
		ctx.ensureExtras().watch(msg.Watcher)
		return
	}
	msg.Watcher.sendSystemMessage(ctx.actorSystem, &Terminated{Who: ctx.self})
}
// handleUnwatch removes msg.Watcher from the watchers set. Without extras
// there is nothing to remove.
func (ctx *actorContext) handleUnwatch(msg *Unwatch) {
	if extras := ctx.extras; extras != nil {
		extras.unwatch(msg.Watcher)
	}
}
// handleRestart moves the actor into stateRestarting, delivers
// restartingMessage to it, sends stop to all children and then calls
// tryRestartOrTerminate, which restarts at once if no children remain. When
// metrics are enabled, the restart counter is incremented afterwards.
func (ctx *actorContext) handleRestart() {
	atomic.StoreInt32(&ctx.state, stateRestarting)
	ctx.InvokeUserMessage(restartingMessage)
	ctx.stopAllChildren()
	ctx.tryRestartOrTerminate()

	sysMetrics, ok := ctx.actorSystem.Extensions.Get(extensionId).(*Metrics)
	if !ok || !sysMetrics.enabled {
		return
	}
	if instruments := sysMetrics.metrics.Get(metrics.InternalActorMetrics); instruments != nil {
		instruments.ActorRestartedCount.Add(context.Background(), 1, metric.WithAttributes(sysMetrics.CommonLabels(ctx)...))
	}
}
// handleStop starts stopping this actor. It is ignored when the actor is
// already stopping or stopped. Otherwise the state becomes stateStopping,
// stoppingMessage is delivered, all children are told to stop, and
// tryRestartOrTerminate finishes the stop at once if no children remain.
func (ctx *actorContext) handleStop() {
	if current := atomic.LoadInt32(&ctx.state); current >= stateStopping {
		return
	}

	atomic.StoreInt32(&ctx.state, stateStopping)
	ctx.InvokeUserMessage(stoppingMessage)
	ctx.stopAllChildren()
	ctx.tryRestartOrTerminate()
}
// handleTerminated handles a Terminated notification. msg.Who is removed from
// the children set, the message is delivered to the actor, and a pending
// restart or stop is completed if no children remain.
func (ctx *actorContext) handleTerminated(msg *Terminated) {
	if extras := ctx.extras; extras != nil {
		extras.removeChild(msg.Who)
	}
	ctx.InvokeUserMessage(msg)
	ctx.tryRestartOrTerminate()
}
// handleFailure delegates a reported failure to a supervisor strategy. The
// actor itself is used when it implements SupervisorStrategy; otherwise the
// strategy comes from the props' getSupervisor.
func (ctx *actorContext) handleFailure(msg *Failure) {
	own, actorSupervises := ctx.actor.(SupervisorStrategy)
	if !actorSupervises {
		ctx.props.getSupervisor().HandleFailure(ctx.actorSystem, ctx, msg.Who, msg.RestartStats, msg.Reason, msg.Message)
		return
	}
	own.HandleFailure(ctx.actorSystem, ctx, msg.Who, msg.RestartStats, msg.Reason, msg.Message)
}
// stopAllChildren sends stopMessage to every child, walking the children
// slice from the last element to the first.
func (ctx *actorContext) stopAllChildren() {
	if ctx.extras == nil {
		return
	}

	children := ctx.extras.children.pids
	for remaining := len(children); remaining > 0; remaining-- {
		children[remaining-1].sendSystemMessage(ctx.actorSystem, stopMessage)
	}
}
// tryRestartOrTerminate completes a pending restart or stop once no children
// remain. While children exist, or in any other state, it does nothing. The
// receive timeout is cancelled before restarting or finalizing the stop.
func (ctx *actorContext) tryRestartOrTerminate() {
	if extras := ctx.extras; extras != nil && !extras.children.Empty() {
		return
	}

	state := atomic.LoadInt32(&ctx.state)
	if state == stateRestarting {
		ctx.CancelReceiveTimeout()
		ctx.restart()
		return
	}
	if state == stateStopping {
		ctx.CancelReceiveTimeout()
		ctx.finalizeStop()
	}
}
// restart creates a new actor incarnation, sends resumeMailboxMessage to
// itself and delivers startedMessage. It then pops every stashed message off
// the stash stack and invokes it as a user message.
func (ctx *actorContext) restart() {
	ctx.incarnateActor()
	ctx.self.sendSystemMessage(ctx.actorSystem, resumeMailboxMessage)
	ctx.InvokeUserMessage(startedMessage)

	if ctx.extras == nil || ctx.extras.stash == nil {
		return
	}
	for stash := ctx.extras.stash; !stash.Empty(); {
		stashed, _ := stash.Pop()
		ctx.InvokeUserMessage(stashed)
	}
}
// finalizeStop completes a stop. It removes this actor from the process
// registry, delivers stoppedMessage, sends one shared Terminated message to
// every watcher and to the parent (if any), and finally sets stateStopped.
func (ctx *actorContext) finalizeStop() {
	ctx.actorSystem.ProcessRegistry.Remove(ctx.self)
	ctx.InvokeUserMessage(stoppedMessage)

	terminated := &Terminated{Who: ctx.self}

	if extras := ctx.extras; extras != nil {
		extras.watchers.ForEach(func(_ int, watcher *PID) {
			watcher.sendSystemMessage(ctx.actorSystem, terminated)
		})
	}

	if parent := ctx.parent; parent != nil {
		parent.sendSystemMessage(ctx.actorSystem, terminated)
	}

	atomic.StoreInt32(&ctx.state, stateStopped)
}
//
// Interface: Supervisor
//
// EscalateFailure reports a failure of this actor. It logs the recovery and,
// when Config.DeveloperSupervisionLogging is set, also prints the stack and
// failure details to stdout and logs them at error level. When metrics are
// enabled, the failure counter is incremented. It then sends
// suspendMailboxMessage to itself and passes a Failure, carrying this actor's
// restart statistics, to the parent, or to handleRootFailure when there is
// no parent.
func (ctx *actorContext) EscalateFailure(reason interface{}, message interface{}) {
	ctx.Logger().Info("[ACTOR] Recovering", slog.Any("self", ctx.self), slog.Any("reason", reason))

	// Debug setting: surface supervision failures on the console and at error level.
	if ctx.actorSystem.Config.DeveloperSupervisionLogging {
		fmt.Printf("debug.Stack(): %s\n", debug.Stack())
		fmt.Println("[Supervision] Actor:", ctx.self, " failed with message:", message, " exception:", reason)
		ctx.Logger().Error("[Supervision]", slog.Any("actor", ctx.self), slog.Any("message", message), slog.Any("exception", reason))
	}

	if sysMetrics, ok := ctx.actorSystem.Extensions.Get(extensionId).(*Metrics); ok && sysMetrics.enabled {
		if instruments := sysMetrics.metrics.Get(metrics.InternalActorMetrics); instruments != nil {
			instruments.ActorFailureCount.Add(context.Background(), 1, metric.WithAttributes(sysMetrics.CommonLabels(ctx)...))
		}
	}

	failure := &Failure{
		Reason:       reason,
		Who:          ctx.self,
		RestartStats: ctx.ensureExtras().restartStats(),
		Message:      message,
	}

	ctx.self.sendSystemMessage(ctx.actorSystem, suspendMailboxMessage)

	if parent := ctx.parent; parent != nil {
		parent.sendSystemMessage(ctx.actorSystem, failure)
		return
	}
	ctx.handleRootFailure(failure)
}
// RestartChildren sends restartMessage to each of the given PIDs, in order.
func (ctx *actorContext) RestartChildren(pids ...*PID) {
	for i := range pids {
		pids[i].sendSystemMessage(ctx.actorSystem, restartMessage)
	}
}
// StopChildren sends stopMessage to each of the given PIDs, in order.
func (ctx *actorContext) StopChildren(pids ...*PID) {
	for i := range pids {
		pids[i].sendSystemMessage(ctx.actorSystem, stopMessage)
	}
}
// ResumeChildren sends resumeMailboxMessage to each of the given PIDs, in order.
func (ctx *actorContext) ResumeChildren(pids ...*PID) {
	for i := range pids {
		pids[i].sendSystemMessage(ctx.actorSystem, resumeMailboxMessage)
	}
}
//
// Miscellaneous
//
// GoString returns the string form of this actor's PID, so %#v formatting
// prints the PID rather than the struct contents.
func (ctx *actorContext) GoString() string {
return ctx.self.String()
}
// String returns the string form of this actor's PID.
func (ctx *actorContext) String() string {
return ctx.self.String()
}
// Get looks up the context extension registered under id. The extras record
// is allocated if it does not exist yet.
func (ctx *actorContext) Get(id ctxext.ContextExtensionID) ctxext.ContextExtension {
	return ctx.ensureExtras().extensions.Get(id)
}
// Set stores ext in this context's extension set. The extras record is
// allocated if it does not exist yet.
func (ctx *actorContext) Set(ext ctxext.ContextExtension) {
	ctx.ensureExtras().extensions.Set(ext)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wujianhai/protoactor-go.git
git@gitee.com:wujianhai/protoactor-go.git
wujianhai
protoactor-go
protoactor-go
5633fe2499dd

搜索帮助