1 Star 0 Fork 0

h79 / goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
flow.go 2.30 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2023-12-14 21:41 . 协程安全退出
package flow
import (
"context"
"gitee.com/h79/goutils/common/logger"
"gitee.com/h79/goutils/common/system"
)
// defaultSink is the fallback sink handed out by getSink when no sink has
// been registered under the requested name.
var defaultSink = &DefaultSink{}
// OnFlowResult is the callback through which a flow run reports its outcome.
// addr is the address the run ended at and res is the Result reported for it.
type OnFlowResult func(addr string, res Result)
// Flow chains named sinks together and runs them one after another.
//
// Sinks are registered with AddSink, wired together with ConnectSink and
// executed with Run. Use New to obtain a ready-to-use Flow.
type Flow struct {
	// conf is the configuration supplied to New.
	conf Config
	// ctx is the cancellable context created in New; cancel cancels it.
	ctx    context.Context
	cancel context.CancelFunc
	// exit is set to true by Stop.
	exit bool
	// onFlowResult is the optional outcome callback set via SetResultFunc.
	onFlowResult OnFlowResult
	// sinks holds the registered sinks keyed by name.
	sinks map[string]Sink
	// connector maps an address to the address of the sink that follows it.
	connector map[string]string
}
// New builds a Flow for conf with empty sink and connector tables.
//
// The flow works on a cancellable child of ctx; Stop cancels that child.
func New(ctx context.Context, conf Config) *Flow {
	child, stop := context.WithCancel(ctx)

	f := &Flow{conf: conf, ctx: child, cancel: stop}
	f.sinks = make(map[string]Sink)
	f.connector = make(map[string]string)
	return f
}
// AddSink registers sink under name and returns the flow so that calls can be
// chained.
//
// name is the address the sink is later looked up by (for example SinkIn).
// Registering the same name again replaces the earlier sink.
//
// The sink table is allocated on demand so that a Flow that was not created
// with New does not panic on a write to a nil map.
func (flow *Flow) AddSink(name string, sink Sink) *Flow {
	if flow.sinks == nil {
		flow.sinks = make(map[string]Sink)
	}
	flow.sinks[name] = sink
	return flow
}
// ConnectSink records that the address in is followed by the address out and
// returns the flow so that calls can be chained.
//
// in is matched against the address of a sink's output port; out is the name
// of the sink that runs next. Connecting the same in again replaces the
// earlier target.
//
// The connector table is allocated on demand so that a Flow that was not
// created with New does not panic on a write to a nil map.
func (flow *Flow) ConnectSink(in string, out string) *Flow {
	if flow.connector == nil {
		flow.connector = make(map[string]string)
	}
	flow.connector[in] = out
	return flow
}
// SetResultFunc installs fn as the callback that receives the outcome of a
// run and returns the flow so that calls can be chained. Passing nil removes
// any previously installed callback.
func (flow *Flow) SetResultFunc(fn OnFlowResult) *Flow {
	flow.onFlowResult = fn

	return flow
}
// Stop marks the flow as exiting and cancels the flow's context.
//
// NOTE(review): exit is a plain bool that is written here without any
// synchronization.
func (flow *Flow) Stop() {
	// Raise the flag first, then cancel, so the flag is already set by the
	// time anything observes the cancelled context.
	flow.exit = true
	flow.cancel()
}
// Run executes the sink chain on the calling goroutine and returns once the
// chain has finished.
func (flow *Flow) Run() {
	flow.runSink()
}
// getSink returns the sink registered under name, or defaultSink when nothing
// is registered under that name.
func (flow *Flow) getSink(name string) Sink {
	sink, found := flow.sinks[name]
	if !found {
		return defaultSink
	}
	return sink
}
// runSink walks the sink chain until it ends, fails or is stopped.
//
// The walk starts at conf.Address (SinkIn when that is empty). On every hop
// the sink found for the current address processes the previous sink's output
// port (nil on the first hop); the address of the port it returns is looked
// up in the connector table to find the next sink.
//
// At most one outcome is reported through the result callback, if one is set:
//   - a sink returned a non-OK Result: that Result, with the sink's address;
//   - nothing is connected to the output address: a "Success" Result;
//   - more than 2*len(sinks) hops were taken: CDeadline;
//   - the flow's context is done (Stop was called or the parent context was
//     cancelled): a Result with Code -4 and Msg "exit".
//
// If system.Closed() fires, the walk returns without invoking the callback.
func (flow *Flow) runSink() {
	address := flow.conf.Address
	if len(address) == 0 {
		address = SinkIn
	}

	// Fall back to a no-op so the loop never has to nil-check the callback.
	flowResult := flow.onFlowResult
	if flowResult == nil {
		flowResult = func(string, Result) {}
	}

	var inPort *Port
	hops := 0
	maxHops := len(flow.sinks) * 2

	for {
		outPort, res := flow.getSink(address).Process(flow.ctx, inPort)
		if !res.Ok() {
			logger.Error("Process: Sink connect, inPort='%+v' address='%s' failure, result= %v", inPort, address, res)
			flowResult(address, res)
			return
		}

		next := flow.nextConnectAddr(outPort.GetAddr())
		if len(next) == 0 {
			// Nothing is connected to this output: the chain is complete.
			flowResult(outPort.GetAddr(), Result{Msg: "Success", Data: res.Data})
			return
		}

		// Guard against sinks and connections that would never terminate.
		hops++
		if hops > maxHops {
			flowResult(outPort.GetAddr(), CDeadline)
			return
		}

		// Stop cancels flow.ctx, and a Context is safe for concurrent use, so
		// the context is consulted here instead of reading flow.exit, which
		// Stop writes from another goroutine without synchronization.
		if flow.ctx != nil && flow.ctx.Err() != nil {
			flowResult(outPort.GetAddr(), Result{Code: -4, Msg: "exit", Data: res.Data})
			return
		}

		inPort = outPort
		address = next

		// Non-blocking check for process-wide shutdown.
		select {
		case <-system.Closed():
			return
		default:
		}
	}
}
// nextConnectAddr returns the address connected to adr, or the empty string
// when adr has no connection.
func (flow *Flow) nextConnectAddr(adr string) string {
	// Indexing a map with a missing key yields the zero value, which for a
	// string is "", so no explicit presence check is needed.
	return flow.connector[adr]
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.20.57

搜索帮助

344bd9b3 5694891 D2dac590 5694891