1 Star 0 Fork 0

刘昊/lh-short-video

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
runner.go 2.47 KB
一键复制 编辑 原始数据 按行查看 历史
刘昊 提交于 2020-01-31 17:39 +08:00 . scheduler模块完成
package taskrunner
//runner对象
type Runner struct {
Controller controlChan //Dispatcher和Executor的生产者和消费者互相交互信息
Error controlChan //告知程序是否关闭资源
Data dataChan //真正的交互数据
dataSize int //传输的数据大小
longLived bool //是否是长期存在的资源(不回执行close()回收资源)
Dispatcher fn //分配器(生产者)
Executor fn //执行者(消费者)
}
//创建启动任务,模拟构造函数
func NewRunner(size int, longLived bool, d fn, e fn) *Runner {
return &Runner{
Controller: make(chan string, 1), //要带buffer,否则上来就会阻塞
Error: make(chan string, 1),
Data: make(chan interface{}, size),
longLived: longLived,
dataSize: size,
Dispatcher: d,
Executor: e,
}
}
//开始分配任务(常驻任务),长时间等待Controller channel和Data channel的数据来做处理
func (r *Runner) startDispatch() {
//声明匿名函数
defer func() {
//判断是否是要常驻内存,不需要的话就关闭所有channel
if !r.longLived {
close(r.Controller)
close(r.Data)
close(r.Error)
}
}() //没有这里的()该函数不会自动执行
//死循环不断处理消费者和生产者的channel中的数据
for {
select {
//读取Controller的channel中的数据,判断是生产者还是消费者
case c := <-r.Controller:
//生产者
if c == READY_TO_DISPATCH {
//把传入的数据,放入到生产者的回调函数中,同时判断回调函数的处理结果
err := r.Dispatcher(r.Data)
if err != nil {
//回调函数执行出错,通过传参, 指定关闭
r.Error <- CLOSE
} else {
//通过传参,切换为消费者
r.Controller <- READY_TO_EXECUTE
}
}
//消费者
if c == READY_TO_EXECUTE {
//把传入的数据,放入到消费者的回调函数中,同时判断回调函数的处理结果
err := r.Executor(r.Data)
if err != nil {
//回调函数执行出错,通过传参,指定关闭
r.Error <- CLOSE
} else {
//通过传参,切换为生产者
r.Controller <- READY_TO_DISPATCH
}
}
//读取channel中需要关闭的资源
case e := <-r.Error:
if e == CLOSE {
return
}
default:
}
}
}
//启动生产者和消费者模型
func (r *Runner) StartAll() {
//开启生产者和消费者模型,同时预制状态,否则进程会僵死
r.Controller <- READY_TO_DISPATCH
//启动生产者
r.startDispatch()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/chinaliuhan/lh-short-video.git
git@gitee.com:chinaliuhan/lh-short-video.git
chinaliuhan
lh-short-video
lh-short-video
master

搜索帮助