1 Star 0 Fork 0

h79/goim

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
mq.go 4.22 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2024-08-29 01:08 . 升级库
package im
import (
"context"
"encoding/json"
"gitee.com/h79/goutils/common/logger"
"gitee.com/h79/goutils/common/option"
"gitee.com/h79/goutils/common/random"
"gitee.com/h79/goutils/common/scheduler"
"gitee.com/h79/goutils/common/stringutil"
"gitee.com/h79/goutils/common/timer"
"strings"
"sync"
)
// Command codes carried in MessageMQ.Cmd. The numeric values travel in the
// JSON "cmd" field, so they are spelled out explicitly.
const (
	// SubscribeMQ asks to add the sending client to one or more topics.
	SubscribeMQ = 0
	// UnSubscribeMQ asks to remove the sending client from one or more topics.
	UnSubscribeMQ = 1
	// SendMQ asks to deliver a message to the subscribers of a topic.
	SendMQ = 2
)
// MessageMQ is the JSON payload of a message-queue request. Only the exported
// fields are decoded from JSON; traceId and from are filled in by the code
// that receives the request.
type MessageMQ struct {
// Cmd selects the operation: SubscribeMQ, UnSubscribeMQ or SendMQ.
Cmd int `json:"cmd"`
Topic string `json:"topic"` // for SubscribeMQ or UnSubscribeMQ this may hold several topics, separated by commas
Content string `json:"content"` // the actual message content
// traceId is copied from the incoming event/request.
traceId string
// from is the session of the sender.
from Session
}
// subscriber holds the set of clients attached to a single topic.
type subscriber struct {
// hub is the owning hub; handler reads hub.source when building the event.
hub *Hub
// rm guards client.
rm sync.RWMutex
// client is the set of subscribed clients (the map value carries no data).
client map[*Client]struct{}
}
// add puts c into the subscriber's client set and logs the registration.
// Adding a client that is already present leaves the set unchanged.
func (s *subscriber) add(c *Client) {
	// Scope the write lock to the map update so the log call runs unlocked.
	func() {
		s.rm.Lock()
		defer s.rm.Unlock()
		s.client[c] = struct{}{}
	}()
	logger.N("MQ", "subscriber client= %s", c.String())
}
// remove takes c out of the subscriber's client set and logs the removal.
// The log line is written even when c was not in the set.
func (s *subscriber) remove(c *Client) {
	// Scope the write lock to the map update so the log call runs unlocked.
	func() {
		s.rm.Lock()
		defer s.rm.Unlock()
		delete(s.client, c)
	}()
	logger.N("MQ", "remove subscriber client= %s", c.String())
}
// empty reports whether the subscriber currently has no clients.
func (s *subscriber) empty() bool {
	s.rm.RLock()
	defer s.rm.RUnlock()
	return len(s.client) == 0
}
// send queues msg for delivery to this subscriber's clients.
//
// When there are no clients the message is dropped with a warning. Otherwise
// a scheduler job carrying a pointer to this call's copy of msg is added to
// mq's pool, with s.handler as the job's handler.
func (s *subscriber) send(mq *Mq, msg MessageMQ) {
	if s.empty() {
		logger.W("IM", "MQ subscriber is empty, topic= '%s', content= '%s'", msg.Topic, msg.Content)
		return
	}

	// Job id: current time in milliseconds followed by a 6-character random suffix.
	id := stringutil.Int64ToString(timer.CurrentMS()) + random.GenerateLowerString(6)

	task := scheduler.NewJob("mq", id, 0)
	task.WithPayload(&msg)
	task.WithHandlerFunc(s.handler)
	mq.pool.AddJob(task)
}
// handler is the scheduler job callback that delivers one queued message to
// every client currently held by this subscriber.
//
// The job payload must be the *MessageMQ stored by send. The message is
// wrapped in an ETMq event, converted to bytes once, and the same bytes are
// written to each client. A failed write is logged together with its error
// and does not stop delivery to the remaining clients. The read lock is held
// for the whole fan-out, so add/remove block until delivery has finished.
//
// opts is not used. The return value is always (nil, nil).
func (s *subscriber) handler(job *scheduler.Job, opts ...option.Option) (interface{}, error) {
	msg := job.GetPayload().(*MessageMQ)
	ev := CreateObjEvent(msg.traceId, ETMq, msg)
	data := RequestEvent(s.hub.source, ev, &msg.from).ToBytes()

	s.rm.RLock()
	defer s.rm.RUnlock()
	for c := range s.client {
		if err := c.SendTo(data); err != nil {
			// Include the error itself; without it the failure cannot be diagnosed.
			logger.E("IM", "MQ send failure, topic= '%s', content= '%s', err= %v", msg.Topic, msg.Content, err)
		}
	}
	return nil, nil
}
// newSubscriber returns a subscriber bound to hub with an empty client set.
func newSubscriber(hub *Hub) *subscriber {
	s := new(subscriber)
	s.hub = hub
	s.client = map[*Client]struct{}{}
	return s
}
// Mq is a topic-based publish/subscribe registry: it maps topic names to the
// clients subscribed to them and queues deliveries on a scheduler pool.
type Mq struct {
// hub is the owning hub, passed on to every subscriber and used for SendToRemote.
hub *Hub
// pool runs the delivery jobs created by subscriber.send.
pool *scheduler.Pool
// rm guards topics.
rm sync.RWMutex
// topics maps a topic name to its subscriber set.
topics map[string]*subscriber
}
// newMQ builds the message queue for hub, starts its scheduler pool and
// registers Process as the handler for ETMq events.
func newMQ(hub *Hub) *Mq {
	pool := scheduler.NewPool(5, 100)
	queue := &Mq{
		hub:    hub,
		pool:   pool,
		topics: map[string]*subscriber{},
	}
	queue.pool.Run()
	Register(ETMq.Int(), queue.Process)
	return queue
}
// removeClient detaches c from every topic and drops any topic that is left
// without clients. subscriber.remove is called on every topic, including
// those c never subscribed to.
func (mq *Mq) removeClient(c *Client) {
	mq.rm.Lock()
	defer mq.rm.Unlock()

	for name, sub := range mq.topics {
		sub.remove(c)
		if !sub.empty() {
			continue
		}
		// Deleting the current key while ranging over a map is permitted in Go.
		delete(mq.topics, name)
	}
}
// Receive delivers an incoming event to the local subscribers of its topic.
// NOTE(review): presumably called for events forwarded by another server
// (Process forwards sends through hub.SendToRemote) — confirm against the caller.
//
// ev.Content is decoded as a MessageMQ. Only SendMQ commands are handled;
// anything else is ignored. If nobody is subscribed to the topic locally, the
// message is dropped. ctx is currently unused.
func (mq *Mq) Receive(ctx context.Context, from *Session, ev *Event) {
	msg := MessageMQ{traceId: ev.TraceId, from: *from}
	if err := json.Unmarshal([]byte(ev.Content), &msg); err != nil {
		// Log the malformed payload instead of dropping it silently, as Process does.
		logger.E("MQ", "receive failure, err= %v", err)
		return
	}
	if msg.Cmd != SendMQ {
		return
	}

	// Hold the registry lock only for the lookup; delivery is queued outside it.
	mq.rm.RLock()
	sub, ok := mq.topics[msg.Topic]
	mq.rm.RUnlock()
	if ok {
		sub.send(mq, msg)
	}
}
// Process handles an ETMq request sent by client c.
//
// It sets req.EvTypeRsp to ETMqRsp and decodes req.Content as a MessageMQ,
// then acts on msg.Cmd:
//
//   - SubscribeMQ: adds c to every topic named in the comma-separated
//     msg.Topic, creating topics on demand. Empty names are skipped.
//   - UnSubscribeMQ: removes c from every named topic that exists and drops
//     topics that are left without clients.
//   - SendMQ: queues the message for the local subscribers of msg.Topic (if
//     any) and forwards the original event through hub.SendToRemote.
//   - any other value: no action.
//
// It returns (nil, err) when the content cannot be decoded and (nil, nil)
// otherwise.
func (mq *Mq) Process(c *Client, req *EventReq) (interface{}, error) {
	req.EvTypeRsp = ETMqRsp.Int()

	msg := MessageMQ{traceId: req.TraceId, from: c.user}
	if err := json.Unmarshal([]byte(req.Content), &msg); err != nil {
		logger.E("MQ", "process failure, err= %v", err)
		return nil, err
	}
	logger.N("MQ", "process data= %v", msg)

	switch msg.Cmd {
	case SubscribeMQ:
		mq.rm.Lock()
		for _, name := range strings.Split(msg.Topic, ",") {
			// strings.Split yields "" for an empty topic, a trailing comma or
			// a double comma. Registering under "" would make this client
			// receive every send whose topic is empty or missing.
			if name == "" {
				continue
			}
			sub, ok := mq.topics[name]
			if !ok {
				sub = newSubscriber(mq.hub)
				mq.topics[name] = sub
			}
			sub.add(c)
		}
		mq.rm.Unlock()

	case UnSubscribeMQ:
		mq.rm.Lock()
		for _, name := range strings.Split(msg.Topic, ",") {
			sub, ok := mq.topics[name]
			if !ok {
				continue
			}
			sub.remove(c)
			// Drop the topic once its last client has left.
			if sub.empty() {
				delete(mq.topics, name)
			}
		}
		mq.rm.Unlock()

	case SendMQ:
		// Hold the registry lock only for the lookup; delivery is queued outside it.
		mq.rm.RLock()
		sub, ok := mq.topics[msg.Topic]
		mq.rm.RUnlock()
		if ok {
			sub.send(mq, msg)
		}

		// Broadcast to the other servers.
		ctx := NewWithTraceId(req.TraceId)
		mq.hub.SendToRemote(ctx, &c.user, &Session{AppId: c.user.AppId, Plat: c.user.Plat}, &req.Event)

	default:
	}
	return nil, nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goim.git
git@gitee.com:h79/goim.git
h79
goim
goim
v0.3.10

搜索帮助

0d507c66 1850385 C8b1a773 1850385