代码拉取完成,页面将自动刷新
// Package eventbus 事件总线(泛型版)
package eventbus
import (
"reflect"
"sync"
)
var defEventBus = NewEventbus()
func DefaultEventBus() *Eventbus {
return defEventBus
}
// NewEventbus 创建事件总线
func NewEventbus() *Eventbus {
return &Eventbus{
events: map[reflect.Type]event{},
}
}
// Eventbus 事件总线,为业务提供通用的事件订阅、发布机制,降低业务模块间的耦合
type Eventbus struct {
mutex sync.RWMutex
events map[reflect.Type]event // 已注册事件列表
}
// Subscribe 订阅事件
//
// 事件响应时,不带错误返回的函数全部会被调用,带错误返回的函数执行到第一个出错函数结束,并返回第一个出错函数的返回值
func Subscribe[T any](f func(arg T), ebs ...*Eventbus) error {
if len(ebs) == 0 {
return eventbusSubscribe(DefaultEventBus(), f)
}
for _, eb := range ebs {
if err := eventbusSubscribe(eb, f); err != nil {
return err
}
}
return nil
}
func eventbusSubscribe[T any](eb *Eventbus, f func(arg T)) error {
vt := reflect.TypeOf((*T)(nil)).Elem()
eb.mutex.Lock()
es, ok := eb.events[vt]
if !ok {
es = &eventT[T]{}
eb.events[vt] = es
}
eb.mutex.Unlock()
return es.subscribe(f)
}
// SubscribeWithErr 订阅事件
//
// 事件响应时,不带错误返回的函数全部会被调用,带错误返回的函数执行到第一个出错函数结束,并返回第一个出错函数的返回值
func SubscribeWithErr[T any](f func(arg T) error, ebs ...*Eventbus) error {
if len(ebs) == 0 {
return eventbusSubscribeWithErr(DefaultEventBus(), f)
}
for _, eb := range ebs {
if err := eventbusSubscribeWithErr(eb, f); err != nil {
return err
}
}
return nil
}
func eventbusSubscribeWithErr[T any](eb *Eventbus, f func(arg T) error) error {
vt := reflect.TypeOf((*T)(nil)).Elem()
eb.mutex.Lock()
es, ok := eb.events[vt]
if !ok {
es = &eventT[T]{}
eb.events[vt] = es
}
eb.mutex.Unlock()
return es.subscribe(f)
}
// UnSubscribe 取消订阅事件
func UnSubscribe[T any](f func(arg T), ebs ...*Eventbus) {
if len(ebs) == 0 {
eventbusUnSubscribe(DefaultEventBus(), f)
return
}
for _, eb := range ebs {
eventbusUnSubscribe(eb, f)
}
}
func eventbusUnSubscribe[T any](eb *Eventbus, f func(arg T)) {
vt := reflect.TypeOf((*T)(nil)).Elem()
eb.mutex.Lock()
es, ok := eb.events[vt]
eb.mutex.Unlock()
if ok {
es.unsubscribe(f)
}
}
// UnSubscribeWithErr 取消订阅事件
func UnSubscribeWithErr[T any](f func(arg T) error, ebs ...*Eventbus) {
if len(ebs) == 0 {
eventbusUnSubscribeWithErr(DefaultEventBus(), f)
return
}
for _, eb := range ebs {
eventbusUnSubscribeWithErr(eb, f)
}
}
func eventbusUnSubscribeWithErr[T any](eb *Eventbus, f func(arg T) error) {
vt := reflect.TypeOf((*T)(nil)).Elem()
eb.mutex.Lock()
es, ok := eb.events[vt]
defer eb.mutex.Unlock()
if ok {
es.unsubscribe(f)
}
}
// UnSubscribeType 取消指定类型的所有订阅
func UnSubscribeType[T any](ebs ...*Eventbus) {
if len(ebs) == 0 {
eventbusUnSubscribeType[T](DefaultEventBus())
return
}
for _, eb := range ebs {
eventbusUnSubscribeType[T](eb)
}
}
func eventbusUnSubscribeType[T any](eb *Eventbus) {
eb.mutex.Lock()
defer eb.mutex.Unlock()
vt := reflect.TypeOf((*T)(nil)).Elem()
delete(eb.events, vt)
}
// Publish 发布事件
func Publish[T any](arg T, ebs ...*Eventbus) error {
if len(ebs) == 0 {
return eventbusPublish(DefaultEventBus(), arg)
}
for _, eb := range ebs {
if err := eventbusPublish(eb, arg); err != nil {
return err
}
}
return nil
}
func eventbusPublish[T any](eb *Eventbus, arg T) error {
vt := reflect.TypeOf(arg)
eb.mutex.RLock()
es, ok := eb.events[vt]
eb.mutex.RUnlock()
if ok {
return es.publish(arg)
}
return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。