代码拉取完成,页面将自动刷新
package eventbus
import (
"fmt"
"reflect"
"runtime/debug"
"sync"
"unsafe"
"gitee.com/carlzyhuang/framework/log"
)
type event interface {
subscribe(f interface{}) error
unsubscribe(f interface{})
publish(arg interface{}) error
}
type subscriber struct {
f interface{}
k uint64
}
type eventT[T any] struct {
sync.RWMutex
subscribers []subscriber // 订阅者列表
subscribersWithErr []subscriber // 订阅者列表(带error返回)
}
// 订阅事件
func (e *eventT[T]) subscribe(f interface{}) error {
e.Lock()
defer e.Unlock()
key := uint64(uintptr((*[2]unsafe.Pointer)(unsafe.Pointer(&f))[1]))
if _, ok := f.(func(T)); ok {
for _, s := range e.subscribers {
if s.k == key {
return fmt.Errorf("repeat subscribe: %d-%s", key, reflect.TypeOf(f).String())
}
}
e.subscribers = append(e.subscribers, subscriber{f: f, k: key})
} else if _, ok := f.(func(T) error); ok {
for _, s := range e.subscribersWithErr {
if s.k == key {
return fmt.Errorf("repeat subscribe: %d-%s", key, reflect.TypeOf(f).String())
}
}
e.subscribersWithErr = append(e.subscribersWithErr, subscriber{f: f, k: key})
} else {
vt := reflect.TypeOf((*T)(nil)).Elem()
return fmt.Errorf("handler[%s] should be func(arg %s) or func(arg %s) error",
reflect.TypeOf(f).String(), vt.Name(), vt.Name())
}
return nil
}
func (e *eventT[T]) unsubscribe(f interface{}) {
e.Lock()
defer e.Unlock()
key := uint64(uintptr((*[2]unsafe.Pointer)(unsafe.Pointer(&f))[1]))
if _, ok := f.(func(T)); ok {
e.subscribers = e.remove(e.subscribers, key)
} else if _, ok := f.(func(T) error); ok {
e.subscribersWithErr = e.remove(e.subscribersWithErr, key)
}
}
func (e *eventT[T]) remove(subscribers []subscriber, key uint64) []subscriber {
for i, s := range subscribers {
if s.k == key {
return append(subscribers[:i], subscribers[i+1:]...)
}
}
return subscribers
}
// 发布事件
func (e *eventT[T]) publish(v interface{}) error {
defer func() {
if err := recover(); err != nil {
log.Errorf("panic: %+v\n%s", err, debug.Stack())
}
}()
arg, ok := v.(T)
if !ok {
return fmt.Errorf("invalid type: %s", reflect.TypeOf(v).String())
}
e.RLock()
defer e.RUnlock()
for _, s := range e.subscribers {
s.f.(func(T))(arg)
}
// 第一个出错就中止
for _, s := range e.subscribersWithErr {
if err := s.f.(func(T) error)(arg); err != nil {
return err
}
}
return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。