1 Star 1 Fork 0

D10.天地弦 / gobase

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
subscribe.go 6.86 KB
一键复制 编辑 原始数据 按行查看 历史
D10.天地弦 提交于 2024-01-07 11:16 . * 优化subscribe
package v0
import (
"fmt"
"gitee.com/ymofen/gobase"
"sort"
"strings"
"sync"
)
/*
支持使用路由进行订阅 # 匹配下级所有, +匹配当前层
topic/news
10000 topic
pub:
sync.map
lk:
utils_subscribe_v2._test.go:168: 25505, 5094/s
utils_subscribe_v2._test.go:168: 50409, 5035/s
nolk:
utils_subscribe_v2._test.go:168: 25767, 5135/s
utils_subscribe_v2._test.go:168: 51914, 5175/s
map with lk
utils_subscribe_v2._test.go:168: 34228, 6844/s
utils_subscribe_v2._test.go:168: 68952, 6894/s
map with lk
utils_subscribe_v2._test.go:168: 35343, 6977/s
utils_subscribe_v2._test.go:168: 71194, 7072/s
sesslst -> map
utils_subscribe_v2._test.go:168: 36668, 7321/s
utils_subscribe_v2._test.go:168: 74022, 7383/s
*/
type SubFunc = func(id, topic string, args ...interface{}) (ok bool)
type ISub interface {
Sub(id, topic string, cb func(id, topic string, args ...interface{}) (ok bool))
}
type IUnsub interface {
Unsub(id, topic string) bool
}
type IPub interface {
Pub(topic string, max int, args ...interface{}) int
}
// 订阅中心,
type Subscribe struct {
lk sync.RWMutex
topiclst map[string]*SubTopicItem
routeParseFunc func(topic string) interface{} // 必须确保有值
routeMatchFunc func(route interface{}, topic interface{}) bool // 必须确保有值
}
func innerParseRoute(route string) interface{} {
return strings.Split(route, "/")
}
func innerRouteIncludesTopic0(route []string, topic []string) bool {
if len(route) == 0 {
return len(topic) == 0
}
if len(topic) == 0 {
return route[0] == "#"
}
if route[0] == "#" {
return true
}
if (route[0] == "+") || (route[0] == topic[0]) {
return innerRouteIncludesTopic0(route[1:], topic[1:])
}
return false
}
/*
1000W:consume:730(ms)
*/
func innerRouteIncludesTopic(route interface{}, topic interface{}) bool {
return innerRouteIncludesTopic0(route.([]string), topic.([]string))
}
func NewSubscribe() *Subscribe {
return &Subscribe{
topiclst: make(map[string]*SubTopicItem),
routeParseFunc: innerParseRoute,
routeMatchFunc: innerRouteIncludesTopic,
}
}
func NewSubscribeEx(routeParseFunc func(topic string) interface{}, routeMatchFunc func(route interface{}, topic interface{}) bool) *Subscribe {
if routeParseFunc == nil {
panic("invalid routeParseFunc")
}
if routeMatchFunc == nil {
panic("invalid routeMatch")
}
return &Subscribe{
topiclst: make(map[string]*SubTopicItem),
routeParseFunc: routeParseFunc,
routeMatchFunc: routeMatchFunc,
}
}
type SubTopicItem struct {
subtopicid string
subtopic interface{}
sesslst map[string]SubFunc
}
func (this *Subscribe) Close() error {
this.lk.Lock()
defer this.lk.Unlock()
for k, _ := range this.topiclst {
delete(this.topiclst, k)
}
return nil
}
func (this *Subscribe) matchTopic(topics interface{}, fn func(itm *SubTopicItem) bool) {
this.rangeTopics(func(key string, itm *SubTopicItem) bool {
if this.routeMatchFunc(itm.subtopic, topics) {
return fn(itm)
}
return true
})
}
func (this *Subscribe) checkGetTopic(topic string, new bool) *SubTopicItem {
itm := this.topiclst[topic]
if new && itm == nil {
itm = &SubTopicItem{subtopicid: topic, subtopic: this.routeParseFunc(topic), sesslst: make(map[string]SubFunc)}
this.topiclst[topic] = itm
}
return itm
}
func (this *Subscribe) Status() string {
return fmt.Sprintf("topic-n:%d, sess:%d", this.TopicCount(), this.Count())
}
func (this *Subscribe) GetTopicSubCount(topic string) int {
this.lk.RLock()
defer this.lk.RUnlock()
itm := this.checkGetTopic(topic, false)
if itm == nil {
return 0
}
return len(itm.sesslst)
}
func (this *Subscribe) TopicCount() int {
this.lk.RLock()
defer this.lk.RUnlock()
return len(this.topiclst)
}
func (this *Subscribe) rangeTopics(fn func(key string, itm *SubTopicItem) bool) {
for k, v := range this.topiclst {
if !fn(k, v) {
break
}
}
}
func (this *Subscribe) Count() int {
this.lk.RLock()
defer this.lk.RUnlock()
n := 0
this.rangeTopics(func(key string, itm *SubTopicItem) bool {
n += len(itm.sesslst)
return true
})
return n
}
func (this *Subscribe) StatusDetail() string {
var sb gobase.BytesBuilder
sb.Appendf("topic:%d", this.TopicCount())
lst := make([]string, 0, 1024)
this.rangeTopics(func(key string, itm *SubTopicItem) bool {
lst = append(lst, key)
return true
})
sort.Strings(lst)
for i := 0; i < len(lst); i++ {
itm := this.checkGetTopic(lst[i], false)
if itm != nil {
sb.Appendf(",%s:%d", lst[i], len(itm.sesslst))
}
}
return sb.String()
}
// 订阅一个主题
//
// topic订阅主题,为空不进行订阅
// id订阅者id,topic 下id重复将会被覆盖(之前订阅失效)
func (this *Subscribe) Sub(id, topic string, cb func(id, topic string, args ...interface{}) (ok bool)) {
if len(topic) == 0 {
return
}
this.lk.Lock()
defer this.lk.Unlock()
itm := this.checkGetTopic(topic, true)
itm.sesslst[id] = cb
return
}
// 取消订阅
// id 订阅时传入的id
// topic订阅的主题
func (this *Subscribe) Unsub(id, topic string) bool {
if len(topic) == 0 {
return false
}
this.lk.Lock()
defer this.lk.Unlock()
itm := this.checkGetTopic(topic, false)
if itm != nil {
delete(itm.sesslst, id)
if len(itm.sesslst) == 0 {
delete(this.topiclst, topic)
}
return true
}
return false
}
/*
max:0, 全部投递
>1 投递成功max次后停止
*/
//func (this *Subscribe) Pub0(topic string, max int, args ...interface{}) int {
// n := 0
// chkmap := make(map[interface{}]interface{})
// topics := ParseRoute(topic)
// this.lk.RLock()
// this.matchTopic(topics, func(itm *SubTopicItem) bool {
// itm.sesslst.Range(func(key, value interface{}) bool {
// if chkmap[key] == nil {
// chkmap[key] = value
// }
// return true
// })
// return true
// })
// this.lk.RUnlock()
//
// for _, value := range chkmap {
// if fn, ok := value.(func(args ...interface{}) bool); ok {
// if fn(args...) {
// n++
// }
// }
// }
// return n
//}
/*
max:0, 全部投递
>1 投递成功max次后停止
循环map
utils_subscribe_v2._test.go:168: 34228, 6844/s
utils_subscribe_v2._test.go:168: 68952, 6894/s
循环lst
utils_subscribe_v2._test.go:168: 35343, 6977/s
utils_subscribe_v2._test.go:168: 71194, 7072/s
*/
//向主题订阅者推送数据
//topic推送的主题
//max最大接收者,超过该值不再进行推送
func (this *Subscribe) Pub(topic string, max int, args ...interface{}) int {
n := 0
chkmap := make(map[interface{}]int8)
fnlst := make([]SubFunc, 0, 1024)
idlst := make([]string, 0, 1024)
topics := this.routeParseFunc(topic)
this.lk.RLock()
this.matchTopic(topics, func(itm *SubTopicItem) bool {
for key, value := range itm.sesslst {
if chkmap[key] == 0 {
fnlst = append(fnlst, value)
idlst = append(idlst, key)
chkmap[key] = 1
}
}
return true
})
this.lk.RUnlock()
for idx, fn := range fnlst {
if fn(idlst[idx], topic, args...) {
n++
if max > 0 && n >= max {
break
}
}
}
return n
}
1
https://gitee.com/ymofen/gobase.git
git@gitee.com:ymofen/gobase.git
ymofen
gobase
gobase
v1.2.24041

搜索帮助

53164aa7 5694891 3bd8fe86 5694891