代码拉取完成,页面将自动刷新
package router
import (
"sync"
"sync/atomic"
"time"
"gitee.com/lwj8507/light-protoactor-go/actor"
)
// process stands in front of a router actor. Ordinary user messages are handed
// straight to the router state for delivery to a routee instead of being queued
// through the router actor itself; management and most system messages are
// still forwarded to the router actor's own process.
type process struct {
	// router identifies the router actor; it is looked up in the process
	// registry whenever a message must be forwarded to the actor itself.
	router *actor.PID
	// state receives every non-management user message via RouteMessage.
	state Interface

	// mu guards watchers.
	mu sync.Mutex
	// watchers holds the PIDs that asked to be told when this process stops.
	watchers actor.PIDSet

	// stopping is accessed atomically; Stop sets it to 1 when shutdown begins.
	stopping int32
}
// SendUserMessage dispatches a user-level message. Anything that is not a
// ManagementMessage is passed to the router state, which routes it onward
// together with the sender. ManagementMessage values are instead forwarded to
// the process registered for the router actor.
func (ref *process) SendUserMessage(pid *actor.PID, message interface{}, sender *actor.PID) {
	_, isManagement := message.(ManagementMessage)
	if !isManagement {
		ref.state.RouteMessage(message, sender)
		return
	}

	// NOTE(review): the "found" flag from Get is discarded, so this relies on
	// Get handing back a usable process even when the router PID is not
	// registered — confirm against the registry implementation.
	routerProcess, _ := actor.ProcessRegistry.Get(ref.router)
	routerProcess.SendUserMessage(pid, message, sender)
}
// SendSystemMessage handles watch bookkeeping locally and forwards every other
// system message to the process registered for the router actor.
//
//   - *actor.Watch: records msg.Watcher, or, when Stop has already begun,
//     replies to the watcher with *actor.Terminated right away.
//   - *actor.Unwatch: forgets msg.Watcher.
//   - *actor.Stop: sends *actor.Terminated{Who: pid} to every recorded watcher
//     that can still be found in the process registry.
//   - anything else: forwarded unchanged to the router actor's process.
func (ref *process) SendSystemMessage(pid *actor.PID, message interface{}) {
	switch msg := message.(type) {
	case *actor.Watch:
		// The stopping flag must be read inside the same critical section that
		// adds the watcher. Checking it before taking mu leaves a window in
		// which Stop can notify the existing watchers and finish, after which
		// this watcher would be added and never receive Terminated.
		ref.mu.Lock()
		stopping := atomic.LoadInt32(&ref.stopping) == 1
		if !stopping {
			ref.watchers.Add(msg.Watcher)
		}
		ref.mu.Unlock()

		if stopping {
			// Already shutting down: answer immediately, outside the lock.
			if r, ok := actor.ProcessRegistry.Get(msg.Watcher); ok {
				r.SendSystemMessage(msg.Watcher, &actor.Terminated{Who: pid})
			}
		}
	case *actor.Unwatch:
		ref.mu.Lock()
		ref.watchers.Remove(msg.Watcher)
		ref.mu.Unlock()
	case *actor.Stop:
		// One Terminated value is shared by all watchers.
		term := &actor.Terminated{Who: pid}
		ref.mu.Lock()
		ref.watchers.ForEach(func(_ int, other actor.PID) {
			// Watchers no longer present in the registry are skipped.
			if r, ok := actor.ProcessRegistry.Get(&other); ok {
				r.SendSystemMessage(&other, term)
			}
		})
		ref.mu.Unlock()
	default:
		// NOTE(review): the "found" flag from Get is discarded here, so this
		// relies on Get returning a usable process for an unregistered PID —
		// confirm against the registry implementation.
		r, _ := actor.ProcessRegistry.Get(ref.router)
		r.SendSystemMessage(pid, message)
	}
}
// Stop shuts the router process down. Only the first call does any work:
// it marks the process as stopping, asks the actor package to stop the router
// actor gracefully with a ten-second timeout, removes pid from the process
// registry, and finally delivers *actor.Stop to itself so that registered
// watchers are sent their Terminated notification. Later calls return
// immediately.
func (ref *process) Stop(pid *actor.PID) {
	// SwapInt32 returns the value held before the write, so a previous value
	// of 1 means another caller has already started the shutdown.
	previous := atomic.SwapInt32(&ref.stopping, 1)
	if previous == 1 {
		return
	}

	const gracePeriod = 10 * time.Second
	actor.StopGraceful(ref.router, gracePeriod)
	actor.ProcessRegistry.Remove(pid)
	ref.SendSystemMessage(pid, &actor.Stop{})
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。