代码拉取完成,页面将自动刷新
package im
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"gitee.com/h79/goim/session"
"gitee.com/h79/goutils/common/logger"
"gitee.com/h79/goutils/common/option"
"gitee.com/h79/goutils/common/random"
"gitee.com/h79/goutils/common/scheduler"
"gitee.com/h79/goutils/common/stringutil"
"gitee.com/h79/goutils/common/timer"
)
const (
SubscribeMQ = iota
UnSubscribeMQ
SendMQ
)
type MessageMQ struct {
Cmd int `json:"cmd"`
Flag int32 `json:"flag"` // 是否排除自己
Topic string `json:"topic"` // SubscribeMQ OR UnSubscribeMQ 可以多个topic, 以逗号分隔
Content string `json:"content"` // 具体内容
traceId string
to *session.Session
from *session.Session
}
func (m *MessageMQ) GetToSession() *session.Session {
return m.to
}
func (m *MessageMQ) GetFromSession() *session.Session {
return m.from
}
func (m *MessageMQ) GetTraceId() string {
return m.traceId
}
type subscriber struct {
hub *Hub
rm sync.RWMutex
client map[*Connect]struct{}
}
func (s *subscriber) add(c *Connect) {
s.rm.Lock()
s.client[c] = struct{}{}
s.rm.Unlock()
logger.N("MQ", "subscriber client= %s", c.String())
}
func (s *subscriber) remove(c *Connect) int {
s.rm.Lock()
defer s.rm.Unlock()
delete(s.client, c)
return len(s.client)
}
func (s *subscriber) empty() bool {
s.rm.RLock()
var ret = len(s.client) == 0
s.rm.RUnlock()
return ret
}
func (s *subscriber) send(mq *Mq, msg *MessageMQ) {
if s.empty() {
logger.W("IM", "MQ subscriber is empty, topic= '%s', content= '%s'", msg.Topic, msg.Content)
return
}
var taskId = stringutil.Int64ToString(timer.CurrentMS()) + random.GenerateLowerString(6)
var job = scheduler.NewJob("mq", taskId, 0)
job.WithPayload(msg)
job.WithHandlerFunc(s.handler)
mq.pool.AddJob(job)
}
func (s *subscriber) handler(job scheduler.Task, opts ...option.Option) (interface{}, error) {
var err error
var msg = job.GetPayload().(*MessageMQ)
var ev = CreateObjEvent(msg.traceId, ETMq, msg)
var bytes = RequestEvent(s.hub.source, ev, msg.from).SetToSession(msg.to).ToBytes()
var ctx = context.Background()
var toClient = s.hub.GetClient(msg.to)
s.rm.RLock()
defer s.rm.RUnlock()
for c := range s.client {
if c == nil {
continue
}
if toClient != nil && !toClient.ConnectEqual(c) {
continue
}
if s.hub.msgInterceptor != nil {
err = s.hub.msgInterceptor.MsgInterceptor(ctx, msg.from, c, MQMsg, msg)
if err != nil {
continue
}
}
err = c.SendTo(bytes)
if err != nil {
logger.E("IM", "MQ send failure, topic= '%s', content= '%s'", msg.Topic, msg.Content)
}
if toClient != nil {
// 给指定的客户端
break
}
}
return nil, nil
}
func newSubscriber(hub *Hub) *subscriber {
return &subscriber{
hub: hub,
client: make(map[*Connect]struct{}),
}
}
type Mq struct {
hub *Hub
pool *scheduler.Pool
rm sync.RWMutex
topics map[string]*subscriber
}
func newMQ(hub *Hub) *Mq {
m := &Mq{
hub: hub,
pool: scheduler.NewPool(5, 100),
topics: make(map[string]*subscriber),
}
m.pool.Run()
hub.process.RegisterDispose(ETMq.Int(), m.Process)
return m
}
func (mq *Mq) remove(c *Connect) {
mq.rm.Lock()
defer mq.rm.Unlock()
for top, sub := range mq.topics {
if sub.remove(c) == 0 {
delete(mq.topics, top)
}
}
}
func (mq *Mq) Receive(ctx context.Context, from *session.Session, to *session.Session, ev *Event) {
var msg = MessageMQ{traceId: ev.TraceId, from: from, to: to}
var err = json.Unmarshal(StringToBytes(ev.Content), &msg)
if err != nil {
return
}
if msg.Cmd != SendMQ {
return
}
mq.rm.RLock()
sub, ok := mq.topics[msg.Topic]
mq.rm.RUnlock()
if ok {
sub.send(mq, &msg)
}
}
func (mq *Mq) Process(c *Connect, req *EventReq) (interface{}, error) {
req.EvTypeRsp = ETMqRsp.Int()
var msg = MessageMQ{traceId: req.TraceId, from: req.GetFromSession(), to: req.GetToSession()}
var err = json.Unmarshal(StringToBytes(req.Content), &msg)
if err != nil {
logger.E("MQ", "process failure, err= %v", err)
return nil, err
}
switch msg.Cmd {
case SubscribeMQ:
var topics = strings.Split(msg.Topic, ",")
mq.rm.Lock()
for i := range topics {
sub, ok := mq.topics[topics[i]]
if !ok {
sub = newSubscriber(mq.hub)
mq.topics[topics[i]] = sub
}
sub.add(c)
}
mq.rm.Unlock()
return &ResCmd{Cmd: msg.Cmd, Code: 0, Msg: fmt.Sprintf("success: subscribe topic(%s)", msg.Topic)}, nil
case UnSubscribeMQ:
var topics = strings.Split(msg.Topic, ",")
var sub *subscriber
var ok bool
mq.rm.Lock()
for i := range topics {
sub, ok = mq.topics[topics[i]]
if ok && sub.remove(c) == 0 {
delete(mq.topics, topics[i])
}
}
mq.rm.Unlock()
return &ResCmd{Cmd: msg.Cmd, Code: 0, Msg: fmt.Sprintf("success: unsubscribe topic(%s)", msg.Topic)}, nil
case SendMQ:
mq.rm.RLock()
sub, ok := mq.topics[msg.Topic]
mq.rm.RUnlock()
if ok {
sub.send(mq, &msg)
}
// to broadcast other server
var ctx = NewWithTraceId(req.TraceId)
var to = req.GetToSession()
mq.hub.SendToRemote(ctx, &req.Session, to, &req.Event)
return &ResCmd{Cmd: msg.Cmd, Code: 0, Msg: fmt.Sprintf("success: send topic(%s)", msg.Topic)}, nil
default:
}
return nil, MQNotSupport
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。