1 Star 0 Fork 0

h79/goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
bus.go 4.22 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2023-10-24 21:29 . bus
package bus
import (
"fmt"
"reflect"
"sync"
)
var _ Publisher = (*Bus)(nil)
var _ Subscriber = (*Bus)(nil)
type eventHandler struct {
sync.Mutex // lock for an event handler - useful for running async callbacks serially
handle reflect.Value
fOnce bool //运行一次
async bool //异步处理
transactional bool //事务性
}
type Bus struct {
handlers map[string][]*eventHandler
rm sync.RWMutex
wg sync.WaitGroup
}
func New() *Bus {
return &Bus{
handlers: make(map[string][]*eventHandler, 0),
rm: sync.RWMutex{},
wg: sync.WaitGroup{},
}
}
func (bus *Bus) doSubscribe(topic string, fn interface{}, handler *eventHandler) error {
bus.rm.Lock()
defer bus.rm.Unlock()
var kind = reflect.TypeOf(fn).Kind()
switch kind {
case reflect.Func:
case reflect.Chan:
if _, ok := fn.(EventChan); !ok {
return fmt.Errorf("%s is not of type EventChan", kind)
}
default:
return fmt.Errorf("%s is not of type reflect.Func or reflect.Chan", kind)
}
bus.handlers[topic] = append(bus.handlers[topic], handler)
return nil
}
func (bus *Bus) SubscribeEventChan(topic string, ch EventChan) error {
return bus.SubscribeEventChan(topic, ch)
}
func (bus *Bus) Subscribe(topic string, ch interface{}) error {
return bus.doSubscribe(topic, ch, &eventHandler{
sync.Mutex{}, reflect.ValueOf(ch), false, false, false,
})
}
func (bus *Bus) SubscribeAsync(topic string, fn interface{}, transactional bool) error {
return bus.doSubscribe(topic, fn, &eventHandler{
sync.Mutex{}, reflect.ValueOf(fn), false, true, transactional,
})
}
// SubscribeOnce subscribes to a topic once. Handler will be removed after executing.
// Returns error if `fn` is not a function.
func (bus *Bus) SubscribeOnce(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
sync.Mutex{}, reflect.ValueOf(fn), true, false, false,
})
}
// SubscribeOnceAsync subscribes to a topic once with an asynchronous callback
// Async determines whether subsequent Publish should wait for callback return
// Handler will be removed after executing.
// Returns error if `fn` is not a function.
func (bus *Bus) SubscribeOnceAsync(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
sync.Mutex{}, reflect.ValueOf(fn), true, true, false,
})
}
func (bus *Bus) UnSubscribe(topic string, ch interface{}) {
bus.rm.Lock()
defer bus.rm.Unlock()
handlers, found := bus.handlers[topic]
if !found || len(handlers) == 0 {
return
}
for i, c := range handlers {
if c.handle == reflect.ValueOf(ch) {
bus.handlers[topic] = append(handlers[:i], handlers[i+1:]...)
return
}
}
}
func (bus *Bus) Start() error {
return nil
}
func (bus *Bus) Stop() {
}
func (bus *Bus) PublishArgs(topic string, args ...interface{}) {
bus.rm.RLock()
defer bus.rm.RUnlock()
hs, found := bus.handlers[topic]
if !found {
return
}
handlers := append([]*eventHandler{}, hs...)
for i, handler := range handlers {
if handler.fOnce {
bus.removeHandler(topic, i)
}
if !handler.async { //同步
bus.doPublish(handler, args...)
} else {
bus.wg.Add(1)
if handler.transactional {
handler.Lock()
}
go bus.doPublishAsync(handler, args...)
}
}
}
func (bus *Bus) Publish(topic string, ev Event) {
bus.PublishArgs(topic, ev)
}
func (bus *Bus) doPublish(eventHandle *eventHandler, args ...interface{}) {
var kind = eventHandle.handle.Kind()
switch kind {
case reflect.Func:
vs := make([]reflect.Value, 0)
for _, arg := range args {
vs = append(vs, reflect.ValueOf(arg))
}
eventHandle.handle.Call(vs)
case reflect.Chan:
eventCh, ex := eventHandle.handle.Interface().(EventChan)
if !ex {
return
}
for i := range args {
if event, ok := args[i].(Event); ok {
eventCh <- event
}
}
default:
}
}
func (bus *Bus) doPublishAsync(handler *eventHandler, args ...interface{}) {
defer bus.wg.Done()
if handler.transactional {
defer handler.Unlock()
}
bus.doPublish(handler, args...)
}
func (bus *Bus) removeHandler(topic string, idx int) {
handlers, ok := bus.handlers[topic]
if !ok {
return
}
l := len(handlers)
if 0 > idx || idx >= l {
return
}
bus.handlers[topic] = append(handlers[:idx], handlers[idx+1:]...)
}
func (bus *Bus) Wait() {
bus.wg.Wait()
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.10.0

搜索帮助

344bd9b3 5694891 D2dac590 5694891