1 Star 0 Fork 0

jack/protoactor-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
mailbox.go 4.24 KB
一键复制 编辑 原始数据 按行查看 历史
490689386@qq.com 提交于 2025-05-19 14:50 +08:00 . 初始化
package actor
import (
"runtime"
"sync/atomic"
"gitee.com/wujianhai/protoactor-go/internal/queue/mpsc"
)
// MailboxMiddleware is an interface for intercepting messages and events in the mailbox
type MailboxMiddleware interface {
MailboxStarted()
MessagePosted(message interface{})
MessageReceived(message interface{})
MailboxEmpty()
}
// MessageInvoker is the interface used by a mailbox to forward messages for processing
type MessageInvoker interface {
InvokeSystemMessage(interface{})
InvokeUserMessage(interface{})
EscalateFailure(reason interface{}, message interface{})
}
// Mailbox interface is used to enqueue messages to the mailbox
type Mailbox interface {
PostUserMessage(message interface{})
PostSystemMessage(message interface{})
RegisterHandlers(invoker MessageInvoker, dispatcher Dispatcher)
Start()
UserMessageCount() int
}
// MailboxProducer is a function which creates a new mailbox
type MailboxProducer func() Mailbox
const (
idle int32 = iota
running
)
type defaultMailbox struct {
userMailbox queue
systemMailbox *mpsc.Queue
schedulerStatus int32
userMessages int32
sysMessages int32
suspended int32
invoker MessageInvoker
dispatcher Dispatcher
middlewares []MailboxMiddleware
}
func (m *defaultMailbox) PostUserMessage(message interface{}) {
// is it a raw batch message?
if batch, ok := message.(MessageBatch); ok {
messages := batch.GetMessages()
for _, msg := range messages {
m.PostUserMessage(msg)
}
}
if env, ok := message.(*MessageEnvelope); ok {
if batch, ok := env.Message.(MessageBatch); ok {
messages := batch.GetMessages()
for _, msg := range messages {
m.PostUserMessage(msg)
}
}
}
// normal messages
for _, ms := range m.middlewares {
ms.MessagePosted(message)
}
m.userMailbox.Push(message)
atomic.AddInt32(&m.userMessages, 1)
m.schedule()
}
func (m *defaultMailbox) PostSystemMessage(message interface{}) {
for _, ms := range m.middlewares {
ms.MessagePosted(message)
}
m.systemMailbox.Push(message)
atomic.AddInt32(&m.sysMessages, 1)
m.schedule()
}
func (m *defaultMailbox) RegisterHandlers(invoker MessageInvoker, dispatcher Dispatcher) {
m.invoker = invoker
m.dispatcher = dispatcher
}
func (m *defaultMailbox) schedule() {
if atomic.CompareAndSwapInt32(&m.schedulerStatus, idle, running) {
m.dispatcher.Schedule(m.processMessages)
}
}
func (m *defaultMailbox) processMessages() {
process:
m.run()
// set mailbox to idle
atomic.StoreInt32(&m.schedulerStatus, idle)
sys := atomic.LoadInt32(&m.sysMessages)
user := atomic.LoadInt32(&m.userMessages)
// check if there are still messages to process (sent after the message loop ended)
if sys > 0 || (atomic.LoadInt32(&m.suspended) == 0 && user > 0) {
// try setting the mailbox back to running
if atomic.CompareAndSwapInt32(&m.schedulerStatus, idle, running) {
// fmt.Printf("looping %v %v %v\n", sys, user, m.suspended)
goto process
}
}
if user == 0 && (atomic.LoadInt32(&m.suspended) == 0) {
for _, ms := range m.middlewares {
ms.MailboxEmpty()
}
}
}
func (m *defaultMailbox) run() {
var msg interface{}
defer func() {
if r := recover(); r != nil {
m.invoker.EscalateFailure(r, msg)
}
}()
i, t := 0, m.dispatcher.Throughput()
for {
if i > t {
i = 0
runtime.Gosched()
}
i++
// keep processing system messages until queue is empty
if msg = m.systemMailbox.Pop(); msg != nil {
atomic.AddInt32(&m.sysMessages, -1)
switch msg.(type) {
case *SuspendMailbox:
atomic.StoreInt32(&m.suspended, 1)
case *ResumeMailbox:
atomic.StoreInt32(&m.suspended, 0)
default:
m.invoker.InvokeSystemMessage(msg)
}
for _, ms := range m.middlewares {
ms.MessageReceived(msg)
}
continue
}
// didn't process a system message, so break until we are resumed
if atomic.LoadInt32(&m.suspended) == 1 {
return
}
if msg = m.userMailbox.Pop(); msg != nil {
atomic.AddInt32(&m.userMessages, -1)
m.invoker.InvokeUserMessage(msg)
for _, ms := range m.middlewares {
ms.MessageReceived(msg)
}
} else {
return
}
}
}
func (m *defaultMailbox) Start() {
for _, ms := range m.middlewares {
ms.MailboxStarted()
}
}
func (m *defaultMailbox) UserMessageCount() int {
return int(atomic.LoadInt32(&m.userMessages))
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wujianhai/protoactor-go.git
git@gitee.com:wujianhai/protoactor-go.git
wujianhai
protoactor-go
protoactor-go
5633fe2499dd

搜索帮助