1 Star 0 Fork 1

SillyMan/Go实用工具包

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
gothrottler.go 2.02 KB
一键复制 编辑 原始数据 按行查看 历史
SillyMan 提交于 2022-06-23 11:16 +08:00 . 清理标记为过去的ioutil方法
// Package gothrottler 用于控制 Goroutine 并发数
package gothrottler
import (
"sync"
"gitee.com/sillyman/simpleUtil/common/empty"
)
// Throttler goroutine 限流器
type Throttler struct {
// workCountCh 当前的工作计数,用于限制并发数量
worksCountCh chan empty.T
// jobsWg 用于等待任务完成
jobsWg sync.WaitGroup
// jobsCh 任务列队
jobsCh chan *worker
// stopSchedulingCh 停止调度
stopSchedulingCh chan empty.T
}
// New 创建一个 goroutine 限流器实例
func New(maxWorks int) *Throttler {
th := &Throttler{
worksCountCh: make(chan empty.T, maxWorks),
jobsWg: sync.WaitGroup{},
jobsCh: make(chan *worker, 1024),
stopSchedulingCh: make(chan empty.T),
}
go th.scheduling()
return th
}
// AddJob 添加一个任务
func (t *Throttler) AddJob(fn func(args ...interface{}), fnArgs ...interface{}) {
select {
case <-t.stopSchedulingCh:
default:
t.jobsWg.Add(1)
t.jobsCh <- &worker{fn, fnArgs}
}
}
// scheduling 读取任务队列,然后执行任务
func (t *Throttler) scheduling() {
for {
select {
case <-t.stopSchedulingCh:
return
case w := <-t.jobsCh:
if w == nil { // channel 已经关闭了
return
}
t.worksCountCh <- empty.V
go func(w *worker) {
defer func() {
<-t.worksCountCh
t.jobsWg.Done()
}()
w.run()
}(w)
}
}
}
// Wait 等待全部任务完成
func (t *Throttler) Wait() {
t.jobsWg.Wait()
t.stopScheduling()
}
// InterruptAndWait 取消还没有执行的任务,清空任务列队,并等待正在执行的任务完成
func (t *Throttler) InterruptAndWait() {
t.stopScheduling()
for { // 读空任务列队
select {
case job := <-t.jobsCh:
if job == nil { // 返回nil表示列队为空
t.jobsWg.Wait()
t.jobsCh = make(chan *worker, 1024)
return
} else {
t.jobsWg.Done()
}
default:
close(t.jobsCh)
}
}
}
// stopScheduling 停止调度
func (t *Throttler) stopScheduling() {
select {
case <-t.stopSchedulingCh:
default:
close(t.stopSchedulingCh)
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/sillyman/simpleUtil.git
git@gitee.com:sillyman/simpleUtil.git
sillyman
simpleUtil
Go实用工具包
5c98b36afa10

搜索帮助