代码拉取完成,页面将自动刷新
package websocket
import (
"context"
"fmt"
"gitee.com/unitedrhino/share/ctxs"
"gitee.com/unitedrhino/share/eventBus"
"gitee.com/unitedrhino/share/utils"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/kv"
"sync"
"time"
)
const (
asyncExecMax = 500
)
type publishStu struct {
*WsPublish
ctx context.Context
}
type UserSubscribe struct {
publishChan chan publishStu //key是apisvr的节点id
mutex sync.RWMutex
ServerMsg *eventBus.FastEvent
}
func NewUserSubscribe(store kv.Store, ServerMsg *eventBus.FastEvent) *UserSubscribe {
u := UserSubscribe{publishChan: make(chan publishStu, asyncExecMax), ServerMsg: ServerMsg}
utils.Go(context.Background(), func() {
u.publish()
})
return &u
}
func (u *UserSubscribe) Publish(ctx context.Context, code string, data any, params ...map[string]any) error {
pb := WsPublish{
Code: code,
Data: data,
}
for _, param := range params {
pb.Params = append(pb.Params, utils.Md5Map(param))
}
u.publishChan <- publishStu{
WsPublish: &pb,
ctx: ctxs.CopyCtx(ctx),
}
logx.WithContext(ctx).Debugf("websocket UserSubscribe.publish pb:%v params:%v", utils.Fmt(pb), utils.Fmt(params))
return nil
}
func (u *UserSubscribe) publish() {
execCache := make([]publishStu, 0, asyncExecMax)
exec := func() {
if len(execCache) == 0 {
return
}
logx.WithContext(execCache[0].ctx).Debugf("websocket UserSubscribe.publish publishs:%v", utils.Fmt(execCache))
err := u.ServerMsg.Publish(execCache[0].ctx, fmt.Sprintf(eventBus.CoreApiUserPublish, 1), execCache)
if err != nil {
logx.WithContext(execCache[0].ctx).Error(err)
}
execCache = execCache[0:0] //清空切片
}
tick := time.Tick(time.Second)
for {
select {
case _ = <-tick:
exec()
case e := <-u.publishChan:
execCache = append(execCache, e)
if len(execCache) > asyncExecMax {
exec()
}
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。