1 Star 0 Fork 0

CaptialSTeam/ubdframe

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
coprocess_send.go 1.94 KB
一键复制 编辑 原始数据 按行查看 历史
sage 提交于 2025-05-15 13:48 +08:00 . modify coprecess send
package comag
import (
"gitee.com/captials-team/ubdframe/src/common/utils"
"sync"
)
func NewProcessSender[T interface{}](l ...int) *ProcessSender[T] {
tmp := 128
if len(l) > 0 {
tmp = l[0]
}
return &ProcessSender[T]{
ch: make(chan T, tmp),
}
}
// ProcessSender 处理发送器
// 提供 Send 发送数据方法
// 提供 Ch 获取收取数据的channel变量
type ProcessSender[T interface{}] struct {
ch chan T
}
func (a *ProcessSender[T]) Send(items ...T) {
for _, v := range items {
var tmp = v
a.ch <- tmp
}
}
func (a *ProcessSender[T]) Ch() chan T {
return a.ch
}
func NewProcessMultiSender[T interface{}](num int) *ProcessMultiSender[T] {
return &ProcessMultiSender[T]{
chs: make([]chan T, num),
chNum: num,
}
}
// ProcessMultiSender 处理发送器(提供多个发送)
// 提供 Send 发送数据方法(与 ProcessSender 一致)
// 提供 Ch 获取收取数据的channel变量
// 与上面的 ProcessSender 的区别在于接收端可支持多个channel,可根据自身需要设置策略来动态分配对应的channel
type ProcessMultiSender[T interface{}] struct {
chs []chan T
chNum int
strategy func(T) int
once sync.Once
}
func (a *ProcessMultiSender[T]) Send(items ...T) {
a.once.Do(a.initCh)
for _, v := range items {
var tmp = v
//按策略
ch := a.ChByData(tmp)
ch <- tmp
}
}
// SetStrategy 设置分发策略
func (a *ProcessMultiSender[T]) SetStrategy(f func(T) int) {
a.strategy = f
}
// ChIndex 设置分发策略
func (a *ProcessMultiSender[T]) ChIndex(d T) int {
if a.strategy != nil {
return a.strategy(d)
}
return utils.TrueScopeRand(0, a.chNum)
}
func (a *ProcessMultiSender[T]) Ch(index int) chan T {
a.once.Do(a.initCh)
index = index % a.chNum //防超容
return a.chs[index]
}
func (a *ProcessMultiSender[T]) ChByData(d T) chan T {
return a.Ch(a.ChIndex(d))
}
func (a *ProcessMultiSender[T]) initCh() {
for i := 1; i <= a.chNum; i++ {
a.chs[i-1] = make(chan T, 128)
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/captials-team/ubdframe.git
git@gitee.com:captials-team/ubdframe.git
captials-team
ubdframe
ubdframe
v1.0.3

搜索帮助