代码拉取完成,页面将自动刷新
package redis
import (
"context"
"gitee.com/lipore/plume/errors"
"github.com/redis/go-redis/v9"
"time"
)
type PubSubConfig struct {
Config
Channel []string
CheckALiveInterval time.Duration
}
func (p *PubSubConfig) Apply(config *PubSubConfig) {
config.Channel = p.Channel
config.CheckALiveInterval = p.CheckALiveInterval
}
type PubSubOptions interface {
Apply(config *PubSubConfig)
}
type withChannel struct {
channel string
}
func (w *withChannel) Apply(config *PubSubConfig) {
config.Channel = append(config.Channel, w.channel)
}
func WithChannel(channel string) PubSubOptions {
return &withChannel{channel: channel}
}
type withCheckALiveInterval struct {
checkALiveInterval time.Duration
}
func (w *withCheckALiveInterval) Apply(config *PubSubConfig) {
config.CheckALiveInterval = w.checkALiveInterval
}
func WithCheckALiveInterval(channel time.Duration) PubSubOptions {
return &withCheckALiveInterval{checkALiveInterval: channel}
}
type Message struct {
Channel string
Payload string
}
func Subscribe(ctx context.Context, client Client, messageHandle func(message *Message), opts ...PubSubOptions) error {
config := &PubSubConfig{}
for _, opt := range opts {
opt.Apply(config)
}
if config.Channel == nil || len(config.Channel) == 0 {
return errors.WithCode(nil, 0, "channel list is empty")
}
sub := client.Subscribe(ctx, config.Channel...)
go func() {
var channelOpts []redis.ChannelOption
if config.CheckALiveInterval != 0 {
channelOpts = append(channelOpts, redis.WithChannelHealthCheckInterval(config.CheckALiveInterval))
}
for msg := range sub.Channel(channelOpts...) {
messageHandle(&Message{Channel: msg.Channel, Payload: msg.Payload})
}
}()
return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。