代码拉取完成,页面将自动刷新
package mq
import (
"context"
"runtime"
"sort"
"strconv"
"sync"
)
// Dispatcher 调度器
// 添加Job到Job Pool
// 调度Job分配到bucket
// 管理bucket
type Dispatcher struct {
addToBucket chan *JobCard
addToTTRBucket chan *JobCard
bucket []*Bucket
TTRBuckets []*Bucket
closed chan struct{}
wg sync.WaitGroup
}
// NewDispatcher 新建调度器
func NewDispatcher() *Dispatcher {
return &Dispatcher{
addToBucket: make(chan *JobCard),
addToTTRBucket: make(chan *JobCard),
closed: make(chan struct{}),
}
}
// Run 启用调度器
// job调度器,负责bucket分配
func (d *Dispatcher) Run(ctx context.Context) {
defer mq.wg.Done()
mq.wg.Add(1)
if mq.running == 0 {
return
}
if err := d.initBucket(); err != nil {
panic(err)
}
for {
select {
case card := <-d.addToBucket:
if card.delay > 0 {
// bucket.job_number可能会有差误,比如手动删除队列元素(这种情况需要重启服务,才能复位)
sort.Sort(ByNum(d.bucket))
d.bucket[0].recvJob <- card
} else {
// 延迟时间<=0,直接添加到队列(作为普通队列使用)
if err := AddToReadyQueue(card.id); err != nil {
// 添加ready queue失败了,要怎么处理
// log.Error(err)
}
}
case card := <-d.addToTTRBucket:
sort.Sort(ByNum(d.TTRBuckets))
d.TTRBuckets[0].recvJob <- card
case <-ctx.Done():
// log.Info("dispatcher notifies all bucket to close.")
close(d.closed)
d.wg.Wait()
return
}
}
}
// initBucket 初始化bucket
func (d *Dispatcher) initBucket() error {
//NumCPU 返回本地机器的逻辑cpu个数
num := runtime.NumCPU()
for i := 0; i < num; i++ {
b := &Bucket{
ID: strconv.Itoa(i),
JobNum: 0,
recvJob: make(chan *JobCard),
addToReadyQueue: make(chan string),
resetTimerChan: make(chan struct{}),
closed: make(chan struct{}),
}
b.JobNum = GetBucketJobNum(b)
go b.run()
d.bucket = append(d.bucket, b)
}
for i := 0; i < num; i++ {
b := &Bucket{
ID: "TTR:" + string(i+65),
JobNum: 0,
recvJob: make(chan *JobCard),
addToReadyQueue: make(chan string),
resetTimerChan: make(chan struct{}),
closed: make(chan struct{}),
}
b.JobNum = GetBucketJobNum(b)
go b.run()
d.TTRBuckets = append(d.TTRBuckets, b)
}
return nil
}
// AddToJobPool 添加任务到对象池
func (d *Dispatcher) AddToJobPool(j *Job) error {
if err := j.CheckJobData(); err != nil {
return err
}
if err := AddToJobPool(j); err != nil {
return err
}
d.addToBucket <- j.Card()
return nil
}
// GetBuckets GetBuckets
func (d *Dispatcher) GetBuckets() []*Bucket {
return d.bucket
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。