代码拉取完成,页面将自动刷新
package flow
import (
"context"
"gitee.com/h79/goutils/common/logger"
"gitee.com/h79/goutils/common/system"
)
var defaultSink = &DefaultSink{}
type OnFlowResult func(addr string, res Result)
type Flow struct {
conf Config
ctx context.Context
cancel context.CancelFunc
exit bool
onFlowResult OnFlowResult
sinks map[string]Sink
connector map[string]string
}
func New(ctx context.Context, conf Config) *Flow {
c, cancel := context.WithCancel(ctx)
return &Flow{
conf: conf,
ctx: c,
cancel: cancel,
sinks: make(map[string]Sink),
connector: make(map[string]string),
}
}
// AddSink
// name SinkIn
func (flow *Flow) AddSink(name string, sink Sink) *Flow {
flow.sinks[name] = sink
return flow
}
func (flow *Flow) ConnectSink(in string, out string) *Flow {
flow.connector[in] = out
return flow
}
func (flow *Flow) SetResultFunc(fn OnFlowResult) *Flow {
flow.onFlowResult = fn
return flow
}
func (flow *Flow) Stop() {
flow.exit = true
flow.cancel()
}
func (flow *Flow) Run() {
flow.runSink()
}
func (flow *Flow) getSink(name string) Sink {
if p, ok := flow.sinks[name]; ok {
return p
}
return defaultSink
}
func (flow *Flow) runSink() {
address := flow.conf.Address
if len(address) <= 0 {
address = SinkIn
}
var flowResult = flow.onFlowResult
if flow.onFlowResult == nil {
flowResult = func(addr string, res Result) {
}
}
var inPort *Port = nil
var loop = 0
var count = len(flow.sinks) * 2
for {
outPort, res := flow.getSink(address).Process(flow.ctx, inPort)
if !res.Ok() {
logger.Error("Process: Sink connect, inPort='%+v' address='%s' failure, result= %v", inPort, address, res)
flowResult(address, res)
return
}
addr := flow.nextConnectAddr(outPort.GetAddr())
if len(addr) <= 0 {
flowResult(outPort.GetAddr(), Result{Msg: "Success", Data: res.Data})
break
}
loop++
if loop > count { //防止SINK实现进入deal
flowResult(outPort.GetAddr(), CDeadline)
break
}
if flow.exit {
flowResult(outPort.GetAddr(), Result{Code: -4, Msg: "exit", Data: res.Data})
break
}
inPort = outPort
address = addr
select {
case <-system.Closed():
return
default:
}
}
}
func (flow *Flow) nextConnectAddr(adr string) string {
next, ok := flow.connector[adr]
if ok {
return next
}
return ""
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。