代码拉取完成,页面将自动刷新
package subpub
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
)
// subchannelItem is the record kept for one publish channel.
type subchannelItem struct {
	// id is the channel name; innerFreeChannel uses it as the key when
	// removing the item from Subchannel.channellst.
	id string

	// sesslst maps a subscriber id to the callback registered for it.
	sesslst map[string]SubFunc

	// idlst and fnlst are parallel snapshots of sesslst (idlst[i] belongs to
	// fnlst[i]). They are rebuilt by innerReloadSubSessionFnlst and are the
	// slices Pub iterates over.
	idlst []string
	fnlst []SubFunc

	// lastActivityT is the Unix time, in seconds, of the latest Pub on this
	// channel. It stays 0 until the first Pub.
	lastActivityT int64
}
// Subchannel is a subscription hub keyed by channel name.
//
// The original author notes that subscribing and unsubscribing here is much
// cheaper than with "subscribe" (presumably another implementation in this
// package; it is not visible in this file).
//
// Channel names are looked up verbatim in a map, so wildcards are not
// supported.
type Subchannel struct {
	// channelAliveN is incremented when a subchannelItem is created and
	// decremented by that item's finalizer; it is updated atomically.
	channelAliveN int32

	// lockcnt is bumped atomically by Unsub and ClosePubChannel once they
	// hold the write lock.
	lockcnt int32

	// lk guards channellst and the items stored in it.
	lk sync.RWMutex

	// channellst is the channel table, keyed by channel name.
	channellst map[string]*subchannelItem
}
// DefaultSubchannel is a package-level hub created at package initialisation.
var DefaultSubchannel = NewSubchannel()
// NewSubchannel returns an empty hub whose channel table is ready for use.
func NewSubchannel() *Subchannel {
	hub := new(Subchannel)
	hub.channellst = map[string]*subchannelItem{}
	return hub
}
// Close offers every registered channel to innerFreeChannel while holding the
// write lock. innerFreeChannel declines any channel that still has
// subscribers, so only subscriber-less channels are actually removed.
//
// It always returns nil.
func (this *Subchannel) Close() error {
	this.lk.Lock()
	defer this.lk.Unlock()

	// Removing the current entry while ranging over a map is permitted in Go.
	for name := range this.channellst {
		this.innerFreeChannel(this.channellst[name])
	}
	return nil
}
// checkGetChannel looks up channel in the channel table.
//
// When the channel is missing and new is true, a fresh item is created,
// registered under channel and returned; otherwise the lookup result is
// returned as is (nil when the channel is unknown).
//
// The function takes no lock itself. The callers visible in this file hold
// lk, and hold it for writing whenever new is true.
func (this *Subchannel) checkGetChannel(channel string, new bool) *subchannelItem {
	if existing := this.channellst[channel]; existing != nil || !new {
		return existing
	}

	created := &subchannelItem{
		id:      channel,
		sesslst: make(map[string]SubFunc),
	}

	// channelAliveN tracks items that exist in memory: it goes up here and
	// comes back down only when the garbage collector finalizes the item.
	atomic.AddInt32(&this.channelAliveN, 1)
	runtime.SetFinalizer(created, func(obj interface{}) {
		atomic.AddInt32(&this.channelAliveN, -1)
	})

	this.channellst[channel] = created
	return created
}
// Status returns a one-line summary of the hub in the form
// "subtopic-n:<channels>, channel:<channels>, alive:<items not yet finalized>".
//
// Both of the first two fields report the size of the channel table. The
// size is read once under the read lock and the alive counter is loaded
// atomically, so Status is safe to call while other goroutines subscribe,
// unsubscribe or publish.
func (this *Subchannel) Status() string {
	this.lk.RLock()
	channels := len(this.channellst)
	this.lk.RUnlock()

	alive := atomic.LoadInt32(&this.channelAliveN)
	return fmt.Sprintf("subtopic-n:%d, channel:%d, alive:%d", channels, channels, alive)
}
// GetChannelSessionCount reports how many subscribers are registered on
// channel, or 0 when the channel does not exist.
func (this *Subchannel) GetChannelSessionCount(channel string) int {
	this.lk.RLock()
	defer this.lk.RUnlock()

	if itm := this.checkGetChannel(channel, false); itm != nil {
		return len(itm.sesslst)
	}
	return 0
}
// GetChannelCount reports the number of channels currently in the channel
// table, read under the read lock.
func (this *Subchannel) GetChannelCount() int {
	this.lk.RLock()
	total := len(this.channellst)
	this.lk.RUnlock()
	return total
}
// innerReloadSubSessionFnlst rebuilds itm.idlst and itm.fnlst from
// itm.sesslst so that idlst[i] is the subscriber id owning fnlst[i].
//
// Fresh slices are allocated on every call; slices handed out earlier are
// never modified in place.
func (this *Subchannel) innerReloadSubSessionFnlst(itm *subchannelItem) {
	total := len(itm.sesslst)
	ids := make([]string, 0, total)
	fns := make([]SubFunc, 0, total)

	// Every registered session goes into the snapshot.
	for sessID, cb := range itm.sesslst {
		ids = append(ids, sessID)
		fns = append(fns, cb)
	}

	itm.fnlst, itm.idlst = fns, ids
}
// Sub registers cb as the callback of subscriber id on channel, creating the
// channel when it does not exist yet. A previous callback registered under
// the same id on that channel is replaced.
//
// channel must not be empty; an empty channel makes the call a no-op.
// cb should not be nil: Pub invokes every stored callback unconditionally.
func (this *Subchannel) Sub(id, channel string, cb SubFunc) {
	if channel == "" {
		return
	}

	this.lk.Lock()
	defer this.lk.Unlock()

	target := this.checkGetChannel(channel, true)
	target.sesslst[id] = cb
	this.innerReloadSubSessionFnlst(target)
}
// Unsub removes subscriber id from channel. When that leaves the channel
// without subscribers the channel itself is released.
//
// It returns false when channel is empty or unknown, and true whenever the
// channel exists, whether or not id was actually subscribed to it.
func (this *Subchannel) Unsub(id, channel string) bool {
	if channel == "" {
		return false
	}

	this.lk.Lock()
	defer this.lk.Unlock()
	atomic.AddInt32(&this.lockcnt, 1)

	target := this.checkGetChannel(channel, false)
	if target == nil {
		return false
	}

	delete(target.sesslst, id)
	if len(target.sesslst) > 0 {
		this.innerReloadSubSessionFnlst(target)
	} else {
		this.innerFreeChannel(target)
	}
	return true
}
// Pub delivers args to the subscribers of channel and returns how many
// callbacks reported success (returned true).
//
// max limits the number of successful deliveries: once max callbacks have
// returned true, the remaining subscribers are skipped. max <= 0 means no
// limit.
//
// Return values:
//   - 0 when publishing failed: the channel does not exist, or no callback
//     accepted the message;
//   - -1 when the id and callback snapshots of the channel differ in length;
//   - otherwise the number of callbacks that returned true.
//
// Callbacks run after the hub lock has been released, so they may call back
// into the hub.
func (this *Subchannel) Pub(channel string, max int, args ...interface{}) int {
	// The write lock is required here, not the read lock: lastActivityT is
	// modified below, and doing that under RLock would race with other
	// publishers and with CleanChannels. The critical section is only a map
	// lookup and three assignments.
	this.lk.Lock()
	itm := this.channellst[channel]
	if itm == nil {
		this.lk.Unlock()
		return 0
	}
	// The slices are replaced, never mutated in place, so they can be used
	// safely after the lock is dropped.
	fnlst, idlst := itm.fnlst, itm.idlst
	itm.lastActivityT = time.Now().Unix()
	this.lk.Unlock()

	if len(idlst) != len(fnlst) {
		return -1
	}

	n := 0
	for idx, fn := range fnlst {
		if !fn(idlst[idx], channel, args...) {
			continue
		}
		n++
		if max > 0 && n >= max {
			break
		}
	}
	return n
}
// innerFreeChannel removes itm from the channel table and drops its
// subscriber map and snapshots.
//
// It does nothing and returns false when itm is nil or still has
// subscribers; it returns true when the channel was released. The function
// takes no lock itself; the callers visible in this file hold the write lock.
func (this *Subchannel) innerFreeChannel(itm *subchannelItem) bool {
	if itm == nil {
		return false
	}
	if len(itm.sesslst) > 0 {
		return false
	}

	delete(this.channellst, itm.id)
	itm.sesslst, itm.fnlst, itm.idlst = nil, nil, nil
	return true
}
// ClosePubChannel releases channel. closed is true only when the channel
// existed and had no subscribers left; a channel that still has subscribers
// is left untouched.
func (this *Subchannel) ClosePubChannel(channel string) (closed bool) {
	this.lk.Lock()
	defer this.lk.Unlock()
	atomic.AddInt32(&this.lockcnt, 1)

	closed = this.innerFreeChannel(this.channellst[channel])
	return closed
}
// CleanChannels releases channels on which nothing has been published for
// more than 10 minutes and returns how many channels were released.
//
// A channel that has never been published to has a zero activity time and
// therefore counts as idle. Release goes through innerFreeChannel, which
// leaves channels that still have subscribers in place.
//
// The scan and the release happen in one pass under the write lock. Using an
// item collected under an earlier read lock is unsafe: the channel may have
// been freed and re-created under the same name in between, and releasing
// the stale item would delete the live channel from the table.
func (this *Subchannel) CleanChannels() (cnt int) {
	// Idle threshold in seconds (10 minutes).
	const idleSeconds = 600

	now := time.Now().Unix()

	this.lk.Lock()
	defer this.lk.Unlock()

	// Removing the current entry while ranging over a map is permitted in Go.
	for _, itm := range this.channellst {
		if now-itm.lastActivityT <= idleSeconds {
			continue
		}
		if this.innerFreeChannel(itm) {
			cnt++
		}
	}
	return cnt
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。