代码拉取完成,页面将自动刷新
package comag
import (
"gitee.com/captials-team/ubdframe/src/common/utils"
"sync"
)
func NewProcessSender[T interface{}](l ...int) *ProcessSender[T] {
tmp := 128
if len(l) > 0 {
tmp = l[0]
}
return &ProcessSender[T]{
ch: make(chan T, tmp),
}
}
// ProcessSender 处理发送器
// 提供 Send 发送数据方法
// 提供 Ch 获取收取数据的channel变量
type ProcessSender[T interface{}] struct {
ch chan T
}
func (a *ProcessSender[T]) Send(items ...T) {
for _, v := range items {
var tmp = v
a.ch <- tmp
}
}
func (a *ProcessSender[T]) Ch() chan T {
return a.ch
}
func NewProcessMultiSender[T interface{}](num int) *ProcessMultiSender[T] {
return &ProcessMultiSender[T]{
chs: make([]chan T, num),
chNum: num,
}
}
// ProcessMultiSender 处理发送器(提供多个发送)
// 提供 Send 发送数据方法(与 ProcessSender 一致)
// 提供 Ch 获取收取数据的channel变量
// 与上面的 ProcessSender 的区别在于接收端可支持多个channel,可根据自身需要设置策略来动态分配对应的channel
type ProcessMultiSender[T interface{}] struct {
chs []chan T
chNum int
strategy func(T) int
once sync.Once
}
func (a *ProcessMultiSender[T]) Send(items ...T) {
a.once.Do(a.initCh)
for _, v := range items {
var tmp = v
//按策略
ch := a.ChByData(tmp)
ch <- tmp
}
}
// SetStrategy 设置分发策略
func (a *ProcessMultiSender[T]) SetStrategy(f func(T) int) {
a.strategy = f
}
// ChIndex 设置分发策略
func (a *ProcessMultiSender[T]) ChIndex(d T) int {
if a.strategy != nil {
return a.strategy(d)
}
return utils.TrueScopeRand(0, a.chNum)
}
func (a *ProcessMultiSender[T]) Ch(index int) chan T {
a.once.Do(a.initCh)
index = index % a.chNum //防超容
return a.chs[index]
}
func (a *ProcessMultiSender[T]) ChByData(d T) chan T {
return a.Ch(a.ChIndex(d))
}
func (a *ProcessMultiSender[T]) initCh() {
for i := 1; i <= a.chNum; i++ {
a.chs[i-1] = make(chan T, 128)
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。