代码拉取完成,页面将自动刷新
package fastqueue
import (
"io"
"sync/atomic"
)
const (
stateRunning = iota
stateClosed
)
type DefaultReader struct {
state int64
current *Cursor // this reader has processed up to this sequence
written *Cursor // the ring buffer has been written up to this sequence
upstream Barrier // all of the readers have advanced up to this sequence
waiter WaitStrategy
consumer Consumer
}
func NewReader(current, written *Cursor, upstream Barrier, waiter WaitStrategy, consumer Consumer) *DefaultReader {
return &DefaultReader{
state: stateRunning,
current: current,
written: written,
upstream: upstream,
waiter: waiter,
consumer: consumer,
}
}
func (this *DefaultReader) Read() {
var gateCount, idleCount, lower, upper int64
var current = this.current.Load()
for {
lower = current + 1
upper = this.upstream.Load()
if lower <= upper {
this.consumer.Consume(lower, upper)
this.current.Store(upper)
current = upper
} else if upper = this.written.Load(); lower <= upper {
gateCount++
idleCount = 0
this.waiter.Gate(gateCount)
} else if atomic.LoadInt64(&this.state) == stateRunning {
idleCount++
gateCount = 0
this.waiter.Idle(idleCount)
} else {
break
}
}
if closer, ok := this.consumer.(io.Closer); ok {
_ = closer.Close()
}
}
func (this *DefaultReader) Close() error {
atomic.StoreInt64(&this.state, stateClosed)
return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。