1 Star 0 Fork 0

lipore / plume

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
pubsub.go 1.70 KB
一键复制 编辑 原始数据 按行查看 历史
lipore 提交于 2024-04-07 10:40 . fix: build issues
package redis
import (
"context"
"gitee.com/lipore/plume/errors"
"github.com/redis/go-redis/v9"
"time"
)
// PubSubConfig collects the settings that Subscribe builds from its options.
// Because it has an Apply method, a *PubSubConfig can itself be passed as a
// PubSubOptions value.
type PubSubConfig struct {
// Config is embedded; it is declared elsewhere in this package.
Config
// Channel lists the channel names passed to the client's Subscribe call.
Channel []string
// CheckALiveInterval, when non-zero, is forwarded to
// redis.WithChannelHealthCheckInterval when the message channel is opened.
CheckALiveInterval time.Duration
}
// Apply copies p's Channel and CheckALiveInterval into config, replacing
// whatever config previously held in those two fields. The embedded Config is
// not copied.
//
// The channel list is copied rather than shared, so a later append to
// config.Channel (for example by a WithChannel option applied afterwards)
// cannot write into the spare capacity of p.Channel's backing array.
func (p *PubSubConfig) Apply(config *PubSubConfig) {
	// Appending to a zero-length, zero-capacity reslice forces a fresh backing
	// array while keeping nil-ness: a nil p.Channel yields a nil config.Channel.
	config.Channel = append(p.Channel[:0:0], p.Channel...)
	config.CheckALiveInterval = p.CheckALiveInterval
}
// PubSubOptions is implemented by values that adjust a PubSubConfig.
// Subscribe calls Apply on each option, in argument order, against a fresh
// config.
type PubSubOptions interface {
// Apply writes this option's setting(s) into config.
Apply(config *PubSubConfig)
}
// withChannel is the option value returned by WithChannel; it carries a
// single channel name.
type withChannel struct {
// channel is the name that Apply appends to PubSubConfig.Channel.
channel string
}
// Apply adds the option's channel name to the end of config.Channel, leaving
// any names already present in place.
func (w *withChannel) Apply(config *PubSubConfig) {
	name := w.channel
	config.Channel = append(config.Channel, name)
}
// WithChannel returns an option that appends channel to the list of channels
// in the config it is applied to. Each WithChannel option contributes one
// name, so several may be combined.
func WithChannel(channel string) PubSubOptions {
	opt := new(withChannel)
	opt.channel = channel
	return opt
}
// withCheckALiveInterval is the option value returned by
// WithCheckALiveInterval; it carries a single duration.
type withCheckALiveInterval struct {
// checkALiveInterval is the value that Apply stores in
// PubSubConfig.CheckALiveInterval.
checkALiveInterval time.Duration
}
// Apply stores the option's duration in config.CheckALiveInterval, overwriting
// any value set earlier.
func (w *withCheckALiveInterval) Apply(config *PubSubConfig) {
	interval := w.checkALiveInterval
	config.CheckALiveInterval = interval
}
// WithCheckALiveInterval returns an option that sets
// PubSubConfig.CheckALiveInterval to interval. Subscribe only forwards the
// value to redis.WithChannelHealthCheckInterval when it is non-zero.
//
// The parameter was previously named "channel", a leftover from WithChannel;
// it is a duration, not a channel name.
func WithCheckALiveInterval(interval time.Duration) PubSubOptions {
	return &withCheckALiveInterval{checkALiveInterval: interval}
}
// Message is the value handed to the Subscribe callback. Subscribe fills both
// fields from the same-named fields of each message it receives.
type Message struct {
// Channel is copied from the received message's Channel field.
Channel string
// Payload is copied from the received message's Payload field.
Payload string
}
// Subscribe builds a PubSubConfig from opts, subscribes client to the
// configured channels, and starts a goroutine that calls messageHandle for
// every message received.
//
// Parameters:
//   - ctx is passed to client.Subscribe and bounds the lifetime of the
//     background goroutine: once ctx is cancelled the goroutine stops
//     delivering messages and closes the subscription.
//   - client provides the Subscribe call.
//   - messageHandle is invoked sequentially, on the background goroutine, with
//     a freshly allocated *Message per received message.
//   - opts are applied in order to an empty PubSubConfig.
//
// It returns the error built by errors.WithCode when no channel was
// configured, and nil otherwise. Subscribe itself does not block.
func Subscribe(ctx context.Context, client Client, messageHandle func(message *Message), opts ...PubSubOptions) error {
	config := &PubSubConfig{}
	for _, opt := range opts {
		opt.Apply(config)
	}
	// len of a nil slice is 0, so one length check covers nil and empty alike.
	if len(config.Channel) == 0 {
		return errors.WithCode(nil, 0, "channel list is empty")
	}

	sub := client.Subscribe(ctx, config.Channel...)
	go func() {
		// Release the subscription when the loop ends; without this neither
		// the goroutine nor the subscription could ever be reclaimed.
		// NOTE(review): assumes sub is a *redis.PubSub (it already accepts
		// redis.ChannelOption), whose Close ends the subscription — confirm
		// against the Client declaration.
		// The Close error is dropped on purpose: there is no caller left to
		// report it to at this point.
		defer func() { _ = sub.Close() }()

		var channelOpts []redis.ChannelOption
		if config.CheckALiveInterval != 0 {
			channelOpts = append(channelOpts, redis.WithChannelHealthCheckInterval(config.CheckALiveInterval))
		}

		messages := sub.Channel(channelOpts...)
		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-messages:
				if !ok {
					// The message channel was closed; nothing more will arrive.
					return
				}
				messageHandle(&Message{Channel: msg.Channel, Payload: msg.Payload})
			}
		}
	}()
	return nil
}
1
https://gitee.com/lipore/plume.git
git@gitee.com:lipore/plume.git
lipore
plume
plume
v1.7.6

搜索帮助