1 Star 0 Fork 0

h79/goim

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
mq.go 5.10 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2026-03-02 20:41 +08:00 . 0.5.16 & MQ消息指定点发送
package im
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"gitee.com/h79/goim/session"
"gitee.com/h79/goutils/common/logger"
"gitee.com/h79/goutils/common/option"
"gitee.com/h79/goutils/common/random"
"gitee.com/h79/goutils/common/scheduler"
"gitee.com/h79/goutils/common/stringutil"
"gitee.com/h79/goutils/common/timer"
)
const (
SubscribeMQ = iota
UnSubscribeMQ
SendMQ
)
type MessageMQ struct {
Cmd int `json:"cmd"`
Flag int32 `json:"flag"` // 是否排除自己
Topic string `json:"topic"` // SubscribeMQ OR UnSubscribeMQ 可以多个topic, 以逗号分隔
Content string `json:"content"` // 具体内容
traceId string
to *session.Session
from *session.Session
}
func (m *MessageMQ) GetToSession() *session.Session {
return m.to
}
func (m *MessageMQ) GetFromSession() *session.Session {
return m.from
}
func (m *MessageMQ) GetTraceId() string {
return m.traceId
}
type subscriber struct {
hub *Hub
rm sync.RWMutex
client map[*Connect]struct{}
}
func (s *subscriber) add(c *Connect) {
s.rm.Lock()
s.client[c] = struct{}{}
s.rm.Unlock()
logger.N("MQ", "subscriber client= %s", c.String())
}
func (s *subscriber) remove(c *Connect) int {
s.rm.Lock()
defer s.rm.Unlock()
delete(s.client, c)
return len(s.client)
}
func (s *subscriber) empty() bool {
s.rm.RLock()
var ret = len(s.client) == 0
s.rm.RUnlock()
return ret
}
func (s *subscriber) send(mq *Mq, msg *MessageMQ) {
if s.empty() {
logger.W("IM", "MQ subscriber is empty, topic= '%s', content= '%s'", msg.Topic, msg.Content)
return
}
var taskId = stringutil.Int64ToString(timer.CurrentMS()) + random.GenerateLowerString(6)
var job = scheduler.NewJob("mq", taskId, 0)
job.WithPayload(msg)
job.WithHandlerFunc(s.handler)
mq.pool.AddJob(job)
}
func (s *subscriber) handler(job scheduler.Task, opts ...option.Option) (interface{}, error) {
var err error
var msg = job.GetPayload().(*MessageMQ)
var ev = CreateObjEvent(msg.traceId, ETMq, msg)
var bytes = RequestEvent(s.hub.source, ev, msg.from).SetToSession(msg.to).ToBytes()
var ctx = context.Background()
var toClient = s.hub.GetClient(msg.to)
s.rm.RLock()
defer s.rm.RUnlock()
for c := range s.client {
if c == nil {
continue
}
if toClient != nil && !toClient.ConnectEqual(c) {
continue
}
if s.hub.msgInterceptor != nil {
err = s.hub.msgInterceptor.MsgInterceptor(ctx, msg.from, c, MQMsg, msg)
if err != nil {
continue
}
}
err = c.SendTo(bytes)
if err != nil {
logger.E("IM", "MQ send failure, topic= '%s', content= '%s'", msg.Topic, msg.Content)
}
if toClient != nil {
// 给指定的客户端
break
}
}
return nil, nil
}
func newSubscriber(hub *Hub) *subscriber {
return &subscriber{
hub: hub,
client: make(map[*Connect]struct{}),
}
}
type Mq struct {
hub *Hub
pool *scheduler.Pool
rm sync.RWMutex
topics map[string]*subscriber
}
func newMQ(hub *Hub) *Mq {
m := &Mq{
hub: hub,
pool: scheduler.NewPool(5, 100),
topics: make(map[string]*subscriber),
}
m.pool.Run()
hub.process.RegisterDispose(ETMq.Int(), m.Process)
return m
}
func (mq *Mq) remove(c *Connect) {
mq.rm.Lock()
defer mq.rm.Unlock()
for top, sub := range mq.topics {
if sub.remove(c) == 0 {
delete(mq.topics, top)
}
}
}
func (mq *Mq) Receive(ctx context.Context, from *session.Session, to *session.Session, ev *Event) {
var msg = MessageMQ{traceId: ev.TraceId, from: from, to: to}
var err = json.Unmarshal(StringToBytes(ev.Content), &msg)
if err != nil {
return
}
if msg.Cmd != SendMQ {
return
}
mq.rm.RLock()
sub, ok := mq.topics[msg.Topic]
mq.rm.RUnlock()
if ok {
sub.send(mq, &msg)
}
}
func (mq *Mq) Process(c *Connect, req *EventReq) (interface{}, error) {
req.EvTypeRsp = ETMqRsp.Int()
var msg = MessageMQ{traceId: req.TraceId, from: req.GetFromSession(), to: req.GetToSession()}
var err = json.Unmarshal(StringToBytes(req.Content), &msg)
if err != nil {
logger.E("MQ", "process failure, err= %v", err)
return nil, err
}
switch msg.Cmd {
case SubscribeMQ:
var topics = strings.Split(msg.Topic, ",")
mq.rm.Lock()
for i := range topics {
sub, ok := mq.topics[topics[i]]
if !ok {
sub = newSubscriber(mq.hub)
mq.topics[topics[i]] = sub
}
sub.add(c)
}
mq.rm.Unlock()
return &ResCmd{Cmd: msg.Cmd, Code: 0, Msg: fmt.Sprintf("success: subscribe topic(%s)", msg.Topic)}, nil
case UnSubscribeMQ:
var topics = strings.Split(msg.Topic, ",")
var sub *subscriber
var ok bool
mq.rm.Lock()
for i := range topics {
sub, ok = mq.topics[topics[i]]
if ok && sub.remove(c) == 0 {
delete(mq.topics, topics[i])
}
}
mq.rm.Unlock()
return &ResCmd{Cmd: msg.Cmd, Code: 0, Msg: fmt.Sprintf("success: unsubscribe topic(%s)", msg.Topic)}, nil
case SendMQ:
mq.rm.RLock()
sub, ok := mq.topics[msg.Topic]
mq.rm.RUnlock()
if ok {
sub.send(mq, &msg)
}
// to broadcast other server
var ctx = NewWithTraceId(req.TraceId)
var to = req.GetToSession()
mq.hub.SendToRemote(ctx, &req.Session, to, &req.Event)
return &ResCmd{Cmd: msg.Cmd, Code: 0, Msg: fmt.Sprintf("success: send topic(%s)", msg.Topic)}, nil
default:
}
return nil, MQNotSupport
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goim.git
git@gitee.com:h79/goim.git
h79
goim
goim
v0.5.16

搜索帮助