// (Gitee web UI notice, not part of the source: "Code pull complete; the page will refresh automatically.")

package storage
import (
	"bytes"
	"errors"

	bolt "gitee.com/wanttobeamaster/bbolt"
	"gitee.com/wanttobeamaster/gridbase/pkg/util"
)
const (
// LHeadDirection tells the push/pop helpers to operate on the head of a list.
LHeadDirection uint8 = 0
// LTailDirection tells the push/pop helpers to operate on the tail of a list.
LTailDirection uint8 = 1
// LItemMinIndex is not referenced in this part of the file; presumably the
// lowest item index a list may use — TODO confirm.
LItemMinIndex uint64 = 1024
// LItemMaxIndex is not referenced in this part of the file; presumably the
// highest item index a list may use — TODO confirm.
LItemMaxIndex uint64 = 1<<64 - 1024
// LItemInitIndex is the value newListMetaObj assigns to both Head and Tail
// of a freshly created list.
LItemInitIndex uint64 = 1<<32 - 512
// LimitConcurrencyWrite is the argument passed to util.NewLimiter when the
// engine is constructed.
LimitConcurrencyWrite = 50000
)
// ListObj is the metadata record kept for one list.
type ListObj struct {
// Head is the index of the first item; a head push writes below it and a
// head pop reads it and then advances it.
Head uint64
// Tail is treated by push and pop as the index one past the last item: a
// tail push writes at Tail and then advances it, a tail pop decrements it
// and then reads.
Tail uint64
// Size is the number of items in the list.
Size uint64
}
// MarshalListObj encodes obj into a 24-byte slice. Head, Tail and Size are
// handed to util.Uint64ToBytes1 in that order, at byte offsets 0, 8 and 16.
func MarshalListObj(obj *ListObj) []byte {
	const fieldLen = 8
	buf := make([]byte, 3*fieldLen)
	for i, field := range [...]uint64{obj.Head, obj.Tail, obj.Size} {
		// The helper's result is deliberately discarded, as it always has been.
		_ = util.Uint64ToBytes1(buf[i*fieldLen:], field)
	}
	return buf
}
// UnmarshalListObj decodes a record produced by MarshalListObj.
//
// A slice whose length is not exactly 24 bytes yields (nil, nil): no object
// and no error. The second results of util.BytesToUint64 are discarded, so
// the returned error is always nil.
func UnmarshalListObj(raw []byte) (*ListObj, error) {
	const encodedLen = 24
	if len(raw) != encodedLen {
		return nil, nil
	}
	head, _ := util.BytesToUint64(raw[0:])
	tail, _ := util.BytesToUint64(raw[8:])
	size, _ := util.BytesToUint64(raw[16:])
	return &ListObj{Head: head, Tail: tail, Size: size}, nil
}
// boltListEngine implements the list commands on top of a bolt database,
// keeping every list's metadata and items in the "boltListBucket" bucket.
type boltListEngine struct {
// db is the bolt handle used for every read and write transaction.
db *bolt.DB
// limiter is built from util.NewLimiter(LimitConcurrencyWrite); none of the
// methods visible in this file consult it.
limiter util.Limiter
}
// newBoltListEngine makes sure the "boltListBucket" bucket exists in db and
// returns a list engine backed by db.
//
// The result of the bucket-creating transaction is not inspected, so a
// failure to create the bucket is not reported to the caller.
func newBoltListEngine(db *bolt.DB) ListEngine {
	db.Update(func(tx *bolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte("boltListBucket"))
		return err
	})

	engine := boltListEngine{
		db:      db,
		limiter: *util.NewLimiter(LimitConcurrencyWrite),
	}
	return &engine
}
// ListMetaObj loads and decodes the metadata record of the list stored under
// key.
//
// It returns a nil object with a nil error when no record is stored under the
// key, or when the stored bytes do not have the length UnmarshalListObj
// expects. The bool result is always false. A transaction or decoding error
// is returned as the error result.
func (e *boltListEngine) ListMetaObj(key []byte) (*ListObj, bool, error) {
	var obj *ListObj
	metaKey := RawKeyPrefix(key)
	err := e.db.View(func(tx *bolt.Tx) error {
		raw := tx.Bucket([]byte("boltListBucket")).Get(metaKey)
		if raw == nil {
			return nil
		}
		// Decode while the transaction is still open: a slice returned by
		// Get is only valid for the life of the transaction.
		var decodeErr error
		obj, decodeErr = UnmarshalListObj(raw)
		return decodeErr
	})
	if err != nil {
		return nil, false, err
	}
	return obj, false, nil
}
// newListMetaObj returns the metadata of an empty list: Head and Tail both
// start at LItemInitIndex and Size is zero.
func (e *boltListEngine) newListMetaObj() *ListObj {
	meta := new(ListObj)
	meta.Head = LItemInitIndex
	meta.Tail = LItemInitIndex
	return meta
}
// RawListKey builds the storage key of the item at position idx of the list
// stored under key: RawKeyPrefix(key), then util.DataTypeKey, then the bytes
// util.Uint64ToBytes produces for idx. The helper's second result is ignored.
func (e *boltListEngine) RawListKey(key []byte, idx uint64) []byte {
	itemKey := append(RawKeyPrefix(key), util.DataTypeKey)
	encodedIdx, _ := util.Uint64ToBytes(idx)
	return append(itemKey, encodedIdx...)
}
// LIndex returns the item at position index of the list stored under key.
//
// A non-negative index counts from the head (0 is the first item); a negative
// index counts from the tail (-1 is the last item). It returns (nil, nil)
// when the list does not exist or index is out of range, ErrEmptyKey when key
// is empty, and any error reported by the read transaction.
func (e *boltListEngine) LIndex(key []byte, index int64) ([]byte, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}
	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return nil, err
	}
	if metaObj == nil {
		return nil, nil
	}

	// Resolve a negative index against the size, then range-check once.
	size := int64(metaObj.Size)
	if index < 0 {
		index += size
	}
	if index < 0 || index >= size {
		return nil, nil
	}

	var item []byte
	err = e.db.View(func(tx *bolt.Tx) error {
		dataKey := e.RawListKey(key, metaObj.Head+uint64(index))
		v := tx.Bucket([]byte("boltListBucket")).Get(dataKey)
		if v != nil {
			// Copy before the transaction ends: the slice returned by Get
			// is only valid for the life of the transaction.
			item = append(make([]byte, 0, len(v)), v...)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return item, nil
}
// LInsert inserts value immediately before the first item equal to pivot in
// the list stored under key.
//
// It returns the list length after the call: the new length when the pivot
// was found, the unchanged length when it was not, and 0 when the list does
// not exist. ErrEmptyKey is returned for an empty key, and any transaction
// error is returned as is.
//
// NOTE(review): pos is not consulted; presumably it is meant to choose
// between inserting before and after the pivot — confirm against callers.
func (e *boltListEngine) LInsert(key []byte, pos int, pivot []byte, value []byte) (int64, error) {
	if len(key) == 0 {
		return 0, ErrEmptyKey
	}
	eMetaKey := RawKeyPrefix(key)
	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return 0, err
	}
	if metaObj == nil {
		return 0, nil
	}

	newSize := metaObj.Size
	// Search, shift and metadata update share one write transaction so a
	// failure part-way through leaves the list untouched.
	err = e.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltListBucket"))

		// Locate the pivot and remember its own index; break explicitly so
		// the loop increment cannot move past it.
		var at uint64
		found := false
		for at = 0; at < metaObj.Size; at++ {
			if bytes.Equal(b.Get(e.RawListKey(key, metaObj.Head+at)), pivot) {
				found = true
				break
			}
		}
		if !found {
			return nil
		}

		// Move every item from the pivot onwards one slot towards the tail,
		// starting at the tail so nothing is overwritten before it is moved.
		for j := metaObj.Size; j > at; j-- {
			moved := b.Get(e.RawListKey(key, metaObj.Head+j-1))
			if err := b.Put(e.RawListKey(key, metaObj.Head+j), moved); err != nil {
				return err
			}
		}
		if err := b.Put(e.RawListKey(key, metaObj.Head+at), value); err != nil {
			return err
		}

		updated := *metaObj
		updated.Tail++
		updated.Size++
		if err := b.Put(eMetaKey, MarshalListObj(&updated)); err != nil {
			return err
		}
		newSize = updated.Size
		return nil
	})
	if err != nil {
		return 0, err
	}
	return int64(newSize), nil
}
// LLen reports the number of items in the list stored under key. A list that
// does not exist has length 0; an empty key yields ErrEmptyKey.
func (e *boltListEngine) LLen(key []byte) (int64, error) {
	if len(key) == 0 {
		return 0, ErrEmptyKey
	}
	switch meta, _, err := e.ListMetaObj(key); {
	case err != nil:
		return 0, err
	case meta == nil:
		return 0, nil
	default:
		return int64(meta.Size), nil
	}
}
// LPop removes and returns the item at the head of the list stored under key
// by delegating to lPopWithDirec with LHeadDirection.
//
// List layout: head <----------------> tail
func (e *boltListEngine) LPop(key []byte) ([]byte, error) {
return e.lPopWithDirec(key, LHeadDirection)
}
// lPopWithDirec removes one item from the list stored under key and returns
// it. direc selects the end to pop from: LHeadDirection pops the head, any
// other value pops the tail.
//
// It returns (nil, nil) when the list does not exist or holds no items,
// ErrEmptyKey when key is empty, and any error from the write transaction.
// When the last item is popped the metadata record is deleted as well.
func (e *boltListEngine) lPopWithDirec(key []byte, direc uint8) ([]byte, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}
	eMetaKey := RawKeyPrefix(key)
	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return nil, err
	}
	// Size is unsigned: popping from a zero-sized list would wrap it around.
	if metaObj == nil || metaObj.Size == 0 {
		return nil, nil
	}

	var popped []byte
	err = e.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltListBucket"))

		var eDataKey []byte
		if direc == LHeadDirection {
			eDataKey = e.RawListKey(key, metaObj.Head)
			metaObj.Head++
		} else {
			// Tail is one past the last item, so step back before reading.
			metaObj.Tail--
			eDataKey = e.RawListKey(key, metaObj.Tail)
		}
		metaObj.Size--

		if metaObj.Size == 0 {
			// That was the last item: drop the metadata too.
			if err := b.Delete(eMetaKey); err != nil {
				return err
			}
		} else if err := b.Put(eMetaKey, MarshalListObj(metaObj)); err != nil {
			return err
		}

		// Copy the value while the transaction is open; the slice returned
		// by Get is only valid for the life of the transaction.
		popped = append([]byte{}, b.Get(eDataKey)...)
		return b.Delete(eDataKey)
	})
	if err != nil {
		return nil, err
	}
	return popped, nil
}
// LPush pushes values onto the head of the list stored under key by
// delegating to lPushWithDirec with LHeadDirection, and returns its results.
func (e *boltListEngine) LPush(key []byte, values ...[]byte) (int64, error) {
return e.lPushWithDirec(LHeadDirection, key, values...)
}
// lPushWithDirec pushes values, in order, onto one end of the list stored
// under key, creating the list when it does not exist. direc selects the end:
// LHeadDirection pushes onto the head (so the last value ends up first), any
// other value pushes onto the tail.
//
// It returns the list length after the push. ErrEmptyKey is returned for an
// empty key, and a failed write transaction is returned as an error instead
// of a length. Calling it with no values changes nothing and returns the
// current length.
func (e *boltListEngine) lPushWithDirec(direc uint8, key []byte, values ...[]byte) (int64, error) {
	if len(key) == 0 {
		return 0, ErrEmptyKey
	}
	eMetaKey := RawKeyPrefix(key)
	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return 0, err
	}
	if metaObj == nil {
		metaObj = e.newListMetaObj()
	}
	if len(values) == 0 {
		// Nothing to store; in particular do not persist an empty list.
		return int64(metaObj.Size), nil
	}

	err = e.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltListBucket"))

		itemCnt := uint64(len(values))
		var index uint64
		if direc == LHeadDirection {
			index = metaObj.Head
			metaObj.Head -= itemCnt
		} else {
			index = metaObj.Tail
			metaObj.Tail += itemCnt
		}
		metaObj.Size += itemCnt

		if err := b.Put(eMetaKey, MarshalListObj(metaObj)); err != nil {
			return err
		}
		for _, value := range values {
			var eDataKey []byte
			if direc == LHeadDirection {
				// Head grows downwards: step first, then write.
				index--
				eDataKey = e.RawListKey(key, index)
			} else {
				// Tail is one past the last item: write, then step.
				eDataKey = e.RawListKey(key, index)
				index++
			}
			if err := b.Put(eDataKey, value); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return 0, err
	}
	return int64(metaObj.Size), nil
}
// LPushX pushes a single value onto the head of the list stored under key by
// delegating to lPushWithDirec with LHeadDirection.
//
// NOTE(review): no check is made that the list already exists, so this
// behaves exactly like LPush with one value — confirm that is intended.
func (e *boltListEngine) LPushX(key []byte, value []byte) (int64, error) {
return e.lPushWithDirec(LHeadDirection, key, value)
}
// LRange returns the items of the list stored under key whose positions lie
// in the inclusive range [begin, end].
//
// Negative positions count from the tail (-1 is the last item). A begin that
// falls before the head is clamped to the first item and an end that falls
// past the tail is clamped to the last item. It returns (nil, nil) when the
// list does not exist or the resolved range is empty, ErrEmptyKey when key is
// empty, and any error from the read transaction.
func (e *boltListEngine) LRange(key []byte, begin int64, end int64) ([][]byte, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}
	// Cheap rejection of ranges that are empty whatever the list size is.
	if begin > end && (end > 0 || begin < 0) {
		return nil, nil
	}
	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return nil, err
	}
	if metaObj == nil {
		return nil, nil
	}

	size := int64(metaObj.Size)
	if begin < 0 {
		begin += size
		if begin < 0 {
			begin = 0
		}
	} else if begin >= size {
		return nil, nil
	}
	if end < 0 {
		end += size
		if end < 0 {
			// The range ends before the first item.
			return nil, nil
		}
	} else if end >= size {
		end = size - 1
	}
	if begin > end {
		return nil, nil
	}

	retSlice := make([][]byte, end-begin+1)
	err = e.db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltListBucket"))
		for i := range retSlice {
			v := b.Get(e.RawListKey(key, metaObj.Head+uint64(begin)+uint64(i)))
			if v != nil {
				// Copy: the slice returned by Get is only valid for the
				// life of the transaction.
				retSlice[i] = append(make([]byte, 0, len(v)), v...)
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return retSlice, nil
}
// LRem removes items equal to value from the list stored under key.
//
// count > 0 removes up to count matches scanning from the head, count < 0
// removes up to -count matches scanning from the tail, and count == 0 removes
// every match. The surviving items keep their order and are rewritten
// starting at LItemInitIndex; when nothing survives the metadata record is
// deleted.
//
// It returns the number of items left in the list, 0 when the list does not
// exist, ErrEmptyKey for an empty key, and any error from the write
// transaction.
//
// NOTE(review): the result is the remaining length, not the number of items
// removed — confirm which one callers expect.
func (e *boltListEngine) LRem(key []byte, count int64, value []byte) (int64, error) {
	if len(key) == 0 {
		return 0, ErrEmptyKey
	}
	eMetaKey := RawKeyPrefix(key)
	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return 0, err
	}
	if metaObj == nil {
		return 0, nil
	}

	limit := count
	if limit < 0 {
		limit = -limit
	}
	// limit stays negative only when count is the minimum int64, whose
	// negation overflows; treat that like "no limit".
	unlimited := limit <= 0

	var remaining uint64
	// Everything happens in one write transaction so the list is never left
	// half rewritten.
	err = e.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltListBucket"))

		// Read out every item (copying it, because it must outlive the
		// delete below) and remove it from its old slot.
		nodes := make([][]byte, metaObj.Size)
		for i := range nodes {
			eDataKey := e.RawListKey(key, metaObj.Head+uint64(i))
			if v := b.Get(eDataKey); v != nil {
				nodes[i] = append(make([]byte, 0, len(v)), v...)
			}
			if err := b.Delete(eDataKey); err != nil {
				return err
			}
		}

		// Mark the items to drop, scanning from the requested end.
		drop := make([]bool, len(nodes))
		var removed int64
		consider := func(i int) {
			if (unlimited || removed < limit) && bytes.Equal(nodes[i], value) {
				drop[i] = true
				removed++
			}
		}
		if count < 0 {
			for i := len(nodes) - 1; i >= 0; i-- {
				consider(i)
			}
		} else {
			for i := range nodes {
				consider(i)
			}
		}

		// Write the survivors back; Tail ends up one past the last item,
		// which is what the push and pop helpers expect.
		rebuilt := e.newListMetaObj()
		for i, node := range nodes {
			if drop[i] {
				continue
			}
			if err := b.Put(e.RawListKey(key, rebuilt.Tail), node); err != nil {
				return err
			}
			rebuilt.Tail++
			rebuilt.Size++
		}
		remaining = rebuilt.Size
		if rebuilt.Size == 0 {
			return b.Delete(eMetaKey)
		}
		return b.Put(eMetaKey, MarshalListObj(rebuilt))
	})
	if err != nil {
		return 0, err
	}
	return int64(remaining), nil
}
// Errors reported by LSet when the target item cannot be addressed.
var (
	errListNoSuchKey       = errors.New("no such key")
	errListIndexOutOfRange = errors.New("index out of range")
)

// LSet overwrites the item at position index of the list stored under key
// with value.
//
// A non-negative index counts from the head; a negative index counts from the
// tail (-1 is the last item). It returns ErrEmptyKey for an empty key, an
// error when the list does not exist or index is out of range, and any error
// from the write transaction.
func (e *boltListEngine) LSet(key []byte, index int64, value []byte) error {
	if len(key) == 0 {
		return ErrEmptyKey
	}
	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return err
	}
	if metaObj == nil {
		// Writing here would leave an item that no metadata refers to.
		return errListNoSuchKey
	}

	size := int64(metaObj.Size)
	if index < 0 {
		index += size
	}
	if index < 0 || index >= size {
		return errListIndexOutOfRange
	}

	return e.db.Update(func(tx *bolt.Tx) error {
		eDataKey := e.RawListKey(key, metaObj.Head+uint64(index))
		return tx.Bucket([]byte("boltListBucket")).Put(eDataKey, value)
	})
}
// LTrim shrinks the list stored under key so that only the items whose
// positions lie in the inclusive range [begin, end] remain.
//
// Negative positions count from the tail (-1 is the last item). A begin that
// falls before the head is clamped to the first item and an end past the tail
// is clamped to the last item. When the resolved range is empty the whole
// list, metadata included, is deleted. Trimming a list that does not exist is
// a no-op. It returns ErrEmptyKey for an empty key and any error from the
// write transaction.
func (e *boltListEngine) LTrim(key []byte, begin int64, end int64) error {
	if len(key) == 0 {
		return ErrEmptyKey
	}
	eMetaKey := RawKeyPrefix(key)
	metaObj, _, err := e.ListMetaObj(key)
	if err != nil {
		return err
	}
	if metaObj == nil {
		return nil
	}

	head := metaObj.Head
	size := int64(metaObj.Size)
	if begin < 0 {
		begin += size
		if begin < 0 {
			begin = 0
		}
	}
	if end < 0 {
		// May stay negative, which makes the range empty below.
		end += size
	} else if end >= size {
		end = size - 1
	}
	keepNone := begin >= size || begin > end

	return e.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("boltListBucket"))

		// deleteItems removes the items at positions [from, to) of the
		// original, untrimmed list.
		deleteItems := func(from, to int64) error {
			for i := from; i < to; i++ {
				if err := b.Delete(e.RawListKey(key, head+uint64(i))); err != nil {
					return err
				}
			}
			return nil
		}

		if keepNone {
			if err := b.Delete(eMetaKey); err != nil {
				return err
			}
			return deleteItems(0, size)
		}

		// Both bounds are derived from the original head; Tail is one past
		// the last kept item.
		trimmed := ListObj{
			Head: head + uint64(begin),
			Tail: head + uint64(end) + 1,
			Size: uint64(end - begin + 1),
		}
		if err := b.Put(eMetaKey, MarshalListObj(&trimmed)); err != nil {
			return err
		}
		// Drop everything in front of the kept range, then everything behind.
		if err := deleteItems(0, begin); err != nil {
			return err
		}
		return deleteItems(end+1, size)
	})
}
// RPop removes and returns the item at the tail of the list stored under key
// by delegating to lPopWithDirec with LTailDirection.
func (e *boltListEngine) RPop(key []byte) ([]byte, error) {
return e.lPopWithDirec(key, LTailDirection)
}
// RPush pushes values onto the tail of the list stored under key by
// delegating to lPushWithDirec with LTailDirection, and returns its results.
func (e *boltListEngine) RPush(key []byte, values ...[]byte) (int64, error) {
return e.lPushWithDirec(LTailDirection, key, values...)
}
// RPushX pushes a single value onto the tail of the list stored under key by
// delegating to lPushWithDirec with LTailDirection.
//
// It uses a pointer receiver like every other method of the engine, so the
// engine struct (including its limiter) is not copied on each call.
//
// NOTE(review): no check is made that the list already exists, so this
// behaves exactly like RPush with one value — confirm that is intended.
func (e *boltListEngine) RPushX(key []byte, value []byte) (int64, error) {
	return e.lPushWithDirec(LTailDirection, key, value)
}
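// (Gitee web UI notice, not part of the source:) This content may contain material unsuitable for display and is not shown on the page. You can review and modify it using the relevant editing functions.
// If you confirm the content does not involve inappropriate language / pure advertising / violence / vulgar or pornographic content / infringement / piracy / false information / worthless content, or content that violates national laws and regulations, you may click submit to appeal, and it will be processed as soon as possible.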