代码拉取完成,页面将自动刷新
package u2
import (
"context"
"fmt"
"sync"
"time"
)
// ForConfig 定义并发控制配置参数和运行时状态
// 注意:公共字段应在创建实例时设置,运行时修改可能不会生效
type ForConfig struct {
// 公共配置参数(创建时设置)
MaxRunSec int `comment:"最大运行时间(秒),0表示无限制"` // 最大运行时间(秒)
LimitTimes int `comment:"总执行次数限制,0表示无限制"` // 总执行次数限制
MaxQPS int `comment:"每秒最大请求数"` // 最大QPS
PrintQPS bool `comment:"是否打印QPS统计信息"` // 是否打印QPS信息
Debug bool `comment:"启用调试模式"` // 调试模式
// 内部运行时状态(非公开字段)
startTime time.Time // 记录启动时间
qpsTicker *time.Ticker // QPS统计计时器
runCount int // 当前秒内的执行计数
initialized bool // 初始化完成标记
uuid string // 实例唯一标识符
stopChan chan struct{} // 控制协程停止的通道
wg sync.WaitGroup // 用于等待所有goroutine完成
mu sync.Mutex // 保护并发访问的互斥锁
tokenBucket chan struct{} // 令牌桶实现QPS限制
totalRuns int // 总执行次数统计
lastRefillTime time.Time // 最后一次令牌补充时间
capacity int // 令牌桶容量(等于MaxQPS)
}
// init 初始化配置参数和运行时组件
// 非线程安全,应在首次使用配置前调用
func (c *ForConfig) init() {
if !c.initialized {
c.initialized = true
if c.MaxQPS <= 0 {
c.MaxQPS = 100
}
c.capacity = c.MaxQPS
c.tokenBucket = make(chan struct{}, c.capacity)
c.stopChan = make(chan struct{})
c.uuid = UUID()
c.qpsTicker = time.NewTicker(time.Second)
// 初始化时设置最后补充时间为1秒前,确保首次可以补充令牌
c.lastRefillTime = time.Now().Add(-time.Second)
// 预先填充令牌桶
for i := 0; i < c.capacity; i++ {
c.tokenBucket <- struct{}{}
}
go c.maintainQPS()
}
}
// maintainQPS 维护QPS统计和打印的协程
// 每秒钟重置运行计数并打印统计信息(如果启用)
// 通过stopChan通道控制协程退出
func (c *ForConfig) maintainQPS() {
defer c.qpsTicker.Stop()
for {
select {
case <-c.qpsTicker.C:
c.mu.Lock()
currentQPS := c.runCount
c.totalRuns += currentQPS
if c.PrintQPS {
fmt.Printf("QPS[%s]: %d/%d Total: %d Runtime: %s\n",
c.uuid[:8], currentQPS, c.MaxQPS, c.totalRuns, time.Since(c.startTime).Round(time.Second))
}
c.runCount = 0
c.mu.Unlock() // 提前释放锁
case <-c.stopChan:
return
}
}
}
// refillTokenBucket 补充令牌到最大容量
// 注意:此方法非线程安全,必须在持有mu锁的情况下调用
// 实现逻辑:根据时间差计算应补充的令牌数量
func (c *ForConfig) refillTokenBucket() {
available := c.capacity - len(c.tokenBucket)
for i := 0; i < available; i++ {
select {
case c.tokenBucket <- struct{}{}:
default:
return // 当桶满时立即停止(理论上不会触发)
}
}
}
// acquireToken 获取执行令牌(线程安全)
// 参数:
//
// ctx - 上下文对象,用于取消令牌获取操作
//
// 返回值:
//
// bool - true表示成功获取令牌,false表示需要停止
func (c *ForConfig) acquireToken(ctx context.Context) bool {
c.mu.Lock()
defer c.mu.Unlock()
// 补充令牌前检查上下文状态
select {
case <-ctx.Done():
return false
default:
}
// 补充令牌到最大容量
c.refillTokenBucket()
select {
case <-c.tokenBucket:
c.runCount++
return c.checkContinueLocked()
default:
return false // 无可用令牌
}
}
// checkContinueLocked 检查是否满足继续执行条件
// 注意:必须在持有mu锁的情况下调用
// 检查条件包括:
// 1. 执行次数是否超过限制
// 2. 是否超过最大运行时间
func (c *ForConfig) checkContinueLocked() bool {
if c.LimitTimes > 0 && c.runCount > c.LimitTimes {
return false
}
if c.MaxRunSec > 0 && time.Since(c.startTime) > time.Duration(c.MaxRunSec)*time.Second {
return false
}
return true
}
// Slice 并发处理切片数据
// 参数:
//
// data - 需要处理的切片数据
// concurrency - 最大并发goroutine数量
// fn - 处理函数,参数为索引和值,返回false表示提前终止
//
// 返回值:
//
// bool - true表示全部完成,false表示被提前终止
//
// 注意:data参数必须是slice类型,否则会panic
func (c *ForConfig) Slice(data interface{}, concurrency int, fn func(int, interface{}) bool) bool {
c.init()
c.startTime = time.Now()
defer close(c.stopChan)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
items := Interface2InterfaceSlice(data)
// 令牌桶
if concurrency <= 0 {
concurrency = 1 // 确保至少有一个并发
}
sem := make(chan struct{}, concurrency)
stop := make(chan struct{})
// 错误处理协程
go func() {
<-stop
cancel()
if c.Debug {
fmt.Println("Processing stopped by user")
}
}()
for i, item := range items {
select {
case sem <- struct{}{}:
c.wg.Add(1)
go func(index int, value interface{}) {
defer func() {
<-sem
c.wg.Done()
if r := recover(); r != nil {
c.mu.Lock()
fmt.Printf("Panic recovered: %v\n", r)
c.mu.Unlock()
stop <- struct{}{}
}
}()
if !c.acquireToken(ctx) {
return
}
if !fn(index, value) {
// 非阻塞方式发送停止信号
stop <- struct{}{}
close(stop)
cancel()
}
}(i, item)
case <-ctx.Done():
return true // 直接返回整个函数
}
}
// 等待所有任务完成
done := make(chan struct{})
go func() {
c.wg.Wait()
close(done)
}()
// 无论是否停止都等待所有任务完成
c.wg.Wait()
// 最后检查停止状态
select {
case <-stop:
return false
default:
return true
}
}
// Num 并发处理数字范围
// 参数:
//
// start - 起始数字(包含)
// end - 结束数字(包含)
// concurrency - 最大并发goroutine数量
// fn - 处理函数,参数为当前数字,返回false表示提前终止
//
// 返回值:
//
// bool - true表示全部完成,false表示被提前终止
func (c *ForConfig) Num(start, end, concurrency int, fn func(int) bool) bool {
var items []interface{}
for i := start; i <= end; i++ {
items = append(items, i)
}
return c.Slice(items, concurrency, func(i int, v interface{}) bool {
if n, ok := v.(int); ok {
return fn(n)
}
return false
})
}
// NewFor 创建并发控制器实例
// 参数:
//
// config - 配置参数,如果为nil则使用默认配置
//
// 返回值:
//
// *ForConfig - 初始化完成的并发控制器实例
//
// 默认配置:
//
// MaxQPS = 100
func NewFor(config *ForConfig) *ForConfig {
if config == nil {
config = &ForConfig{MaxQPS: 100}
}
config.init()
return config
}
// ReleaseToken 将令牌放回令牌桶中
//
// 注意:
// 1. 该方法应在任务执行完成后调用
// 2. 如果令牌桶已满,多余的令牌将被丢弃
// 3. 该方法是线程安全的
func (c *ForConfig) ReleaseToken() {
c.mu.Lock()
defer c.mu.Unlock()
// 检查令牌桶是否已满
if len(c.tokenBucket) < c.capacity {
select {
case c.tokenBucket <- struct{}{}:
default:
// 如果令牌桶已满,则丢弃该令牌
}
}
}
// ForSlice 并发处理切片。
//
// 参数:
// - list: 要处理的切片
// - runningNum: 最大并发数
// - f: 处理函数,返回false表示停止处理
//
// 返回值:
// - bool: 是否成功完成所有处理
func ForSlice(list interface{}, runningNum int, f func(int, interface{}) bool) bool {
config := &ForConfig{MaxQPS: 10000, PrintQPS: false}
return config.Slice(list, runningNum, f)
}
// ForSliceWithQPS 带QPS限制的并发处理。
//
// 参数:
// - list: 要处理的切片
// - runningNum: 最大并发数
// - Max_QPS: 最大QPS限制
// - f: 处理函数,返回false表示停止处理
//
// 返回值:
// - bool: 是否成功完成所有处理
func ForSliceWithQPS(list interface{}, runningNum int, Max_QPS int, f func(int, interface{}) bool) bool {
config := &ForConfig{MaxQPS: Max_QPS, PrintQPS: false}
return config.Slice(list, runningNum, f)
}
// ForWithNum 并发处理数字范围。
//
// 参数:
// - startNum: 起始数字
// - endNum: 结束数字
// - runningNum: 最大并发数
// - f: 处理函数,返回false表示停止处理
//
// 返回值:
// - bool: 是否成功完成所有处理
func ForWithNum(startNum, endNum, runningNum int, f func(int) bool) bool {
config := &ForConfig{MaxQPS: 10000, PrintQPS: false}
return config.Num(startNum, endNum, runningNum, f)
}
// ForWithNumAndQPS 并发处理数字范围,带QPS限制。
//
// 参数:
// - startNum: 起始数字
// - endNum: 结束数字
// - runningNum: 最大并发数
// - Max_QPS: 最大QPS限制
// - f: 处理函数,返回false表示停止处理
//
// 返回值:
// - bool: 是否成功完成所有处理
func ForWithNumAndQPS(startNum, endNum, runningNum, Max_QPS int, f func(int) bool) bool {
config := &ForConfig{MaxQPS: Max_QPS, PrintQPS: false}
return config.Num(startNum, endNum, runningNum, f)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。