1 Star 0 Fork 0

c./goframe

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
grpool_pool.go 3.63 KB
一键复制 编辑 原始数据 按行查看 历史
admin 提交于 2024-11-05 01:30 . 2024-11-5 替换中文提示完成.
// 版权归GoFrame作者(https://goframe.org)所有。保留所有权利。
//
// 本源代码形式受MIT许可证条款约束。
// 如果未随本文件一同分发MIT许可证副本,
// 您可以在https://github.com/gogf/gf处获取。
// md5:a9832f33b234e3f3
package 协程池类
import (
"context"
gcode "gitee.com/go_888/goframe/errors/gcode"
gerror "gitee.com/go_888/goframe/errors/gerror"
)
// Add 将一个新任务添加到池中。
// 该任务将会异步执行。
// md5:69389d53e280086b
func (p *X结构_协程池) Add(ctx context.Context, f X函数类型_Func) error {
for p.closed.X取值() {
return gerror.X创建错误码(
gcode.X变量_CodeInvalidOperation,
"goroutine 默认池已关闭",
)
}
p.list.X加入前面(&localPoolItem{
Ctx: ctx,
Func: f,
})
// 检查并 fork 新的 worker。 md5:d3acb042c3373fa4
p.checkAndForkNewGoroutineWorker()
return nil
}
// AddWithRecover 将指定的恢复函数推送到队列中执行新任务。
//
// 可选的 `recoverFunc` 在执行 `userFunc` 时发生任何 panic 时被调用。如果未传递或给定 nil,它将忽略来自 `userFunc` 的 panic。任务将异步执行。
// md5:764d1260466b9a5d
func (p *X结构_协程池) AddWithRecover(ctx context.Context, userFunc X函数类型_Func, recoverFunc X函数类型_RecoverFunc) error {
return p.Add(ctx, func(ctx context.Context) {
defer func() {
if exception := recover(); exception != nil {
if recoverFunc != nil {
if v, ok := exception.(error); ok && gerror.X判断是否带堆栈(v) {
recoverFunc(ctx, v)
} else {
recoverFunc(ctx, gerror.X创建错误码并格式化(gcode.X变量_CodeInternalPanic, "%+v", exception))
}
}
}
}()
userFunc(ctx)
})
}
// Cap 返回池的容量。
// 这个容量在创建池时定义。
// 如果没有限制,则返回-1。
// md5:1c6cae16429df1b2
func (p *X结构_协程池) Cap() int {
return p.limit
}
// Size 返回当前池中的goroutine数量。 md5:247eb1685633ccc3
func (p *X结构_协程池) Size() int {
return p.count.X取值()
}
// Jobs 返回池中的当前任务数。
// 注意,它返回的不是工作器/goroutine的数量,而是任务的数量。
// md5:c82d92b33047974c
func (p *X结构_协程池) Jobs() int {
return p.list.X弃用_Len()
}
// IsClosed 返回池是否已关闭。 md5:85755176347bcfea
func (p *X结构_协程池) IsClosed() bool {
return p.closed.X取值()
}
// Close 关闭goroutine池,导致所有goroutines退出。 md5:3d9c73ed9b0f4643
func (p *X结构_协程池) Close() {
p.closed.X设置值(true)
}
// checkAndForkNewGoroutineWorker 检查并创建一个新的goroutine工作进程。
// 请注意,如果工作函数出现恐慌且该工作没有恢复处理,那么该工作进程将会死亡。
// md5:242a912451066181
func (p *X结构_协程池) checkAndForkNewGoroutineWorker() {
// 检查是否需要在新的goroutine中 fork。 md5:20ef20b082ef0b86
var n int
for {
n = p.count.X取值()
if p.limit != -1 && n >= p.limit {
// 不需要启动新的goroutine。 md5:a4d7257aa086311e
return
}
if p.count.X比较并交换(n, n+1) {
// 使用CAS(比较并交换)来保证操作的原子性。 md5:2337a31243acf132
break
}
}
// 在goroutine中创建任务函数。 md5:e0e70df051fd0b1a
go func() {
defer p.count.X追加(-1)
var (
listItem interface{}
poolItem *localPoolItem
)
// 哈丁工作,一个接一个,任务永无止境,工人永不消亡。 md5:625670ae6a926602
for !p.closed.X取值() {
listItem = p.list.X取出后面()
if listItem == nil {
return
}
poolItem = listItem.(*localPoolItem)
poolItem.Func(poolItem.Ctx)
}
}()
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/go_888/goframe.git
git@gitee.com:go_888/goframe.git
go_888
goframe
goframe
782a3f7170cf

搜索帮助