代码拉取完成,页面将自动刷新
package main
import (
"fmt"
"sync"
"time"
)
/**
### 生产者-消费者模型(有界缓冲区)
这是条件变量最经典的应用,解决生产者和消费者之间的协调问题。当缓冲区满时,生产者需要等待;当缓冲区空时,消费者需要等待。
**功能描述**:
- 实现一个有固定容量的缓冲区
- 生产者生产数据放入缓冲区
- 消费者从缓冲区取出数据消费
- 使用两个条件变量分别管理缓冲区"非空"和"未满"状态
**注意事项**:
- 使用两个条件变量分别通知生产者和消费者
- 生产者和消费者都使用for循环检查条件
- 在修改共享状态后立即发送信号
*/
// BoundedBuffer 有界缓冲区
type BoundedBuffer struct {
buffer []interface{}
capacity int
mu sync.Mutex
notEmpty *sync.Cond // 缓冲区非空条件
notFull *sync.Cond // 缓冲区未满条件
count int // 当前元素数量
putIndex int // 下一个放入的位置
takeIndex int // 下一个取出的位置
}
func NewBoundedBuffer(capacity int) *BoundedBuffer {
bb := &BoundedBuffer{
buffer: make([]interface{}, capacity),
capacity: capacity,
}
bb.notEmpty = sync.NewCond(&bb.mu)
bb.notFull = sync.NewCond(&bb.mu)
return bb
}
func (bb *BoundedBuffer) Put(item interface{}) {
bb.mu.Lock()
defer bb.mu.Unlock()
// 等待缓冲区未满
for bb.count == bb.capacity {
fmt.Printf("生产者: 缓冲区已满(%d/%d),等待...\n",
bb.count, bb.capacity)
bb.notFull.Wait()
}
// 放入数据
bb.buffer[bb.putIndex] = item
bb.putIndex = (bb.putIndex + 1) % bb.capacity
bb.count++
fmt.Printf("生产者: 放入 %v,当前数量: %d/%d\n",
item, bb.count, bb.capacity)
// 通知消费者缓冲区非空
bb.notEmpty.Signal()
}
func (bb *BoundedBuffer) Take() interface{} {
bb.mu.Lock()
defer bb.mu.Unlock()
// 等待缓冲区非空
for bb.count == 0 {
fmt.Printf("消费者: 缓冲区为空(0/%d),等待...\n", bb.capacity)
bb.notEmpty.Wait()
}
// 取出数据
item := bb.buffer[bb.takeIndex]
bb.buffer[bb.takeIndex] = nil // 帮助垃圾回收
bb.takeIndex = (bb.takeIndex + 1) % bb.capacity
bb.count--
fmt.Printf("消费者: 取出 %v,当前数量: %d/%d\n",
item, bb.count, bb.capacity)
// 通知生产者缓冲区未满
bb.notFull.Signal()
return item
}
func (bb *BoundedBuffer) Stats() (count, capacity int) {
bb.mu.Lock()
defer bb.mu.Unlock()
return bb.count, bb.capacity
}
func producerConsumerExample() {
fmt.Println("=== 生产者-消费者模型(有界缓冲区)===")
buffer := NewBoundedBuffer(5)
var wg sync.WaitGroup
// 启动生产者
producers := 3
for i := 0; i < producers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 4; j++ {
item := fmt.Sprintf("产品-%d-%d", id, j)
buffer.Put(item)
time.Sleep(time.Duration(id+1) * 100 * time.Millisecond)
}
}(i)
}
// 启动消费者
consumers := 2
for i := 0; i < consumers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 6; j++ {
item := buffer.Take()
fmt.Printf("消费者%d: 处理 %v\n", id, item)
time.Sleep(300 * time.Millisecond)
}
}(i)
}
// 监控缓冲区状态
go func() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
count, capacity := buffer.Stats()
fmt.Printf("[监控] 缓冲区使用率: %d/%d (%.1f%%)\n",
count, capacity, float64(count)/float64(capacity)*100)
}
}()
wg.Wait()
fmt.Println("所有生产消费完成")
}
func main() {
producerConsumerExample()
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。