代码拉取完成,页面将自动刷新
package server
import (
"context"
"encoding/json"
"time"
"gitee.com/carlmax_my/go-mtls/pkg/constant"
"gitee.com/carlmax_my/go-mtls/pkg/core"
"github.com/pkg/errors"
)
func (s *Server) PingRedis() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() // 确保资源释放
return s.redis.Ping(ctx).Result()
}
func isBroadcastChannel(ch string, prefix string) bool {
return ch == prefix+constant.BroadcastChannelName
}
func (s *Server) handlePatternSubscriptions() error {
// 标记模式已订阅
s.mu.Lock()
s.patternSubscriptions[s.options.ChannelPrefix] = true
s.mu.Unlock()
// ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
// defer cancel() // 确保资源释放
ctx := context.Background()
// 发布到Redis
pubsub := s.redis.PSubscribe(ctx, s.options.ChannelPrefix)
defer pubsub.Close()
ch := pubsub.Channel()
for msg := range ch {
// m.channelManager.Publish(msg.Channel, []byte(msg.Payload))
s.logger.Debugf("recv pubsub message[%s]: %s", msg.Channel, msg.Payload)
// 当消息到达时,遍历所有客户端,只发送给订阅了该具体频道的客户端
for client := range s.clients {
// client.MU.RLock()
isBroadcast := isBroadcastChannel(msg.Channel, s.options.ChannelPrefix)
if client.authed && (isBroadcast || client.channels[msg.Channel]) {
select {
case client.send <- []byte(msg.Payload):
default:
// client.MU.RLock()
// 客户端发送队列已满,断开连接
s.logger.Warnf("client queue full: %s", client.clientId)
close(client.send)
delete(s.clients, client)
// client.MU.RUnlock()
}
}
// client.MU.RUnlock()
}
}
return nil
}
// you can also use directly use redis client to publish, without this mtls conn
// publish to redis, manager will recv and send to client
// this func can move outside of manager
func (s *Server) PublishChannelMsg(channel string, subCmd string, data string) error {
// 构建要发布的消息
pubMsg := &core.BaseCmdMsg{
Cmd: core.CMD_PUBLISH, //subCmd,
Data: data,
SubCmd: subCmd,
Channel: channel, // 带channel就是发布的消息
}
pubData, err := json.Marshal(pubMsg)
if err != nil {
return errors.Wrap(err, "Marshal err")
}
// 发布到Redis
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() // 确保资源释放
err = s.redis.Publish(ctx, channel, pubData).Err()
if err != nil {
return errors.Wrap(err, "publish to Redis error")
}
return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。