1 Star 0 Fork 2

何吕 / volantmq

forked from JUMEI_ARCH / volantmq 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
retained.go 1.62 KB
一键复制 编辑 原始数据 按行查看 历史
package boltdb
import (
"sync"
"github.com/VolantMQ/volantmq/persistence"
"github.com/boltdb/bolt"
)
type retained struct {
db *dbStatus
// transactions that are in progress right now
wgTx *sync.WaitGroup
lock *sync.Mutex
}
func (r *retained) Load() ([]persistence.PersistedPacket, error) {
var res []persistence.PersistedPacket
err := r.db.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketRetained)
return bucket.ForEach(func(k, v []byte) error {
pkt := persistence.PersistedPacket{}
if buck := bucket.Bucket(k); buck != nil {
pkt.Data = buck.Get([]byte("data"))
pkt.ExpireAt = string(buck.Get([]byte("expireAt")))
res = append(res, pkt)
}
return nil
})
})
return res, err
}
// Store
func (r *retained) Store(packets []persistence.PersistedPacket) error {
return r.db.db.Update(func(tx *bolt.Tx) error {
tx.DeleteBucket(bucketRetained) // nolint: errcheck
bucket, err := tx.CreateBucket(bucketRetained)
if err != nil {
return err
}
for _, p := range packets {
id, _ := bucket.NextSequence() // nolint: gas
pack, err := bucket.CreateBucketIfNotExists(itob64(id))
if err != nil {
return err
}
err = pack.Put([]byte("data"), p.Data)
if err != nil {
return err
}
err = pack.Put([]byte("expireAt"), []byte(p.ExpireAt))
if err != nil {
return err
}
}
return nil
})
}
// Wipe
func (r *retained) Wipe() error {
return r.db.db.Update(func(tx *bolt.Tx) error {
if err := tx.DeleteBucket(bucketRetained); err != nil {
return err
}
if _, err := tx.CreateBucket(bucketRetained); err != nil {
return err
}
return nil
})
}
Go
1
https://gitee.com/kaifazhe/volantmq.git
git@gitee.com:kaifazhe/volantmq.git
kaifazhe
volantmq
volantmq
v0.0.3

搜索帮助