1 Star 1 Fork 0

carlzyhuang/framework

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
eventbus.go 3.68 KB
一键复制 编辑 原始数据 按行查看 历史
carlzyhuang 提交于 2025-10-15 16:16 +08:00 . 框架代码初始化
// Package eventbus 事件总线(泛型版)
package eventbus
import (
"reflect"
"sync"
)
var defEventBus = NewEventbus()
func DefaultEventBus() *Eventbus {
return defEventBus
}
// NewEventbus 创建事件总线
func NewEventbus() *Eventbus {
return &Eventbus{
events: map[reflect.Type]event{},
}
}
// Eventbus 事件总线,为业务提供通用的事件订阅、发布机制,降低业务模块间的耦合
type Eventbus struct {
mutex sync.RWMutex
events map[reflect.Type]event // 已注册事件列表
}
// Subscribe 订阅事件
//
// 事件响应时,不带错误返回的函数全部会被调用,带错误返回的函数执行到第一个出错函数结束,并返回第一个出错函数的返回值
func Subscribe[T any](f func(arg T), ebs ...*Eventbus) error {
if len(ebs) == 0 {
return eventbusSubscribe(DefaultEventBus(), f)
}
for _, eb := range ebs {
if err := eventbusSubscribe(eb, f); err != nil {
return err
}
}
return nil
}
func eventbusSubscribe[T any](eb *Eventbus, f func(arg T)) error {
vt := reflect.TypeOf((*T)(nil)).Elem()
eb.mutex.Lock()
es, ok := eb.events[vt]
if !ok {
es = &eventT[T]{}
eb.events[vt] = es
}
eb.mutex.Unlock()
return es.subscribe(f)
}
// SubscribeWithErr 订阅事件
//
// 事件响应时,不带错误返回的函数全部会被调用,带错误返回的函数执行到第一个出错函数结束,并返回第一个出错函数的返回值
func SubscribeWithErr[T any](f func(arg T) error, ebs ...*Eventbus) error {
if len(ebs) == 0 {
return eventbusSubscribeWithErr(DefaultEventBus(), f)
}
for _, eb := range ebs {
if err := eventbusSubscribeWithErr(eb, f); err != nil {
return err
}
}
return nil
}
func eventbusSubscribeWithErr[T any](eb *Eventbus, f func(arg T) error) error {
vt := reflect.TypeOf((*T)(nil)).Elem()
eb.mutex.Lock()
es, ok := eb.events[vt]
if !ok {
es = &eventT[T]{}
eb.events[vt] = es
}
eb.mutex.Unlock()
return es.subscribe(f)
}
// UnSubscribe 取消订阅事件
func UnSubscribe[T any](f func(arg T), ebs ...*Eventbus) {
if len(ebs) == 0 {
eventbusUnSubscribe(DefaultEventBus(), f)
return
}
for _, eb := range ebs {
eventbusUnSubscribe(eb, f)
}
}
func eventbusUnSubscribe[T any](eb *Eventbus, f func(arg T)) {
vt := reflect.TypeOf((*T)(nil)).Elem()
eb.mutex.Lock()
es, ok := eb.events[vt]
eb.mutex.Unlock()
if ok {
es.unsubscribe(f)
}
}
// UnSubscribeWithErr 取消订阅事件
func UnSubscribeWithErr[T any](f func(arg T) error, ebs ...*Eventbus) {
if len(ebs) == 0 {
eventbusUnSubscribeWithErr(DefaultEventBus(), f)
return
}
for _, eb := range ebs {
eventbusUnSubscribeWithErr(eb, f)
}
}
func eventbusUnSubscribeWithErr[T any](eb *Eventbus, f func(arg T) error) {
vt := reflect.TypeOf((*T)(nil)).Elem()
eb.mutex.Lock()
es, ok := eb.events[vt]
defer eb.mutex.Unlock()
if ok {
es.unsubscribe(f)
}
}
// UnSubscribeType 取消指定类型的所有订阅
func UnSubscribeType[T any](ebs ...*Eventbus) {
if len(ebs) == 0 {
eventbusUnSubscribeType[T](DefaultEventBus())
return
}
for _, eb := range ebs {
eventbusUnSubscribeType[T](eb)
}
}
func eventbusUnSubscribeType[T any](eb *Eventbus) {
eb.mutex.Lock()
defer eb.mutex.Unlock()
vt := reflect.TypeOf((*T)(nil)).Elem()
delete(eb.events, vt)
}
// Publish 发布事件
func Publish[T any](arg T, ebs ...*Eventbus) error {
if len(ebs) == 0 {
return eventbusPublish(DefaultEventBus(), arg)
}
for _, eb := range ebs {
if err := eventbusPublish(eb, arg); err != nil {
return err
}
}
return nil
}
func eventbusPublish[T any](eb *Eventbus, arg T) error {
vt := reflect.TypeOf(arg)
eb.mutex.RLock()
es, ok := eb.events[vt]
eb.mutex.RUnlock()
if ok {
return es.publish(arg)
}
return nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/carlzyhuang/framework.git
git@gitee.com:carlzyhuang/framework.git
carlzyhuang
framework
framework
v0.0.18

搜索帮助