1 Star 0 Fork 0

masaichi/工具包

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
pipeline.go 1.25 KB
一键复制 编辑 原始数据 按行查看 历史
masaichi 提交于 2023-04-25 11:56 +08:00 . 添加todo和修改命名
package pipe
// 流水线
type PipeLineCommonChan chan interface{}
// 生产者,生产数据
type PipeLineProducer func(dataSourceChan PipeLineCommonChan)
// return nil表示这个数据不再进行处理
type PipeLineFuncWorker func(chanItem interface{}) interface{}
// 流水线入口
// todo 每一个处理步骤可以设置处理的worker数量
func PipeLineDo(producer PipeLineProducer, userFns ...PipeLineFuncWorker) PipeLineCommonChan {
dataSource := commonProducer(producer)
for _, f := range userFns {
//这里datasource会更新成下一个chan,传递给下一个函数
dataSource = commonStep(dataSource, f)
}
return dataSource
}
// 处理步骤
func commonStep(ch PipeLineCommonChan, userFunc PipeLineFuncWorker) PipeLineCommonChan {
out := make(PipeLineCommonChan, 0)
go func() {
defer close(out)
for val := range ch {
//这里做用户函数逻辑处理
res := userFunc(val)
if res != nil {
out <- res
}
}
}()
return out
}
// 内部生产者,将原始数据塞入流水线管道,开始进行处理
func commonProducer(userFunc PipeLineProducer) PipeLineCommonChan {
dataSource := make(PipeLineCommonChan, 0)
//开协程进行处理
go func() {
defer close(dataSource)
userFunc(dataSource)
}()
return dataSource
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/masaichi/mastool.git
git@gitee.com:masaichi/mastool.git
masaichi
mastool
工具包
v0.0.4

搜索帮助