Ai
1 Star 0 Fork 0

码农兴哥/go-demo-2025

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
main4.go 4.00 KB
一键复制 编辑 原始数据 按行查看 历史
renxing 提交于 2025-12-19 16:10 +08:00 . Go语言条件变量sync.Cond用法示例
package main
import (
"fmt"
"sync"
"time"
)
/**
### 工作池(Worker Pool)的实现
**使用场景**:当需要处理大量任务,但希望控制并发度,避免系统过载时,可以使用工作池模式。
**功能描述**:
- 创建工作池,包含固定数量的worker
- 将任务提交到任务队列
- worker从队列获取任务并执行
- 当队列满时,新任务进入等待状态
**注意事项**:
- 需要处理任务队列满和空的情况
- 使用条件变量协调生产者和消费者
- 需要优雅地关闭工作池
*/
// Task 任务定义
type Task struct {
ID int
Payload string
}
// WorkerPool 工作池
type WorkerPool struct {
workers int
taskQueue chan Task
taskCond *sync.Cond
taskMutex sync.Mutex
pending []Task
wg sync.WaitGroup
stop chan struct{}
}
func NewWorkerPool(workers, queueSize int) *WorkerPool {
wp := &WorkerPool{
workers: workers,
taskQueue: make(chan Task, queueSize),
stop: make(chan struct{}),
}
wp.taskCond = sync.NewCond(&wp.taskMutex)
return wp
}
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
fmt.Printf("Worker %d 启动\n", id)
for {
select {
case task, ok := <-wp.taskQueue:
if !ok {
fmt.Printf("Worker %d 退出\n", id)
return
}
// 处理任务
fmt.Printf("Worker %d 开始处理任务 %d: %s\n",
id, task.ID, task.Payload)
time.Sleep(500 * time.Millisecond) // 模拟处理时间
fmt.Printf("Worker %d 完成处理任务 %d\n", id, task.ID)
// 检查是否有待处理的任务
wp.taskMutex.Lock()
if len(wp.pending) > 0 {
// 将待处理任务加入队列
select {
case wp.taskQueue <- wp.pending[0]:
wp.pending = wp.pending[1:]
wp.taskCond.Signal() // 通知有新空间
default:
// 队列已满,保持pending状态
}
}
wp.taskMutex.Unlock()
case <-wp.stop:
fmt.Printf("Worker %d 收到停止信号\n", id)
return
}
}
}
func (wp *WorkerPool) Submit(task Task) bool {
wp.taskMutex.Lock()
defer wp.taskMutex.Unlock()
// 尝试直接放入队列
select {
case wp.taskQueue <- task:
return true
default:
// 队列已满,放入pending列表
wp.pending = append(wp.pending, task)
fmt.Printf("任务 %d 进入等待队列,当前等待数: %d\n",
task.ID, len(wp.pending))
// 等待队列有空间
for len(wp.pending) > 0 {
select {
case wp.taskQueue <- wp.pending[0]:
wp.pending = wp.pending[1:]
fmt.Printf("等待任务 %d 进入队列成功\n", task.ID)
return true
default:
// 队列仍然满,等待
fmt.Printf("等待队列空间...\n")
wp.taskCond.Wait()
}
}
}
return false
}
func (wp *WorkerPool) Start() {
wp.wg.Add(wp.workers)
for i := 0; i < wp.workers; i++ {
go wp.worker(i)
}
}
func (wp *WorkerPool) Stop() {
close(wp.stop)
close(wp.taskQueue)
wp.wg.Wait()
fmt.Println("工作池已停止")
}
func (wp *WorkerPool) Stats() (queueLen, pendingLen int) {
wp.taskMutex.Lock()
defer wp.taskMutex.Unlock()
return len(wp.taskQueue), len(wp.pending)
}
func workerPoolExample() {
fmt.Println("=== 工作池示例 ===")
pool := NewWorkerPool(3, 5) // 3个worker,队列容量5
pool.Start()
defer pool.Stop()
// 提交任务
var submitWg sync.WaitGroup
for i := 0; i < 20; i++ {
submitWg.Add(1)
go func(id int) {
defer submitWg.Done()
task := Task{
ID: id,
Payload: fmt.Sprintf("任务数据-%d", id),
}
if pool.Submit(task) {
fmt.Printf("任务 %d 提交成功\n", id)
} else {
fmt.Printf("任务 %d 提交失败\n", id)
}
}(i)
time.Sleep(50 * time.Millisecond) // 控制提交速度
}
// 监控状态
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
queueLen, pendingLen := pool.Stats()
fmt.Printf("[监控] 队列长度: %d, 等待任务: %d\n",
queueLen, pendingLen)
}
}()
submitWg.Wait()
fmt.Println("所有任务提交完成")
// 等待任务处理完成
time.Sleep(3 * time.Second)
}
func main() {
workerPoolExample()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/rxbook/go-demo-2025.git
git@gitee.com:rxbook/go-demo-2025.git
rxbook
go-demo-2025
go-demo-2025
master

搜索帮助