1 Star 0 Fork 9

qaz9877/rboot

forked from Gitee 极速下载/rboot 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
worker.go 2.50 KB
一键复制 编辑 原始数据 按行查看 历史
G.H 提交于 2020-09-16 17:45 . fix
package rboot
var (
defaultMaxQueue = 20000
)
// Job 表示要运行的Job的接口
type Job interface {
Do()
}
// JobQueue Job 通道
var JobQueue chan Job
// Worker 表示执行该Job的worker
type Worker struct {
// WorkerPool 是个指向全局唯一的 chan 的引用,
// 负责传递 Worker 接收 Job 的 chan。
// Worker 空闲时,将自己的 JobChannel 放入 WorkerPool 中。
// Dispatcher 收到新的 Job 时,从 JobChannel 中取出一个 chan, 并将 Job
// 放入其中,此时 Worker 将从 Chan 中接收到 Job,并进行处理
WorkerPool chan chan Job
// Worker 用于接收 Job 的 chan
JobChannel chan Job
// 用于给 Worker 发送控制命令的 chan,用于停止 chan
Quit chan bool
}
func NewWorker(workPool chan chan Job) Worker {
return Worker{
WorkerPool: workPool,
JobChannel: make(chan Job),
Quit: make(chan bool),
}
}
// Start 为worker启动运行循环,侦听退出通道
func (w Worker) Start() {
go func() {
for {
// 将当前worker注册到worker队列中
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// 激活 Job
job.Do()
case <-w.Quit:
// 收到停止信号
return
}
}
}()
}
func (w Worker) Stop() {
go func() {
w.Quit <- true
}()
}
// Dispatcher 调度器
type Dispatcher struct {
// 工作池容量
MaxWorkers int
// 向调度程序注册的工作程序通道池
WorkerPool chan chan Job
Quit chan bool
}
// NewDispatcher 新建一个调度器
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{MaxWorkers: maxWorkers, WorkerPool: pool, Quit: make(chan bool)}
}
// Run 运行调度器
func (d *Dispatcher) Run() {
for i := 0; i < d.MaxWorkers; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
}
go d.Dispatch()
}
// Stop 调度器停止
func (d *Dispatcher) Stop() {
go func() {
d.Quit <- true
}()
}
// Dispatch 分配任务
func (d *Dispatcher) Dispatch() {
for {
select {
case job := <-JobQueue:
// 已收到Job请求
go func(job Job) {
// 尝试获取可用的worker Job通道
// 阻塞直到worker空闲为止
jobChannel := <-d.WorkerPool
// 将Job分配到worker
jobChannel <- job
}(job)
case <-d.Quit:
return
}
}
}
/*func init() {
maxQueue, err := strconv.Atoi(os.Getenv("MAX_JOB_QUEUE"))
if err != nil || maxQueue <= 0 {
maxQueue = defaultMaxQueue
}
JobQueue = make(chan Job, maxQueue)
dispatcher := NewDispatcher(runtime.NumCPU())
dispatcher.Run()
}*/
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/qaz9877/rboot.git
git@gitee.com:qaz9877/rboot.git
qaz9877
rboot
rboot
master

搜索帮助

D67c1975 1850385 1daf7b77 1850385