k8sio/cnkit

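This repository does not declare an open-source license file (LICENSE); before use, check the project description and the upstream dependencies of its code.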
monitor.go 1.59 KB
YuanHack committed on 2024-08-05 23:21 +08:00 . new commit
package delayqueue

// Monitor can get running status and events of DelayQueue
type Monitor struct {
	inner *DelayQueue
}

// NewMonitor0 creates a new Monitor by a RedisCli instance
func NewMonitor0(name string, cli RedisCli, opts ...interface{}) *Monitor {
	return &Monitor{
		inner: NewQueue0(name, cli, opts...),
	}
}

// WithLogger customizes logger for queue
func (m *Monitor) WithLogger(logger Logger) *Monitor {
	m.inner.logger = logger
	return m
}

// GetPendingCount returns the number of messages whose delivery time has not arrived yet
func (m *Monitor) GetPendingCount() (int64, error) {
	return m.inner.GetPendingCount()
}

// GetReadyCount returns the number of messages whose delivery time has arrived but which have not been delivered yet
func (m *Monitor) GetReadyCount() (int64, error) {
	return m.inner.GetReadyCount()
}

// GetProcessingCount returns the number of messages which are being processed
func (m *Monitor) GetProcessingCount() (int64, error) {
	return m.inner.GetProcessingCount()
}

// ListenEvent registers a listener that is called when events occur in this queue,
// so it can be used to monitor running status.
// It returns a close function and an error.
func (m *Monitor) ListenEvent(listener EventListener) (func(), error) {
	reportChan := genReportChannel(m.inner.name)
	sub, closer, err := m.inner.redisCli.Subscribe(reportChan)
	if err != nil {
		return nil, err
	}
	go func() {
		for payload := range sub {
			event, err := decodeEvent(payload)
			if err != nil {
				// log the decode error itself; event carries no useful value when decoding fails
				m.inner.logger.Errorf("[listen event] %v\n", err)
			} else {
				listener.OnEvent(event)
			}
		}
	}()
	return closer, nil
}
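
The listener loop in ListenEvent can be tried in isolation with a minimal, self-contained sketch. It does not use the real package: the Event type, the "<code> <count>" payload format, and the decode, listen, and run helpers are all hypothetical stand-ins for the queue's own types, its decodeEvent function, and the channel returned by Subscribe. It only illustrates the pattern: range over a subscription channel, report decode errors, forward valid events, and stop when the channel is closed.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"sync"
)

// Event is a hypothetical stand-in for the queue's event type.
type Event struct {
	Code     int
	MsgCount int
}

// decode parses a payload of the assumed form "<code> <count>".
// The real decodeEvent may use a different wire format.
func decode(payload string) (*Event, error) {
	parts := strings.Fields(payload)
	if len(parts) != 2 {
		return nil, fmt.Errorf("malformed payload %q", payload)
	}
	code, err := strconv.Atoi(parts[0])
	if err != nil {
		return nil, err
	}
	count, err := strconv.Atoi(parts[1])
	if err != nil {
		return nil, err
	}
	return &Event{Code: code, MsgCount: count}, nil
}

// listen starts a goroutine that consumes payloads until sub is closed.
// Decode failures go to onError, valid events go to onEvent.
// The returned function blocks until the goroutine has exited.
func listen(sub <-chan string, onEvent func(*Event), onError func(error)) func() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for payload := range sub {
			event, err := decode(payload)
			if err != nil {
				onError(err)
				continue
			}
			onEvent(event)
		}
	}()
	return wg.Wait
}

// run feeds payloads through listen and collects the results.
func run(payloads []string) (events []Event, errCount int) {
	sub := make(chan string)
	wait := listen(sub,
		func(e *Event) { events = append(events, *e) },
		func(error) { errCount++ },
	)
	for _, p := range payloads {
		sub <- p
	}
	close(sub) // plays the role of the close function returned by ListenEvent
	wait()     // the goroutine has finished, so reading the results is safe
	return events, errCount
}

func main() {
	events, errCount := run([]string{"1 3", "garbage", "2 5"})
	fmt.Println(len(events), errCount) // prints "2 1"
}
```

In this sketch, closing the channel ends the range loop, which is presumably what the closer returned by Subscribe achieves for the real subscription. Waiting on the WaitGroup before reading the results avoids a data race on the collected slice and counter.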