代码拉取完成,页面将自动刷新
package pool
import (
"errors"
"sync"
"sync/atomic"
"time"
)
var (
errPoolClosed = errors.New("pool is closed")
)
// 定义任务的数据类型
type goTask func()
// goWorker 定义协程池工人的数据结构
type goWorker struct {
Id int
Pool *goPool // 关联协程池
TaskChan chan goTask // 任务通道
QuitChan chan struct{} // 关闭通道
lastActiveAt time.Time // 最后活动时间
}
// NewWorker 新建一个工人
func NewWorker(id int, pool *goPool) *goWorker {
return &goWorker{
Id: id,
Pool: pool,
TaskChan: make(chan goTask),
QuitChan: make(chan struct{}),
lastActiveAt: time.Now(),
}
}
// Start 启动工人,监听任务队列
func (own *goWorker) Start() {
go func() {
defer own.Pool.removeWorker(own) // 清出前,从协程池的工人列表中移除
for {
select {
// 闲置当前工人
case own.Pool.idleWorkerChan <- own:
// 清退当前工人
case <-own.QuitChan:
return
// 关闭协程池,清退所有工人
case <-own.Pool.stopChan:
return
}
select {
// 等待任务下发
case task := <-own.TaskChan:
// 执行任务
task()
own.Pool.wg.Done()
own.lastActiveAt = time.Now() // 计算空闲时间,要从执行完任务后开始计时
// 清退当前工人
case <-own.QuitChan:
return
// 关闭协程池,清退所有工人
case <-own.Pool.stopChan:
return
// 暂时没有任务,且空闲超时
case <-time.After(own.Pool.idleWorkerTimeout):
// 检查是否需要清退该工人
if own.Pool.shouldIdleWorkerExit(own) {
return
}
}
}
}()
}
// Stop 清退工人
func (own *goWorker) Stop() {
close(own.QuitChan)
}
// goPool 定义协程池的数据结构
type goPool struct {
minWorkerNumber int // 最小工人数量
maxWorkerNumber int // 最大工人数量
idleWorkerTimeout time.Duration // 闲置工人超时时间
idleWorkerChan chan *goWorker // 空闲工人通道
taskChan chan goTask // 任务通道
workers []*goWorker // 工人列表
workersLock sync.RWMutex // 工人列表安全锁
workersCount int32 // 工人原子计数器
closed *atomic.Bool // 原子操作标记池是否已关闭
stopChan chan struct{} // 停止信号
wg sync.WaitGroup
}
// NewGoPool 新建一个协程池
func NewGoPool(maxNumber, minNumber int, idleTimeout time.Duration) *goPool {
closed := &atomic.Bool{}
closed.Store(false)
pool := &goPool{
maxWorkerNumber: maxNumber,
minWorkerNumber: minNumber,
idleWorkerTimeout: idleTimeout,
idleWorkerChan: make(chan *goWorker, maxNumber),
taskChan: make(chan goTask, 1024),
workers: make([]*goWorker, 0, maxNumber),
closed: closed,
stopChan: make(chan struct{}),
}
// 初始化最小工人数(事业编)
for i := 0; i < minNumber; i++ {
pool.addWorker(i)
}
return pool
}
// addWorker 添加新工人
func (own *goPool) addWorker(id int) {
own.workersLock.Lock()
// 新建一个工人
worker := NewWorker(id, own)
// 加入工人列表
own.workers = append(own.workers, worker)
// 工人数量原子+1
atomic.AddInt32(&own.workersCount, 1)
// 启动工人
worker.Start()
own.workersLock.Unlock()
}
// removeWorker 从列表中移除工人
func (own *goPool) removeWorker(worker *goWorker) {
own.workersLock.Lock()
for i, w := range own.workers {
// 工人存在,高效移除元素
if w == worker {
own.workers[i] = own.workers[len(own.workers)-1]
own.workers = own.workers[:len(own.workers)-1]
atomic.AddInt32(&own.workersCount, -1)
break
}
}
own.workersLock.Unlock()
}
// shouldIdleWorkerExit 检查是否应该清退空闲工人
func (own *goPool) shouldIdleWorkerExit(worker *goWorker) bool {
// 事业编不动
if 0 <= worker.Id && worker.Id < own.minWorkerNumber {
return false
}
// 当前工人数超过最小值,则清退
return atomic.LoadInt32(&own.workersCount) > int32(own.minWorkerNumber)
}
// Start 启动协程池
func (own *goPool) Start() {
go own.dispatch()
go own.monitor()
}
// dispatch 分发任务
func (own *goPool) dispatch() {
for {
select {
// 监听任务通道
case task, ok := <-own.taskChan:
// 任务通道关闭
if !ok {
return
}
select {
// 等待空闲工人(阻塞)
case worker := <-own.idleWorkerChan:
// 分发当前任务到工人的任务通道
worker.TaskChan <- task
// 协程池关闭,直接退出
case <-own.stopChan:
return
}
// 协程池关闭,直接退出
case <-own.stopChan:
return
}
}
}
// monitor 监控状态
func (own *goPool) monitor() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
own.adjustWorkers()
case <-own.stopChan:
return
}
}
}
// adjustWorkers 动态调整工人数量(动态缩容)
func (own *goPool) adjustWorkers() {
currentWorkers := atomic.LoadInt32(&own.workersCount)
// 检查是否需要扩容
if len(own.taskChan) > cap(own.taskChan)/2 && currentWorkers < int32(own.maxWorkerNumber) {
// 创建新工人
own.addWorker(int(currentWorkers))
}
// 统计空闲工人
idleCount := len(own.idleWorkerChan)
// 如果空闲工人过多且超过最小值,回收部分
if idleCount > own.minWorkerNumber && currentWorkers > int32(own.minWorkerNumber) {
// 尝试回收最多 10% 的空闲工人
toRemove := idleCount / 10
if toRemove < 1 {
toRemove = 1
}
for i := 0; i < toRemove; i++ {
select {
case worker := <-own.idleWorkerChan:
// 清退扩容工人,事业编不动
if worker.Id >= own.minWorkerNumber {
worker.Stop()
continue
}
// 事业编放回空闲队列
own.idleWorkerChan <- worker
default:
return
}
}
}
}
/*
Go 异步执行任务
入参:
- task: 任务函数
- block: 是否阻塞。若任务通道已满,block=true,则会阻塞等到任务开始执行;block=false,则会直接返回失败
*/
func (own *goPool) Go(task func(), block bool) error {
// 检查协程池是否已关闭
if own.closed.Load() {
return errPoolClosed
}
own.wg.Add(1)
select {
// 提交任务到协程池的任务通道
case own.taskChan <- task:
// 协程池任务通道已满,则动态扩容
default:
if atomic.LoadInt32(&own.workersCount) < int32(own.maxWorkerNumber) {
own.workersLock.Lock()
worker := NewWorker(int(own.workersCount), own)
own.workers = append(own.workers, worker)
atomic.AddInt32(&own.workersCount, 1)
own.workersLock.Unlock()
worker.Start()
// 直接分配任务
select {
case worker.TaskChan <- task:
default:
go func() {
select {
case worker.TaskChan <- task:
case <-worker.QuitChan:
case <-own.stopChan:
}
}()
}
return nil
}
// 达到最大工人数,阻塞提交
own.taskChan <- task
}
return nil
}
// Wait 等待所有任务完成
func (own *goPool) Wait() {
own.wg.Wait()
}
// Close 关闭协程池
func (own *goPool) Close() error {
if !own.closed.CompareAndSwap(false, true) {
return nil
}
close(own.stopChan) // 通知所有goroutine停止
close(own.taskChan) // 关闭任务队列(不再接受新任务)
// 停止所有工人
for _, worker := range own.workers {
worker.Stop()
}
close(own.idleWorkerChan) // 关闭空闲工人通道
return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。