代码拉取完成,页面将自动刷新
// Package gothrottler 用于控制 Goroutine 并发数
package gothrottler
import (
"sync"
"gitee.com/sillyman/simpleUtil/common/empty"
)
// Throttler goroutine 限流器
type Throttler struct {
// workCountCh 当前的工作计数,用于限制并发数量
worksCountCh chan empty.T
// jobsWg 用于等待任务完成
jobsWg sync.WaitGroup
// jobsCh 任务列队
jobsCh chan *worker
// stopSchedulingCh 停止调度
stopSchedulingCh chan empty.T
}
// New 创建一个 goroutine 限流器实例
func New(maxWorks int) *Throttler {
th := &Throttler{
worksCountCh: make(chan empty.T, maxWorks),
jobsWg: sync.WaitGroup{},
jobsCh: make(chan *worker, 1024),
stopSchedulingCh: make(chan empty.T),
}
go th.scheduling()
return th
}
// AddJob 添加一个任务
func (t *Throttler) AddJob(fn func(args ...interface{}), fnArgs ...interface{}) {
select {
case <-t.stopSchedulingCh:
default:
t.jobsWg.Add(1)
t.jobsCh <- &worker{fn, fnArgs}
}
}
// scheduling 读取任务队列,然后执行任务
func (t *Throttler) scheduling() {
for {
select {
case <-t.stopSchedulingCh:
return
case w := <-t.jobsCh:
if w == nil { // channel 已经关闭了
return
}
t.worksCountCh <- empty.V
go func(w *worker) {
defer func() {
<-t.worksCountCh
t.jobsWg.Done()
}()
w.run()
}(w)
}
}
}
// Wait 等待全部任务完成
func (t *Throttler) Wait() {
t.jobsWg.Wait()
t.stopScheduling()
}
// InterruptAndWait 取消还没有执行的任务,清空任务列队,并等待正在执行的任务完成
func (t *Throttler) InterruptAndWait() {
t.stopScheduling()
for { // 读空任务列队
select {
case job := <-t.jobsCh:
if job == nil { // 返回nil表示列队为空
t.jobsWg.Wait()
t.jobsCh = make(chan *worker, 1024)
return
} else {
t.jobsWg.Done()
}
default:
close(t.jobsCh)
}
}
}
// stopScheduling 停止调度
func (t *Throttler) stopScheduling() {
select {
case <-t.stopSchedulingCh:
default:
close(t.stopSchedulingCh)
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。