代码拉取完成,页面将自动刷新
同步操作将从 JUMEI_ARCH/volantmq 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
package boltdb
import (
"sync"
"github.com/VolantMQ/volantmq/persistence"
"github.com/boltdb/bolt"
)
type retained struct {
db *dbStatus
// transactions that are in progress right now
wgTx *sync.WaitGroup
lock *sync.Mutex
}
func (r *retained) Load() ([]persistence.PersistedPacket, error) {
var res []persistence.PersistedPacket
err := r.db.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketRetained)
return bucket.ForEach(func(k, v []byte) error {
pkt := persistence.PersistedPacket{}
if buck := bucket.Bucket(k); buck != nil {
pkt.Data = buck.Get([]byte("data"))
pkt.ExpireAt = string(buck.Get([]byte("expireAt")))
res = append(res, pkt)
}
return nil
})
})
return res, err
}
// Store
func (r *retained) Store(packets []persistence.PersistedPacket) error {
return r.db.db.Update(func(tx *bolt.Tx) error {
tx.DeleteBucket(bucketRetained) // nolint: errcheck
bucket, err := tx.CreateBucket(bucketRetained)
if err != nil {
return err
}
for _, p := range packets {
id, _ := bucket.NextSequence() // nolint: gas
pack, err := bucket.CreateBucketIfNotExists(itob64(id))
if err != nil {
return err
}
err = pack.Put([]byte("data"), p.Data)
if err != nil {
return err
}
err = pack.Put([]byte("expireAt"), []byte(p.ExpireAt))
if err != nil {
return err
}
}
return nil
})
}
// Wipe
func (r *retained) Wipe() error {
return r.db.db.Update(func(tx *bolt.Tx) error {
if err := tx.DeleteBucket(bucketRetained); err != nil {
return err
}
if _, err := tx.CreateBucket(bucketRetained); err != nil {
return err
}
return nil
})
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。