代码拉取完成,页面将自动刷新
package pipe
// 流水线
type PipeLineCommonChan chan interface{}
// 生产者,生产数据
type PipeLineProducer func(dataSourceChan PipeLineCommonChan)
// return nil表示这个数据不再进行处理
type PipeLineFuncWorker func(chanItem interface{}) interface{}
// 流水线入口
// todo 每一个处理步骤可以设置处理的worker数量
func PipeLineDo(producer PipeLineProducer, userFns ...PipeLineFuncWorker) PipeLineCommonChan {
dataSource := commonProducer(producer)
for _, f := range userFns {
//这里datasource会更新成下一个chan,传递给下一个函数
dataSource = commonStep(dataSource, f)
}
return dataSource
}
// 处理步骤
func commonStep(ch PipeLineCommonChan, userFunc PipeLineFuncWorker) PipeLineCommonChan {
out := make(PipeLineCommonChan, 0)
go func() {
defer close(out)
for val := range ch {
//这里做用户函数逻辑处理
res := userFunc(val)
if res != nil {
out <- res
}
}
}()
return out
}
// 内部生产者,将原始数据塞入流水线管道,开始进行处理
func commonProducer(userFunc PipeLineProducer) PipeLineCommonChan {
dataSource := make(PipeLineCommonChan, 0)
//开协程进行处理
go func() {
defer close(dataSource)
userFunc(dataSource)
}()
return dataSource
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。