1 Star 1 Fork 0

D10.天地弦/gobase

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
subchannel.go 4.23 KB
一键复制 编辑 原始数据 按行查看 历史
D10.天地弦 提交于 2024-01-18 19:37 . * gobase 包归类整理
package subpub
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
)
// subchannelItem holds the subscriber state of a single publish channel.
type subchannelItem struct {
// id is the channel name; innerFreeChannel uses it as the key when removing
// the item from Subchannel.channellst.
id string
// sesslst maps a subscriber (session) id to its callback.
sesslst map[string]SubFunc
// idlst and fnlst are parallel slices rebuilt from sesslst by
// innerReloadSubSessionFnlst; Pub iterates over them, calling fnlst[i] with idlst[i].
idlst []string
fnlst []SubFunc
// lastActivityT is the Unix time in seconds of the most recent Pub on this
// channel; it stays 0 until the first Pub.
lastActivityT int64
}
// Subchannel is a subscription hub keyed by channel name.
// Per the original author's note, subscribing and unsubscribing here is much
// faster than with "subscribe" (presumably another implementation in this
// package; it is not visible from this file).
// Channels are looked up by their exact name; wildcards are not supported.
type Subchannel struct {
// channelAliveN counts subchannelItem values that were created by
// checkGetChannel and have not yet been finalized by the garbage collector.
channelAliveN int32
// lockcnt is incremented atomically by Unsub and ClosePubChannel after they
// take the write lock; nothing in the visible code reads it.
lockcnt int32
// lk is the lock the methods take around accesses to channellst.
lk sync.RWMutex
// channellst maps a channel name to its item.
channellst map[string]*subchannelItem
}
var (
// DefaultSubchannel is a package-level hub built with NewSubchannel when the
// package is initialized.
DefaultSubchannel = NewSubchannel()
)
// NewSubchannel returns a new hub whose channel table is allocated and empty.
func NewSubchannel() *Subchannel {
	hub := new(Subchannel)
	hub.channellst = map[string]*subchannelItem{}
	return hub
}
// Close removes every channel from the hub together with all of its
// subscriptions. The hub stays usable afterwards: a later Sub simply creates
// the channel again.
//
// Channels are dropped unconditionally here. Going through innerFreeChannel
// would not work, because it declines to release a channel that still has
// sessions, and a channel only exists while it has at least one.
//
// Close always returns nil.
func (this *Subchannel) Close() error {
	this.lk.Lock()
	defer this.lk.Unlock()
	// Deleting entries while ranging over a map is allowed in Go.
	for id, itm := range this.channellst {
		delete(this.channellst, id)
		if itm != nil {
			itm.sesslst, itm.idlst, itm.fnlst = nil, nil, nil
		}
	}
	return nil
}
// checkGetChannel returns the item stored under channel. When there is none
// it returns nil, unless new is true, in which case a fresh item is created,
// stored and returned.
//
// The method does no locking itself; the visible callers invoke it while
// holding this.lk.
func (this *Subchannel) checkGetChannel(channel string, new bool) *subchannelItem {
	if existing := this.channellst[channel]; existing != nil || !new {
		return existing
	}

	created := &subchannelItem{
		id:      channel,
		sesslst: map[string]SubFunc{},
	}
	atomic.AddInt32(&this.channelAliveN, 1)
	// The alive counter only goes back down when the garbage collector
	// finalizes the item, not when the item leaves the table.
	runtime.SetFinalizer(created, func(interface{}) {
		atomic.AddInt32(&this.channelAliveN, -1)
	})
	this.channellst[channel] = created
	return created
}
// Status returns a one-line summary of the hub in the form
// "subtopic-n:<channels>, channel:<channels>, alive:<items>", where <channels>
// is the number of registered channels and <items> is the number of channel
// items that have been created and not yet finalized.
//
// Both values are read safely for concurrent use: the channel count comes from
// GetChannelCount, which takes the read lock, and the alive counter is loaded
// atomically because it is updated with atomic operations elsewhere.
func (this *Subchannel) Status() string {
	n := this.GetChannelCount()
	alive := atomic.LoadInt32(&this.channelAliveN)
	return fmt.Sprintf("subtopic-n:%d, channel:%d, alive:%d", n, n, alive)
}
// GetChannelSessionCount reports how many sessions are subscribed to channel.
// It returns 0 when the channel is not registered. The lookup runs under the
// read lock.
func (this *Subchannel) GetChannelSessionCount(channel string) int {
	this.lk.RLock()
	defer this.lk.RUnlock()

	if itm := this.checkGetChannel(channel, false); itm != nil {
		return len(itm.sesslst)
	}
	return 0
}
// GetChannelCount reports the number of channels currently registered,
// read under the read lock.
func (this *Subchannel) GetChannelCount() int {
	this.lk.RLock()
	total := len(this.channellst)
	this.lk.RUnlock()
	return total
}
// innerReloadSubSessionFnlst rebuilds itm.idlst and itm.fnlst from itm.sesslst
// so that idlst[i] is the session id belonging to fnlst[i]. Fresh slices are
// allocated on every call; the previous ones are left untouched, so a caller
// that already copied them keeps a consistent pair.
func (this *Subchannel) innerReloadSubSessionFnlst(itm *subchannelItem) {
	total := len(itm.sesslst)
	ids := make([]string, 0, total)
	callbacks := make([]SubFunc, 0, total)
	// Every registered session goes into the lists.
	for sessID, cb := range itm.sesslst {
		ids = append(ids, sessID)
		callbacks = append(callbacks, cb)
	}
	itm.idlst, itm.fnlst = ids, callbacks
}
// Sub registers cb as the callback of session id on channel, creating the
// channel if it does not exist yet. Subscribing again with the same id
// replaces the previous callback.
//
// channel must not be empty; a call with an empty channel is ignored.
func (this *Subchannel) Sub(id, channel string, cb SubFunc) {
	if channel == "" {
		return
	}

	this.lk.Lock()
	defer this.lk.Unlock()

	target := this.checkGetChannel(channel, true)
	target.sesslst[id] = cb
	this.innerReloadSubSessionFnlst(target)
}
// Unsub removes session id from channel. When that leaves the channel without
// sessions the channel itself is released; otherwise its callback lists are
// rebuilt.
//
// It returns false when channel is empty or not registered, and true whenever
// the channel exists, whether or not id was actually subscribed to it.
func (this *Subchannel) Unsub(id, channel string) bool {
	if channel == "" {
		return false
	}

	this.lk.Lock()
	defer this.lk.Unlock()
	atomic.AddInt32(&this.lockcnt, 1)

	itm := this.checkGetChannel(channel, false)
	if itm == nil {
		return false
	}

	delete(itm.sesslst, id)
	if len(itm.sesslst) > 0 {
		this.innerReloadSubSessionFnlst(itm)
	} else {
		this.innerFreeChannel(itm)
	}
	return true
}
// Pub delivers args to the subscribers of channel by calling each registered
// callback with its session id, the channel name and args.
//
// It returns the number of callbacks that returned true. When max is greater
// than zero, delivery stops as soon as that many callbacks have returned true.
// The result is 0 when the channel is not registered (nothing was published)
// and -1 when the channel's id and callback lists differ in length.
//
// The callback lists are copied under the read lock and the callbacks run
// after it has been released, so callbacks may call back into the hub.
func (this *Subchannel) Pub(channel string, max int, args ...interface{}) int {
	this.lk.RLock()
	itm := this.channellst[channel]
	if itm == nil {
		this.lk.RUnlock()
		return 0
	}
	callbacks, ids := itm.fnlst, itm.idlst
	// NOTE(review): this write happens while only the read lock is held, so
	// concurrent Pub calls on the same channel write it unsynchronized.
	itm.lastActivityT = time.Now().Unix()
	this.lk.RUnlock()

	if len(ids) != len(callbacks) {
		return -1
	}

	delivered := 0
	for i := range callbacks {
		if !callbacks[i](ids[i], channel, args...) {
			continue
		}
		delivered++
		if max > 0 && delivered >= max {
			break
		}
	}
	return delivered
}
// innerFreeChannel releases itm if it has no sessions left: the item is
// removed from the channel table and its session map and callback lists are
// cleared.
//
// It returns true only when itm was actually removed from the table. It
// returns false when itm is nil, when it still has sessions, or when the table
// no longer holds this very item under itm.id.
//
// The identity check matters for callers that hold on to an item pointer
// across an unlock: the channel may have been released and re-created under
// the same name in the meantime, and deleting by name alone would evict the
// replacement together with its subscribers.
//
// The method does no locking itself; callers must hold the write lock.
func (this *Subchannel) innerFreeChannel(itm *subchannelItem) bool {
	if itm == nil || len(itm.sesslst) > 0 {
		return false
	}
	removed := false
	if this.channellst[itm.id] == itm {
		delete(this.channellst, itm.id)
		removed = true
	}
	itm.sesslst = nil
	itm.idlst = nil
	itm.fnlst = nil
	return removed
}
// ClosePubChannel asks innerFreeChannel to release the named channel and
// reports its answer. With the visible implementation of innerFreeChannel that
// means closed is false when the channel is not registered or still has
// sessions.
func (this *Subchannel) ClosePubChannel(channel string) (closed bool) {
	this.lk.Lock()
	defer this.lk.Unlock()
	atomic.AddInt32(&this.lockcnt, 1)

	closed = this.innerFreeChannel(this.channellst[channel])
	return closed
}
// CleanChannels releases channels on which nothing has been published for
// more than ten minutes and returns how many were released.
//
// The work is done in two phases so that publishers are not blocked while the
// table is scanned: idle candidates are collected under the read lock, then
// released under the write lock. Because the lock is dropped in between, every
// candidate is re-validated before it is released:
//   - the table must still hold the very same item under that name (the
//     channel may have been released and re-created in the meantime), and
//   - the channel must still be idle (a Pub may have happened in the meantime).
//
// The release itself is delegated to innerFreeChannel, which, as visible in
// this file, declines channels that still have sessions.
func (this *Subchannel) CleanChannels() (cnt int) {
	// Idle threshold in seconds (10 minutes).
	const idleSeconds = 600
	now := time.Now().Unix()

	var candidates []*subchannelItem
	this.lk.RLock()
	for _, itm := range this.channellst {
		if now-itm.lastActivityT > idleSeconds {
			candidates = append(candidates, itm)
		}
	}
	this.lk.RUnlock()
	if len(candidates) == 0 {
		return 0
	}

	this.lk.Lock()
	defer this.lk.Unlock()
	for _, itm := range candidates {
		if this.channellst[itm.id] != itm {
			// Replaced or already released since the scan.
			continue
		}
		if now-itm.lastActivityT <= idleSeconds {
			// Became active again since the scan.
			continue
		}
		if this.innerFreeChannel(itm) {
			cnt++
		}
	}
	return cnt
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/ymofen/gobase.git
git@gitee.com:ymofen/gobase.git
ymofen
gobase
gobase
v1.2.24053

搜索帮助

D67c1975 1850385 1daf7b77 1850385