1 Star 1 Fork 0

D10.天地弦/gobase

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
queuewriter.go 2.91 KB
一键复制 编辑 原始数据 按行查看 历史
D10.天地弦 提交于 2024-04-01 16:05 . + quewriter
package quewriter
import (
"errors"
"fmt"
"gitee.com/ymofen/gobase"
"io"
"runtime"
"sync"
"sync/atomic"
"time"
)
// QueueWriter is a queued writer: buffers handed to Write are pushed onto a
// channel and written to the underlying io.Writer by a goroutine that
// NewQueueWriter starts.
type QueueWriter struct {
// closedflag is 0 while open; Close switches it to 1 with a compare-and-swap.
// It is always accessed through sync/atomic.
closedflag int32
// maxcachel is the channel capacity that was passed to NewQueueWriter.
maxcachel int32
// copybuf is the copy-before-queue flag: NewQueueWriter sets it to 1 and
// SetCopyBuf overwrites it.
copybuf int8
// timeout bounds each channel push in Write and Close and the idle wait in
// the writer goroutine; NewQueueWriter sets it to time.Second.
timeout time.Duration
// wg holds one count for the writer goroutine; Wait blocks on it.
wg sync.WaitGroup
// w is the destination writer; the writer goroutine sets it to nil when its
// loop ends.
w io.Writer
// obound carries the pending buffers; a nil element is the stop signal sent
// by Close.
obound chan []byte
// OnAfterWrite, when non-nil, is called by the writer goroutine after each
// call to w.Write with the buffer and that call's results.
OnAfterWrite func(p []byte, n int, err error)
}
// Package-wide counters reported by GetQueueWriterStatus. All of them are
// updated through sync/atomic.
var (
// queueCacheBufSize is raised by Write by the length of each queued buffer
// and lowered by the writer goroutine when it dequeues one.
queueCacheBufSize int32
// queueWriterAliveN is raised by NewQueueWriter and lowered by the finalizer
// it registers on the new QueueWriter.
queueWriterAliveN int32
// queueRunN is raised when a writer goroutine starts and lowered when it
// returns.
queueRunN int32
)
// WriteWrapper adapts a plain write function to a type that has a Write
// method with the io.Writer signature.
type WriteWrapper struct {
// writefunc is the function every Write call is forwarded to.
writefunc func(buf []byte) (n int, err error)
}
// GetQueueWriterStatus returns a one-line snapshot of the package-wide
// counters: QueueWriter objects not yet finalized, writer goroutines currently
// running, and the byte total currently accounted as queued.
func GetQueueWriterStatus() string {
	alive := atomic.LoadInt32(&queueWriterAliveN)
	running := atomic.LoadInt32(&queueRunN)
	cached := atomic.LoadInt32(&queueCacheBufSize)
	return fmt.Sprintf("alive-n:%d, run:%d, cachebuf:%d", alive, running, cached)
}
// NewWriteWrapper returns a WriteWrapper whose Write method forwards to fn.
// fn is stored as given; no nil check is performed here.
func NewWriteWrapper(fn func(buf []byte) (n int, err error)) *WriteWrapper {
	wrapper := new(WriteWrapper)
	wrapper.writefunc = fn
	return wrapper
}
// Write passes buf to the wrapped function and returns its results unchanged.
func (w *WriteWrapper) Write(buf []byte) (n int, err error) {
	n, err = w.writefunc(buf)
	return n, err
}
// NewQueueWriter creates a QueueWriter that forwards queued buffers to w and
// starts its writer goroutine before returning.
//
// maxcachel is the capacity of the internal queue, counted in buffers (not
// bytes). A value of 0 does NOT mean "unlimited": it yields an unbuffered
// channel, so every Write has to be handed directly to the writer goroutine.
// Whenever the queue cannot accept a buffer, Write blocks for up to the
// writer's timeout (one second). Negative values are treated as 0, because
// make would otherwise panic on a negative channel size.
//
// The copy flag starts at 1 (see SetCopyBuf). Use Close to stop the goroutine
// and Wait to block until it has exited.
func NewQueueWriter(w io.Writer, maxcachel int32) *QueueWriter {
	if maxcachel < 0 {
		maxcachel = 0
	}
	que := &QueueWriter{
		w:         w,
		obound:    make(chan []byte, maxcachel),
		maxcachel: maxcachel,
		timeout:   time.Second,
		copybuf:   1,
	}

	// Track live instances; the finalizer undoes the increment once the
	// object is collected.
	atomic.AddInt32(&queueWriterAliveN, 1)
	runtime.SetFinalizer(que, func(obj interface{}) {
		atomic.AddInt32(&queueWriterAliveN, -1)
	})

	// Register the goroutine with the WaitGroup before launching it so a
	// Wait call issued right after construction cannot miss it.
	que.wg.Add(1)
	go que.start()
	return que
}
// RequestDis shuts the writer down by calling Close. The reason argument is
// accepted but not used, and the error returned by Close is discarded.
func (q *QueueWriter) RequestDis(reason string) {
	_ = q.Close()
}
// SetCopyBuf stores flag as the writer's copy-buffer setting. The field is a
// plain (non-atomic) store.
func (q *QueueWriter) SetCopyBuf(flag int8) {
	q.copybuf = flag
}
// Close marks the writer as closed and pushes a nil buffer onto the queue as
// the stop signal for the writer goroutine.
//
// Only the first call does any work; later calls return nil immediately. If
// the stop signal cannot be queued within the writer's timeout an error is
// returned, but the closed flag stays set.
func (q *QueueWriter) Close() error {
	if !atomic.CompareAndSwapInt32(&q.closedflag, 0, 1) {
		// Somebody else already closed the writer.
		return nil
	}

	expired := time.After(q.timeout)
	select {
	case q.obound <- nil:
		return nil
	case <-expired:
		return errors.New("push to chan timeout!")
	}
}
// Wait blocks until the writer's WaitGroup is released, which happens when
// the writer goroutine returns.
func (q *QueueWriter) Wait() {
	q.wg.Wait()
}
// start is the body of the writer goroutine. It dequeues buffers and writes
// them to the underlying writer until it receives the nil stop signal, or
// until an idle period of one timeout elapses while the writer is closed.
//
// Buffers dequeued after the writer has been closed are dropped: they are
// removed from the byte accounting but neither written nor reported through
// OnAfterWrite.
//
// A single timer is reused for the idle wait instead of calling time.After on
// every iteration, which would allocate one timer per dequeued buffer.
func (q *QueueWriter) start() {
	if gobase.GoFunCatchException {
		defer gobase.DeferCatchPanic()
	}
	atomic.AddInt32(&queueRunN, 1)
	defer atomic.AddInt32(&queueRunN, -1)
	defer q.wg.Done()

	idle := time.NewTimer(q.timeout)
	defer idle.Stop()

loop:
	for {
		select {
		case buf := <-q.obound:
			if buf == nil {
				// Stop signal queued by Close.
				break loop
			}
			atomic.AddInt32(&queueCacheBufSize, -int32(len(buf)))
			if atomic.LoadInt32(&q.closedflag) == 0 {
				n, err := q.w.Write(buf)
				if evt := q.OnAfterWrite; evt != nil {
					evt(buf, n, err)
				}
			}
			// Re-arm the idle timer so the timeout is measured from the end
			// of this iteration. Drain a tick that fired while we were busy
			// so the next select does not see a stale expiry.
			if !idle.Stop() {
				select {
				case <-idle.C:
				default:
				}
			}
			idle.Reset(q.timeout)
		case <-idle.C:
			// Idle for a full timeout: leave if Close ran but its stop signal
			// never reached the queue.
			if atomic.LoadInt32(&q.closedflag) == 1 {
				break loop
			}
			idle.Reset(q.timeout)
		}
	}
	q.w = nil
}
// Write queues buf for the writer goroutine and returns once the buffer has
// been accepted by the queue; the actual write happens asynchronously.
//
// An empty buf is a no-op that returns (0, nil). After Close, Write returns
// io.ErrClosedPipe. If the queue does not accept the buffer within the
// writer's timeout, an error is returned and the buffer is not queued.
//
// On failure n is 0, as required by the io.Writer contract (0 <= n <= len(p));
// a negative count can corrupt callers such as bufio.Writer.
//
// When the copy flag is non-zero (the default) buf is cloned before queuing,
// so the caller may reuse it immediately. After SetCopyBuf(0) the slice itself
// is queued and the caller must not modify it until it has been written.
func (q *QueueWriter) Write(buf []byte) (n int, err error) {
	if len(buf) == 0 {
		return 0, nil
	}
	if atomic.LoadInt32(&q.closedflag) == 1 {
		return 0, io.ErrClosedPipe
	}

	item := buf
	if q.copybuf != 0 {
		item = gobase.CloneBytes(buf, 0, 0)
	}
	n = len(item)

	// An explicit timer is stopped as soon as the push succeeds, instead of
	// lingering for the whole timeout as a time.After timer would.
	timer := time.NewTimer(q.timeout)
	defer timer.Stop()

	select {
	case q.obound <- item:
		atomic.AddInt32(&queueCacheBufSize, int32(n))
		return n, nil
	case <-timer.C:
		return 0, errors.New("QueueWriter:push to chan timeout!")
	}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/ymofen/gobase.git
git@gitee.com:ymofen/gobase.git
ymofen
gobase
gobase
v1.2.24053

搜索帮助

D67c1975 1850385 1daf7b77 1850385