1 Star 1 Fork 0

D10.天地弦 / gobase

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
pubhub.go 2.21 KB
一键复制 编辑 原始数据 按行查看 历史
D10.天地弦 提交于 2024-01-18 11:05 . * gobase 包归类整理
package subpub
import (
"fmt"
"gitee.com/ymofen/gobase/factory"
"gitee.com/ymofen/gobase"
"sync"
)
// 创建的Session对象必须要实现的接口方法
type PubSessionIntf interface {
Start()
Close() error
// 发布数据
Pub(topic string, max int, args ...interface{}) int
}
// 发布中心
// 负责创建管理会话对象
// 注册的方法必须是pub.开头
type PubHub struct {
notify ISubPub
lk sync.RWMutex
lstMap map[string]interface{}
}
func NewPubHub() *PubHub {
rval := &PubHub{
lstMap: make(map[string]interface{}),
notify: NewSubchannel(),
}
return rval
}
var (
DefaultPub = NewPubHub()
)
// 使用conntype参数, 调用pub.[xxx]注册的方法, 创建一个Pub会话对象
//
// false: 如果sessionid已经存在; conntype参数未注册
func (this *PubHub) AddSession(sessionid string, conf gobase.StrMap, notifyFn SubFunc) (pub PubSessionIntf, err error) {
this.lk.RLock()
obj := this.lstMap[sessionid]
this.lk.RUnlock()
if obj != nil {
return nil, fmt.Errorf("[%s]已经存在", sessionid)
}
typestr := conf.StringByName("conntype", "")
if len(typestr) == 0 {
return nil, fmt.Errorf("未指定conntype")
} else {
typestr = fmt.Sprintf("pub.%s", typestr)
}
obj, err = factory.CreateInstance(typestr, sessionid, conf, this.notify)
if err != nil {
return nil, err
}
intf, ok := obj.(PubSessionIntf)
if !ok {
return nil, fmt.Errorf("[%s]类型插件不支持PubSessionIntf接口", typestr)
}
this.lk.Lock()
this.lstMap[sessionid] = obj
this.lk.Unlock()
this.notify.Sub(sessionid, sessionid, notifyFn)
intf.Start()
return intf, nil
}
// 关闭移除一个会话
func (this *PubHub) DelSession(sessionid string) bool {
this.lk.RLock()
obj := this.lstMap[sessionid]
this.lk.RUnlock()
if obj != nil {
if intf, ok := obj.(PubSessionIntf); ok {
intf.Close()
}
this.lk.Lock()
delete(this.lstMap, sessionid)
this.lk.Unlock()
}
return this.notify.Unsub(sessionid, sessionid)
}
// 发布数据
func (this *PubHub) Pub(sessionid string, topic string, max int, args ...interface{}) int {
this.lk.RLock()
obj := this.lstMap[sessionid]
this.lk.RUnlock()
if obj != nil {
if intf, ok := obj.(PubSessionIntf); ok {
return intf.Pub(topic, max, args...)
}
}
return 0
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/ymofen/gobase.git
git@gitee.com:ymofen/gobase.git
ymofen
gobase
gobase
v1.2.24053

搜索帮助

344bd9b3 5694891 D2dac590 5694891