代码拉取完成,页面将自动刷新
package im
import (
"context"
"encoding/json"
"gitee.com/h79/goutils/common/logger"
"gitee.com/h79/goutils/common/option"
"gitee.com/h79/goutils/common/random"
"gitee.com/h79/goutils/common/scheduler"
"gitee.com/h79/goutils/common/stringutil"
"gitee.com/h79/goutils/common/timer"
"strings"
"sync"
)
// Command codes carried in MessageMQ.Cmd. The numeric values travel in the
// JSON "cmd" field, so they are spelled out explicitly here.
const (
	// SubscribeMQ asks to subscribe the sending client to the listed topics.
	SubscribeMQ = 0
	// UnSubscribeMQ asks to unsubscribe the sending client from the listed topics.
	UnSubscribeMQ = 1
	// SendMQ asks to publish the message content to a topic's subscribers.
	SendMQ = 2
)
// MessageMQ is the JSON payload of an MQ event: a command together with the
// topic and content it applies to.
type MessageMQ struct {
// Cmd selects the operation: SubscribeMQ, UnSubscribeMQ or SendMQ.
Cmd int `json:"cmd"`
Topic string `json:"topic"` // for SubscribeMQ or UnSubscribeMQ this may list several topics, separated by commas
Content string `json:"content"` // the message body
// traceId is filled in from the triggering event/request rather than from
// JSON (the field is unexported).
traceId string
// from is the session the message came from; it is likewise set by the
// code that decodes the message, not by JSON.
from Session
}
// subscriber is the set of clients subscribed to one topic.
type subscriber struct {
hub *Hub // hub whose source is used when building outgoing events
rm sync.RWMutex // guards client
client map[*Client]struct{} // set of subscribed clients
}
// add puts c into the subscriber set and logs the registration. Adding a
// client that is already present leaves the set unchanged.
func (s *subscriber) add(c *Client) {
	// Keep the write lock scoped to the map update; the log call runs unlocked.
	func() {
		s.rm.Lock()
		defer s.rm.Unlock()
		s.client[c] = struct{}{}
	}()
	logger.N("MQ", "subscriber client= %s", c.String())
}
// remove drops c from the subscriber set and logs the removal. The log line
// is written whether or not c was actually present.
func (s *subscriber) remove(c *Client) {
	// Keep the write lock scoped to the map update; the log call runs unlocked.
	func() {
		s.rm.Lock()
		defer s.rm.Unlock()
		delete(s.client, c)
	}()
	logger.N("MQ", "remove subscriber client= %s", c.String())
}
// empty reports whether no client is currently subscribed.
func (s *subscriber) empty() bool {
	s.rm.RLock()
	defer s.rm.RUnlock()
	return len(s.client) == 0
}
// send queues msg for delivery to this topic's subscribers by adding a job,
// handled by s.handler, to mq's pool. When there are no subscribers it only
// logs a warning and queues nothing.
func (s *subscriber) send(mq *Mq, msg MessageMQ) {
	if s.empty() {
		logger.W("IM", "MQ subscriber is empty, topic= '%s', content= '%s'", msg.Topic, msg.Content)
		return
	}

	// Current time in ms followed by six random lowercase characters.
	// NOTE(review): presumably a unique job identifier — confirm against scheduler.NewJob.
	jobKey := stringutil.Int64ToString(timer.CurrentMS()) + random.GenerateLowerString(6)

	job := scheduler.NewJob("mq", jobKey, 0)
	// msg is this call's own copy, so handing its address to the job is safe.
	job.WithPayload(&msg)
	job.WithHandlerFunc(s.handler)
	mq.pool.AddJob(job)
}
// handler is the job callback installed by send. It wraps the queued
// *MessageMQ payload in an ETMq event, serializes it once, and sends the
// resulting bytes to every client in the subscriber set.
//
// A failed send to one client is logged (with the client and the error) and
// does not stop delivery to the others. The return values are always nil.
// opts is accepted to satisfy the handler signature and is not used.
func (s *subscriber) handler(job *scheduler.Job, opts ...option.Option) (interface{}, error) {
	// The payload is the *MessageMQ stored by send via job.WithPayload.
	msg := job.GetPayload().(*MessageMQ)
	ev := CreateObjEvent(msg.traceId, ETMq, msg)
	data := RequestEvent(s.hub.source, ev, &msg.from).ToBytes()

	// The read lock is held for the whole fan-out so the set cannot change
	// while it is being iterated.
	s.rm.RLock()
	defer s.rm.RUnlock()
	for c := range s.client {
		if err := c.SendTo(data); err != nil {
			logger.E("IM", "MQ send failure, topic= '%s', content= '%s', client= %s, err= %v", msg.Topic, msg.Content, c.String(), err)
		}
	}
	return nil, nil
}
// newSubscriber returns an empty subscriber set bound to hub, with its client
// map already allocated so add can be called immediately.
func newSubscriber(hub *Hub) *subscriber {
	s := new(subscriber)
	s.hub = hub
	s.client = map[*Client]struct{}{}
	return s
}
// Mq is a topic-based publish/subscribe registry: it maps topic names to the
// clients subscribed to them and delivers sent messages through a job pool.
type Mq struct {
hub *Hub // hub used for building events and for forwarding sends to remote servers
pool *scheduler.Pool // job pool that runs the per-message delivery jobs
rm sync.RWMutex // guards topics
topics map[string]*subscriber // topic name -> subscribers of that topic
}
// newMQ builds an Mq bound to hub, starts its job pool, and registers
// Mq.Process as the handler for the ETMq event type.
func newMQ(hub *Hub) *Mq {
	// NOTE(review): 5 and 100 are presumably worker count and queue capacity —
	// confirm against scheduler.NewPool.
	pool := scheduler.NewPool(5, 100)

	m := &Mq{
		hub:    hub,
		pool:   pool,
		topics: map[string]*subscriber{},
	}
	m.pool.Run()
	Register(ETMq.Int(), m.Process)
	return m
}
// removeClient unsubscribes c from every topic and drops any topic that is
// left without subscribers.
func (mq *Mq) removeClient(c *Client) {
	mq.rm.Lock()
	defer mq.rm.Unlock()

	for name, sub := range mq.topics {
		sub.remove(c)
		if !sub.empty() {
			continue
		}
		// Deleting the current key while ranging over a map is well-defined in Go.
		delete(mq.topics, name)
	}
}
// Receive handles an MQ event that did not come through Process.
// NOTE(review): presumably these are events forwarded from another server —
// confirm against the caller. It decodes ev.Content as a MessageMQ and, for
// SendMQ only, queues the message for the local subscribers of its topic.
// Other commands are ignored, and nothing is forwarded onward from here.
//
// ctx is currently unused. from must be non-nil; it is copied into the
// message as its origin session.
func (mq *Mq) Receive(ctx context.Context, from *Session, ev *Event) {
	msg := MessageMQ{traceId: ev.TraceId, from: *from}
	if err := json.Unmarshal([]byte(ev.Content), &msg); err != nil {
		// Log rather than drop silently, so malformed events can be diagnosed.
		logger.E("MQ", "receive failure, err= %v", err)
		return
	}
	if msg.Cmd != SendMQ {
		return
	}

	// Hold the registry lock only for the lookup; queuing happens unlocked.
	mq.rm.RLock()
	sub, ok := mq.topics[msg.Topic]
	mq.rm.RUnlock()
	if ok {
		sub.send(mq, msg)
	}
}
// Process handles an MQ request from client c. It sets the response event
// type on req to ETMqRsp, decodes req.Content as a MessageMQ, and dispatches
// on its Cmd:
//
//   - SubscribeMQ:   subscribe c to every topic in the comma-separated Topic.
//   - UnSubscribeMQ: unsubscribe c from those topics, dropping topics that
//     end up with no subscribers.
//   - SendMQ:        queue the message for the local subscribers of Topic,
//     then pass the original event to hub.SendToRemote.
//
// Any other Cmd is ignored. The returned value is always nil; the error is
// non-nil only when the content cannot be decoded.
func (mq *Mq) Process(c *Client, req *EventReq) (interface{}, error) {
	req.EvTypeRsp = ETMqRsp.Int()

	msg := MessageMQ{traceId: req.TraceId, from: c.user}
	if err := json.Unmarshal([]byte(req.Content), &msg); err != nil {
		logger.E("MQ", "process failure, err= %v", err)
		return nil, err
	}
	logger.N("MQ", "process data= %v", msg)

	switch msg.Cmd {
	case SubscribeMQ:
		mq.subscribe(c, splitMQTopics(msg.Topic))
	case UnSubscribeMQ:
		mq.unsubscribe(c, splitMQTopics(msg.Topic))
	case SendMQ:
		// Hold the registry lock only for the lookup; queuing happens unlocked.
		mq.rm.RLock()
		sub, ok := mq.topics[msg.Topic]
		mq.rm.RUnlock()
		if ok {
			sub.send(mq, msg)
		}
		// Broadcast to the other servers as well.
		ctx := NewWithTraceId(req.TraceId)
		mq.hub.SendToRemote(ctx, &c.user, &Session{AppId: c.user.AppId, Plat: c.user.Plat}, &req.Event)
	}
	return nil, nil
}

// splitMQTopics splits a comma-separated topic list and drops empty entries,
// so an empty Topic or a stray comma never yields the topic "".
func splitMQTopics(list string) []string {
	parts := strings.Split(list, ",")
	topics := parts[:0] // filter in place, reusing the backing array
	for _, p := range parts {
		if p != "" {
			topics = append(topics, p)
		}
	}
	return topics
}

// subscribe adds c to each named topic, creating topics that do not exist yet.
func (mq *Mq) subscribe(c *Client, topics []string) {
	mq.rm.Lock()
	defer mq.rm.Unlock()
	for _, name := range topics {
		sub, ok := mq.topics[name]
		if !ok {
			sub = newSubscriber(mq.hub)
			mq.topics[name] = sub
		}
		sub.add(c)
	}
}

// unsubscribe removes c from each named topic that exists and deletes topics
// left without subscribers.
func (mq *Mq) unsubscribe(c *Client, topics []string) {
	mq.rm.Lock()
	defer mq.rm.Unlock()
	for _, name := range topics {
		sub, ok := mq.topics[name]
		if !ok {
			continue
		}
		sub.remove(c)
		if sub.empty() {
			delete(mq.topics, name)
		}
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。