Fetch the repository succeeded.
package storage
import (
"bytes"
bolt "gitee.com/wanttobeamaster/bbolt"
"gitee.com/wanttobeamaster/gridbase/pkg/util"
"math/rand"
"time"
)
// SetObj is the metadata record stored for each set, alongside the set's
// member keys.
type SetObj struct {
	// Size is the number of members in the set; SAdd, SRem and SPop keep it
	// in step with the member keys they write or delete.
	Size uint64
}
// MarshalSetObj encodes obj into its 8-byte stored form: the Size field
// written by util.Uint64ToBytes1 into a freshly allocated buffer.
func MarshalSetObj(obj *SetObj) []byte {
	buf := make([]byte, 8)
	util.Uint64ToBytes1(buf, obj.Size)
	return buf
}
// UnmarshalSetObj decodes the 8-byte form produced by MarshalSetObj.
//
// Input of any other length (including nil) yields (nil, nil), so callers
// must check the returned pointer as well as the error.
func UnmarshalSetObj(raw []byte) (*SetObj, error) {
	if len(raw) == 8 {
		decoded := &SetObj{}
		// The second result of BytesToUint64 is discarded here.
		// NOTE(review): presumably it can only signal a bad length, which
		// the check above already rules out; confirm against util.
		decoded.Size, _ = util.BytesToUint64(raw)
		return decoded, nil
	}
	return nil, nil
}
// SetMetaObj loads the metadata record of the set stored at key.
//
// It returns (nil, false, nil) when no usable metadata record exists. The
// boolean result is always false in this implementation. A non-nil error is
// returned when the read transaction fails or the record cannot be decoded.
func (e *boltSetEngine) SetMetaObj(key []byte) (*SetObj, bool, error) {
	metaKey := RawKeyPrefix(key)

	var obj *SetObj
	err := e.db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltSetBucket"))
		raw := b.Get(metaKey)
		if raw == nil {
			return nil
		}
		// Decode inside the transaction: the slice returned by Get is only
		// valid while the transaction is open.
		var decodeErr error
		obj, decodeErr = UnmarshalSetObj(raw)
		return decodeErr
	})
	if err != nil {
		return nil, false, err
	}
	return obj, false, nil
}
// newSetMetaObj returns the metadata for an empty set (Size zero).
func (e *boltSetEngine) newSetMetaObj() *SetObj {
	empty := new(SetObj)
	empty.Size = 0
	return empty
}
// RawSetDataKey builds the storage key of one set member:
// RawKeyPrefix(key), followed by util.DataTypeKey, followed by member.
// With a nil member the result is the prefix shared by every member of the set.
func (e *boltSetEngine) RawSetDataKey(key, member []byte) []byte {
	withType := append(RawKeyPrefix(key), util.DataTypeKey)
	return append(withType, member...)
}
// boltSetEngine is the set engine backed by a bolt database. All of its
// data lives in the "boltSetBucket" bucket.
type boltSetEngine struct {
	// db is the bolt database handle used for every transaction.
	db *bolt.DB
	// limiter is built from LimitConcurrencyWrite in newBoltSetEngine.
	// NOTE(review): none of the methods visible in this file use it; confirm
	// whether it is still needed.
	limiter util.Limiter
}
// newBoltSetEngine makes sure the "boltSetBucket" bucket exists in db and
// returns a set engine that operates on it.
//
// NOTE(review): a failure to create the bucket is not reported, because the
// signature has no error result.
func newBoltSetEngine(db *bolt.DB) SetEngine {
	_ = db.Update(func(tx *bolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte("boltSetBucket"))
		return err
	})

	engine := &boltSetEngine{
		db:      db,
		limiter: *util.NewLimiter(LimitConcurrencyWrite),
	}
	return engine
}
// SAdd adds members to the set stored at key and returns how many of them
// were not already present. Members repeated within one call are counted once.
//
// It returns ErrEmptyKey when key is empty. On any error the returned count
// is 0 and nothing is committed.
//
// The metadata is read, and the member and metadata records are written, in
// the same write transaction, so concurrent writers cannot lose Size updates.
// When nothing is added the metadata record is left untouched.
func (e *boltSetEngine) SAdd(key []byte, members ...[]byte) (int64, error) {
	if len(key) == 0 {
		return 0, ErrEmptyKey
	}
	metaKey := RawKeyPrefix(key)

	var added uint64
	err := e.db.Batch(func(tx *bolt.Tx) error {
		// Batch may run this function more than once, so every run has to
		// start from a clean counter.
		added = 0

		b := tx.Bucket([]byte("boltSetBucket"))
		// A missing record decodes to (nil, nil), which means "new set".
		meta, err := UnmarshalSetObj(b.Get(metaKey))
		if err != nil {
			return err
		}
		if meta == nil {
			meta = e.newSetMetaObj()
		}

		for _, member := range members {
			dataKey := e.RawSetDataKey(key, member)
			if b.Get(dataKey) != nil {
				// Already a member.
				continue
			}
			if err := b.Put(dataKey, []byte{0}); err != nil {
				return err
			}
			added++
		}
		if added == 0 {
			return nil
		}

		meta.Size += added
		return b.Put(metaKey, MarshalSetObj(meta))
	})
	if err != nil {
		return 0, err
	}
	return int64(added), nil
}
// SRem removes members from the set stored at key and returns how many of
// them were actually present.
//
// It returns ErrEmptyKey when key is empty or no members are given. On any
// error the returned count is 0 and nothing is committed.
//
// The metadata is read and updated in the same write transaction as the
// deletions. When the set becomes empty its metadata record is deleted.
func (e *boltSetEngine) SRem(key []byte, members ...[]byte) (int64, error) {
	if len(key) == 0 || len(members) == 0 {
		return 0, ErrEmptyKey
	}
	metaKey := RawKeyPrefix(key)

	var removed uint64
	err := e.db.Batch(func(tx *bolt.Tx) error {
		// Batch may run this function more than once, so every run has to
		// start from a clean counter.
		removed = 0

		b := tx.Bucket([]byte("boltSetBucket"))
		// A missing record decodes to (nil, nil).
		meta, err := UnmarshalSetObj(b.Get(metaKey))
		if err != nil {
			return err
		}
		var size uint64
		if meta != nil {
			size = meta.Size
		}

		for _, member := range members {
			dataKey := e.RawSetDataKey(key, member)
			if b.Get(dataKey) == nil {
				// Not a member; nothing to remove.
				continue
			}
			if err := b.Delete(dataKey); err != nil {
				return err
			}
			removed++
		}
		if removed == 0 {
			return nil
		}

		if removed >= size {
			// The set is now empty. Using >= rather than == also covers a
			// missing or understated metadata record, where subtracting
			// would wrap the unsigned Size around.
			return b.Delete(metaKey)
		}
		meta.Size = size - removed
		return b.Put(metaKey, MarshalSetObj(meta))
	})
	if err != nil {
		return 0, err
	}
	return int64(removed), nil
}
// SCard returns the number of members recorded in the metadata of the set
// stored at key, or 0 when the set has no metadata record.
// It returns ErrEmptyKey when key is empty.
func (e *boltSetEngine) SCard(key []byte) (int64, error) {
	if len(key) == 0 {
		return 0, ErrEmptyKey
	}

	meta, _, err := e.SetMetaObj(key)
	switch {
	case err != nil:
		return 0, err
	case meta == nil:
		// No metadata record: the set is treated as empty.
		return 0, nil
	}
	return int64(meta.Size), nil
}
// SMembers returns every member of the set stored at key, in the order the
// bucket cursor yields them. A set with no members yields a nil slice.
//
// It returns ErrEmptyKey when key is empty, and the transaction error when
// the read fails. The returned slices are copies and remain valid after the
// call.
func (e *boltSetEngine) SMembers(key []byte) ([][]byte, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}
	// Every member key of this set starts with this prefix.
	prefix := e.RawSetDataKey(key, nil)

	var members [][]byte
	err := e.db.View(func(tx *bolt.Tx) error {
		c := tx.Bucket([]byte("boltSetBucket")).Cursor()
		// Stop at the first key that no longer carries the prefix, so keys
		// belonging to other objects are never returned.
		for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Next() {
			// Cursor keys are only valid while the transaction is open, so
			// copy the member before it leaves the closure.
			member := make([]byte, len(k)-len(prefix))
			copy(member, k[len(prefix):])
			members = append(members, member)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return members, nil
}
// SIsMember reports whether member belongs to the set stored at key:
// 1 when it does, 0 when it does not or the set has no metadata record.
// It returns ErrEmptyKey when key or member is empty.
func (e *boltSetEngine) SIsMember(key []byte, member []byte) (int64, error) {
	if len(key) == 0 || len(member) == 0 {
		return 0, ErrEmptyKey
	}

	meta, _, err := e.SetMetaObj(key)
	if err != nil {
		return 0, err
	}
	if meta == nil {
		return 0, nil
	}

	dataKey := e.RawSetDataKey(key, member)
	found := false
	err = e.db.View(func(tx *bolt.Tx) error {
		// Only presence matters, so the stored value is not kept.
		found = tx.Bucket([]byte("boltSetBucket")).Get(dataKey) != nil
		return nil
	})
	if err != nil {
		return 0, err
	}
	if found {
		return 1, nil
	}
	return 0, nil
}
// SPop removes one pseudo-randomly chosen member from the set stored at key
// and returns it.
//
// It returns (nil, nil) when the set does not exist or has no members, and
// ErrEmptyKey when key is empty. The returned slice is a copy and remains
// valid after the call.
//
// The metadata read, the member deletion and the metadata update all happen
// in one write transaction.
func (e *boltSetEngine) SPop(key []byte) ([]byte, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}
	metaKey := RawKeyPrefix(key)
	// Every member key of this set starts with this prefix.
	prefix := e.RawSetDataKey(key, nil)
	// A private generator avoids reseeding the process-wide source on every call.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	var popped []byte
	err := e.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltSetBucket"))
		// A missing record decodes to (nil, nil).
		meta, err := UnmarshalSetObj(b.Get(metaKey))
		if err != nil {
			return err
		}
		if meta == nil || meta.Size == 0 {
			return nil
		}

		c := b.Cursor()
		victim, _ := c.Seek(prefix)
		if !bytes.HasPrefix(victim, prefix) {
			// The metadata claims members but none are stored.
			return nil
		}
		// Skip a random number of members, but never walk out of this set's
		// key range, even if Size overstates the real member count.
		for skip := rng.Intn(int(meta.Size)); skip > 0; skip-- {
			next, _ := c.Next()
			if !bytes.HasPrefix(next, prefix) {
				break
			}
			victim = next
		}

		// Cursor keys are only valid while the transaction is open, so work
		// on a private copy.
		victimKey := append([]byte(nil), victim...)
		if err := b.Delete(victimKey); err != nil {
			return err
		}

		meta.Size--
		if meta.Size > 0 {
			err = b.Put(metaKey, MarshalSetObj(meta))
		} else {
			// The set is now empty, so drop its metadata record.
			err = b.Delete(metaKey)
		}
		if err != nil {
			return err
		}

		popped = victimKey[len(prefix):]
		return nil
	})
	if err != nil {
		return nil, err
	}
	return popped, nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。