代码拉取完成,页面将自动刷新
同步操作将从 menuiis/gkit 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
package broker
import (
"context"
"sync"
"gitee.com/menciis/gkit/distributed/retry"
"gitee.com/menciis/gkit/options"
)
// registeredTask 注册的任务
type registeredTask struct {
sync.RWMutex
item map[string]bool
}
// Register 注册
func (r *registeredTask) Register(taskName string) {
r.Lock()
defer r.Unlock()
r.item[taskName] = true
}
// RegisterList 注册
func (r *registeredTask) RegisterList(taskNameList ...string) {
r.Lock()
defer r.Unlock()
for _, taskName := range taskNameList {
r.item[taskName] = true
}
}
// Quit 注销
func (r *registeredTask) Quit(taskName string) {
r.Lock()
defer r.Unlock()
delete(r.item, taskName)
}
// IsRegister 是否注册
func (r *registeredTask) IsRegister(taskName string) bool {
r.RLock()
defer r.RUnlock()
return r.item[taskName]
}
// NewRegisteredTask 初始化任务注册器
func NewRegisteredTask() *registeredTask {
return ®isteredTask{
item: make(map[string]bool),
}
}
// Broker Broker
type Broker struct {
// registeredTask 注册器
*registeredTask
// retry 是否重试
retry bool
// retryFn 重试函数
retryFn func(ctx context.Context)
retryCtx context.Context
retryCancel context.CancelFunc
stopCtx context.Context
stopCancel context.CancelFunc
}
// NewBroker 初始化 Broker
func NewBroker(r *registeredTask, ctx context.Context, options ...options.Option) *Broker {
b := &Broker{
registeredTask: r,
}
for _, option := range options {
option(b)
}
if b.retry == true && b.retryFn == nil {
b.retryFn = retry.Retry()
}
b.retryCtx, b.retryCancel = context.WithCancel(ctx)
b.stopCtx, b.stopCancel = context.WithCancel(ctx)
return b
}
func (b *Broker) GetRetry() bool {
return b.retry
}
func (b *Broker) GetRetryFn() func(ctx context.Context) {
return b.retryFn
}
func (b *Broker) GetRetryCtx() context.Context {
return b.retryCtx
}
func (b *Broker) GetStopCtx() context.Context {
return b.stopCtx
}
func (b *Broker) StopConsuming() {
b.retry = false
b.retryCancel()
b.stopCancel()
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。