1 Star 0 Fork 1

王布衣 / pkg

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
default_reader.go 1.38 KB
一键复制 编辑 原始数据 按行查看 历史
package fastqueue
import (
"io"
"sync/atomic"
)
// Lifecycle states stored in DefaultReader.state.
const (
	stateRunning = 0 // set by NewReader; Read keeps idling while no work is pending
	stateClosed  = 1 // set by Close; Read returns once no work is pending
)
// DefaultReader hands ranges of sequences to a Consumer as they become
// available, and waits through a WaitStrategy when they are not.
type DefaultReader struct {
	// state is stateRunning or stateClosed. It is read and written with
	// sync/atomic, so it stays the first field: the first word of an allocated
	// struct is guaranteed to be 64-bit aligned, even on 32-bit platforms.
	state int64

	// current: this reader has processed up to this sequence.
	current *Cursor

	// written: the ring buffer has been written up to this sequence.
	written *Cursor

	// upstream: all of the readers have advanced up to this sequence.
	upstream Barrier

	// waiter is called (Gate or Idle) whenever Read finds nothing to consume.
	waiter WaitStrategy

	// consumer receives each available range; if it also implements io.Closer
	// it is closed when Read returns.
	consumer Consumer
}
// NewReader builds a DefaultReader in the running state.
//
// current is the cursor this reader advances as it consumes, written is the
// cursor tracking how far the ring buffer has been written, and upstream is
// the barrier the reader must not pass. waiter is consulted whenever there is
// nothing to consume, and consumer receives every available range.
func NewReader(current, written *Cursor, upstream Barrier, waiter WaitStrategy, consumer Consumer) *DefaultReader {
	reader := new(DefaultReader)
	reader.state = stateRunning
	reader.current = current
	reader.written = written
	reader.upstream = upstream
	reader.waiter = waiter
	reader.consumer = consumer
	return reader
}
// Read runs the reader loop until the reader has been closed and no work is
// pending. It blocks the calling goroutine for that whole time.
//
// On every pass it picks the first case that applies:
//
//  1. The upstream barrier is at or beyond the next unprocessed sequence: the
//     whole available range is passed to the consumer and the reader's cursor
//     is advanced to the end of that range.
//  2. The ring buffer has been written past this reader's position but the
//     upstream barrier has not caught up: waiter.Gate is called.
//  3. Nothing new has been written and the reader is still running:
//     waiter.Idle is called.
//  4. Nothing new has been written and the reader is closed: the loop ends.
//
// Gate and Idle receive a running count of consecutive calls; each count is
// reset only when the other kind of wait happens, not by a successful
// Consume.
//
// When the loop ends, the consumer is closed if it implements io.Closer.
func (this *DefaultReader) Read() {
	var gating, idling int64
	processed := this.current.Load()

	for {
		next := processed + 1

		// Case 1: upstream has released sequences for us.
		if limit := this.upstream.Load(); next <= limit {
			this.consumer.Consume(next, limit)
			this.current.Store(limit)
			processed = limit
			continue
		}

		// Case 2: data exists, but we are gated behind upstream.
		if this.written.Load() >= next {
			gating++
			idling = 0
			this.waiter.Gate(gating)
			continue
		}

		// Case 4: closed with nothing left to do. The state is only checked
		// here, so pending work is always drained before the loop exits.
		if atomic.LoadInt64(&this.state) != stateRunning {
			break
		}

		// Case 3: still running, nothing to read yet.
		idling++
		gating = 0
		this.waiter.Idle(idling)
	}

	if closer, ok := this.consumer.(io.Closer); ok {
		// Best effort: Read has no return value to report a close error through.
		_ = closer.Close()
	}
}
// Close marks the reader as closed so that Read returns once it finds no more
// work pending. It only flips the state flag and does not wait for Read to
// finish. The returned error is always nil.
func (this *DefaultReader) Close() error {
	atomic.StoreInt64(&this.state, stateClosed)

	return nil
}
1
https://gitee.com/quant1x/pkg.git
git@gitee.com:quant1x/pkg.git
quant1x
pkg
pkg
v0.2.7

搜索帮助

53164aa7 5694891 3bd8fe86 5694891