2 Star 0 Fork 0

carlmax_my/go-mtls

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
pubsub.go 2.60 KB
一键复制 编辑 原始数据 按行查看 历史
carlmax_my 提交于 2025-11-28 17:32 +08:00 . fix mtls server client ctx issue
package server
import (
"context"
"encoding/json"
"time"
"gitee.com/carlmax_my/go-mtls/pkg/constant"
"gitee.com/carlmax_my/go-mtls/pkg/core"
"github.com/pkg/errors"
)
// PingRedis calls Ping on the server's Redis client and returns its result.
// The call is bounded by a 30-second timeout.
func (s *Server) PingRedis() (string, error) {
	const pingTimeout = 30 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
	defer cancel() // release the timeout's resources once the ping returns

	reply := s.redis.Ping(ctx)
	return reply.Result()
}
// isBroadcastChannel reports whether ch is exactly the broadcast channel name,
// i.e. prefix immediately followed by constant.BroadcastChannelName.
func isBroadcastChannel(ch string, prefix string) bool {
	broadcast := prefix + constant.BroadcastChannelName
	return broadcast == ch
}
// handlePatternSubscriptions pattern-subscribes to s.options.ChannelPrefix on
// Redis and forwards every received message to the connected clients.
//
// A message is delivered to a client only when the client is authenticated and
// either the message arrived on the broadcast channel or the client has
// subscribed to that specific channel. Delivery is non-blocking: a client whose
// send queue is full is disconnected (its send channel is closed and it is
// removed from s.clients).
//
// The call blocks until the pub/sub message channel is closed and then
// returns nil; it never returns a non-nil error.
func (s *Server) handlePatternSubscriptions() error {
	// Mark the pattern as subscribed.
	s.mu.Lock()
	s.patternSubscriptions[s.options.ChannelPrefix] = true
	s.mu.Unlock()

	// A plain background context is used; presumably so that the long-lived
	// subscription is not cancelled by a timeout — TODO confirm.
	ctx := context.Background()

	// Subscribe to the channel pattern on Redis.
	pubsub := s.redis.PSubscribe(ctx, s.options.ChannelPrefix)
	defer pubsub.Close()

	for msg := range pubsub.Channel() {
		s.logger.Debugf("recv pubsub message[%s]: %s", msg.Channel, msg.Payload)

		// Whether this is a broadcast depends only on the message, so it is
		// computed once per message instead of once per client.
		isBroadcast := isBroadcastChannel(msg.Channel, s.options.ChannelPrefix)

		// NOTE(review): s.clients is read and modified here without holding
		// s.mu — confirm which lock (if any) guards it against concurrent
		// client registration/removal.
		for client := range s.clients {
			if !client.authed {
				continue
			}
			// Only broadcast messages, or messages on a channel this client
			// subscribed to, are forwarded.
			if !isBroadcast && !client.channels[msg.Channel] {
				continue
			}

			select {
			case client.send <- []byte(msg.Payload):
			default:
				// The client's send queue is full: disconnect it.
				s.logger.Warnf("client queue full: %s", client.clientId)
				close(client.send)
				// Deleting the current key while ranging over a map is safe in Go.
				delete(s.clients, client)
			}
		}
	}
	return nil
}
// PublishChannelMsg wraps data in a core.BaseCmdMsg (Cmd set to
// core.CMD_PUBLISH, plus the given subCmd and channel), JSON-encodes it and
// publishes it to the Redis channel named by channel. The publish call is
// bounded by a 10-second timeout.
//
// Notes carried over from the original author: the same message can also be
// published directly with a Redis client, without going through this mTLS
// connection; the manager receives what is published to Redis and sends it to
// the clients. This function could be moved outside of the manager.
func (s *Server) PublishChannelMsg(channel string, subCmd string, data string) error {
	// A message that carries a Channel is a published message.
	payload, err := json.Marshal(&core.BaseCmdMsg{
		Cmd:     core.CMD_PUBLISH,
		Data:    data,
		SubCmd:  subCmd,
		Channel: channel,
	})
	if err != nil {
		return errors.Wrap(err, "Marshal err")
	}

	// Publish to Redis.
	const publishTimeout = 10 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), publishTimeout)
	defer cancel() // release the timeout's resources once the publish returns

	if pubErr := s.redis.Publish(ctx, channel, payload).Err(); pubErr != nil {
		return errors.Wrap(pubErr, "publish to Redis error")
	}
	return nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/carlmax_my/go-mtls.git
git@gitee.com:carlmax_my/go-mtls.git
carlmax_my
go-mtls
go-mtls
v0.0.12

搜索帮助