1 Star 0 Fork 0

golangx/mq

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
dispatcher.go 2.68 KB
一键复制 编辑 原始数据 按行查看 历史
luohancai 提交于 2020-03-25 14:32 +08:00 . init
package mq
import (
"context"
"runtime"
"sort"
"strconv"
"sync"
)
// Dispatcher 调度器
// 添加Job到Job Pool
// 调度Job分配到bucket
// 管理bucket
type Dispatcher struct {
addToBucket chan *JobCard
addToTTRBucket chan *JobCard
bucket []*Bucket
TTRBuckets []*Bucket
closed chan struct{}
wg sync.WaitGroup
}
// NewDispatcher 新建调度器
func NewDispatcher() *Dispatcher {
return &Dispatcher{
addToBucket: make(chan *JobCard),
addToTTRBucket: make(chan *JobCard),
closed: make(chan struct{}),
}
}
// Run 启用调度器
// job调度器,负责bucket分配
func (d *Dispatcher) Run(ctx context.Context) {
defer mq.wg.Done()
mq.wg.Add(1)
if mq.running == 0 {
return
}
if err := d.initBucket(); err != nil {
panic(err)
}
for {
select {
case card := <-d.addToBucket:
if card.delay > 0 {
// bucket.job_number可能会有差误,比如手动删除队列元素(这种情况需要重启服务,才能复位)
sort.Sort(ByNum(d.bucket))
d.bucket[0].recvJob <- card
} else {
// 延迟时间<=0,直接添加到队列(作为普通队列使用)
if err := AddToReadyQueue(card.id); err != nil {
// 添加ready queue失败了,要怎么处理
// log.Error(err)
}
}
case card := <-d.addToTTRBucket:
sort.Sort(ByNum(d.TTRBuckets))
d.TTRBuckets[0].recvJob <- card
case <-ctx.Done():
// log.Info("dispatcher notifies all bucket to close.")
close(d.closed)
d.wg.Wait()
return
}
}
}
// initBucket 初始化bucket
func (d *Dispatcher) initBucket() error {
//NumCPU 返回本地机器的逻辑cpu个数
num := runtime.NumCPU()
for i := 0; i < num; i++ {
b := &Bucket{
ID: strconv.Itoa(i),
JobNum: 0,
recvJob: make(chan *JobCard),
addToReadyQueue: make(chan string),
resetTimerChan: make(chan struct{}),
closed: make(chan struct{}),
}
b.JobNum = GetBucketJobNum(b)
go b.run()
d.bucket = append(d.bucket, b)
}
for i := 0; i < num; i++ {
b := &Bucket{
ID: "TTR:" + string(i+65),
JobNum: 0,
recvJob: make(chan *JobCard),
addToReadyQueue: make(chan string),
resetTimerChan: make(chan struct{}),
closed: make(chan struct{}),
}
b.JobNum = GetBucketJobNum(b)
go b.run()
d.TTRBuckets = append(d.TTRBuckets, b)
}
return nil
}
// AddToJobPool 添加任务到对象池
func (d *Dispatcher) AddToJobPool(j *Job) error {
if err := j.CheckJobData(); err != nil {
return err
}
if err := AddToJobPool(j); err != nil {
return err
}
d.addToBucket <- j.Card()
return nil
}
// GetBuckets GetBuckets
func (d *Dispatcher) GetBuckets() []*Bucket {
return d.bucket
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/golangx/mq.git
git@gitee.com:golangx/mq.git
golangx
mq
mq
101f61101d89

搜索帮助