4 Star 0 Fork 1

wanttobeamaster/gridbase

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
bolt_engine_list.go 13.47 KB
一键复制 编辑 原始数据 按行查看 历史
wanttobeamaster 提交于 2021-06-14 21:22 +08:00 . debug lpop
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
package storage
import (
"bytes"
bolt "gitee.com/wanttobeamaster/bbolt"
"gitee.com/wanttobeamaster/gridbase/pkg/util"
)
const (
LHeadDirection uint8 = 0
LTailDirection uint8 = 1
LItemMinIndex uint64 = 1024
LItemMaxIndex uint64 = 1<<64 - 1024
LItemInitIndex uint64 = 1<<32 - 512
LimitConcurrencyWrite = 50000
)
type ListObj struct {
Head uint64
Tail uint64
Size uint64
}
// MarshalListObj Serialize ListObj func
func MarshalListObj(obj *ListObj) []byte {
totalLen := 8 + 8 + 8
raw := make([]byte, totalLen)
idx := 0
_ = util.Uint64ToBytes1(raw[idx:], obj.Head)
idx += 8
_ = util.Uint64ToBytes1(raw[idx:], obj.Tail)
idx += 8
_ = util.Uint64ToBytes1(raw[idx:], obj.Size)
return raw
}
// UnmarshalListObj Deserialize ListObj func
func UnmarshalListObj(raw []byte) (*ListObj, error) {
if len(raw) != 24 {
return nil, nil
}
obj := ListObj{}
idx := 0
obj.Head, _ = util.BytesToUint64(raw[idx:])
idx += 8
obj.Tail, _ = util.BytesToUint64(raw[idx:])
idx += 8
obj.Size, _ = util.BytesToUint64(raw[idx:])
return &obj, nil
}
type boltListEngine struct {
db *bolt.DB
limiter util.Limiter
}
func newBoltListEngine(db *bolt.DB) ListEngine {
db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte("boltListBucket"))
if err != nil {
return err
}
return nil
})
return &boltListEngine{
db: db,
limiter: *util.NewLimiter(LimitConcurrencyWrite),
}
}
// ListMetaObj generate *ListObj with key
func (e *boltListEngine) ListMetaObj(key []byte) (*ListObj, bool, error) {
var (
v []byte
err error
)
metaKey := RawKeyPrefix(key)
err = e.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("boltListBucket"))
v = b.Get(metaKey)
return nil
})
if err != nil {
return nil, false, err
}
if v == nil {
return nil, false, nil
}
obj, err := UnmarshalListObj(v)
if err != nil {
return nil, false, nil
}
return obj, false, nil
}
func(e *boltListEngine) newListMetaObj() *ListObj {
return &ListObj{
Head: LItemInitIndex,
Tail: LItemInitIndex,
Size: 0,
}
}
func (e *boltListEngine) RawListKey(key []byte, idx uint64) []byte {
keyPrefix := RawKeyPrefix(key)
listKey := append(keyPrefix, util.DataTypeKey)
idxBytes, _ := util.Uint64ToBytes(idx)
listKey = append(listKey, idxBytes...)
return listKey
}
func (e *boltListEngine) LIndex(key []byte, index int64) ([]byte, error) {
if len(key) == 0 {
return nil, ErrEmptyKey
}
var item []byte
metaObj, _, err := e.ListMetaObj(key)
if err != nil {
return nil, err
}
if metaObj == nil {
return nil, nil
}
//check whether index in [head, tail] of list
//index can be negative!
if index >= 0 {
if index >= int64(metaObj.Size) {
return nil, nil
}
} else {
if -index > int64(metaObj.Size) {
return nil, nil
}
index = index + int64(metaObj.Size)
}
e.db.View(func(tx *bolt.Tx) error {
dataKey := e.RawListKey(key, metaObj.Head+uint64(index))
b := tx.Bucket([]byte("boltListBucket"))
item = b.Get(dataKey)
return nil
})
return item, nil
}
//LInsert insert an item before pivot
func (e *boltListEngine) LInsert(key []byte, pos int, pivot []byte, value []byte) (int64, error) {
if len(key) == 0 {
return 0, ErrEmptyKey
}
eMetaKey := RawKeyPrefix(key)
metaObj, _, err := e.ListMetaObj(key)
if err != nil {
return 0, err
}
if metaObj == nil {
return 0, nil
}
//find pivot position
var i uint64
var findPivot = false
for i = 0; i < metaObj.Size && !findPivot; i++ {
eDataKey := e.RawListKey(key, metaObj.Head+i)
var item []byte
e.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("boltListBucket"))
item = b.Get(eDataKey)
if bytes.Compare(item, pivot) == 0 {
findPivot = true
}
return nil
})
}
//pivot not find
if i >= metaObj.Size {
return int64(metaObj.Size), nil
}
for j := metaObj.Size; j > i; j-- {
//get DataKey needed move back for one position
//move it to [current index + 1]
fromDataKey := e.RawListKey(key, metaObj.Head+j-1)
toDataKey := e.RawListKey(key, metaObj.Head+j)
e.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("boltListBucket"))
item := b.Get(fromDataKey)
err := b.Put(toDataKey, item)
return err
})
}
//insert value into index : i
eDataKey := e.RawListKey(key, metaObj.Head+i)
e.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("boltListBucket"))
err := b.Put(eDataKey, value)
return err
})
//update metaKey
metaObj.Tail++
metaObj.Size++
v := MarshalListObj(metaObj)
e.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("boltListBucket"))
err := b.Put(eMetaKey, v)
return err
})
return int64(metaObj.Size), nil
}
func (e *boltListEngine) LLen(key []byte) (int64, error) {
if len(key) == 0 {
return 0, ErrEmptyKey
}
metaObj, _, err := e.ListMetaObj(key)
if err != nil {
return 0, err
}
if metaObj == nil {
return 0, nil
}
return int64(metaObj.Size), nil
}
// head <----------------> tail
//
func (e *boltListEngine) LPop(key []byte) ([]byte, error) {
return e.lPopWithDirec(key, LHeadDirection)
}
func (e *boltListEngine) lPopWithDirec(key []byte, direc uint8) ([]byte, error) {
if len(key) == 0 {
return nil, ErrEmptyKey
}
//lpop item, used as result
// but in e.db.Update,this address be relocate, so return error
var item []byte
eMataKey := RawKeyPrefix(key)
metaObj, _, err := e.ListMetaObj(key)
if err != nil {
return nil, err
}
if metaObj == nil {
return nil, nil
}
UpdateErr := e.db.Update(func(tx *bolt.Tx) error {
var eDataKey []byte
b := tx.Bucket([]byte("boltListBucket"))
if direc == LHeadDirection {
eDataKey = e.RawListKey(key, metaObj.Head)
metaObj.Head++
} else {
metaObj.Tail--
eDataKey = e.RawListKey(key, metaObj.Tail)
}
metaObj.Size--
if metaObj.Size == 0 {
//only one item left, delete meta
err = b.Delete(eMataKey)
if err != nil {
return err
}
} else {
v := MarshalListObj(metaObj)
err = b.Put(eMataKey, v)
if err != nil {
return err
}
}
//get item value
item = b.Get(eDataKey)
//delete item
err = b.Delete(eDataKey)
if err != nil {
return err
}
return nil
})
// add append
return append([]byte{} , item...), UpdateErr
}
func (e *boltListEngine) LPush(key []byte, values ...[]byte) (int64, error) {
return e.lPushWithDirec(LHeadDirection, key, values...)
}
func (e *boltListEngine) lPushWithDirec(direc uint8, key []byte, values ...[]byte) (int64, error) {
if len(key) == 0 {
return 0, ErrEmptyKey
}
eMetaKey := RawKeyPrefix(key)
metaObj, _, err := e.ListMetaObj(key)
if err != nil {
return 0, err
}
if metaObj == nil {
metaObj = e.newListMetaObj()
}
e.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("boltListBucket"))
var index uint64
itemCnt := uint64(len(values))
if direc == LHeadDirection {
index = metaObj.Head
metaObj.Head = metaObj.Head - itemCnt
} else {
index = metaObj.Tail
metaObj.Tail = metaObj.Tail + itemCnt
}
metaObj.Size = metaObj.Size + itemCnt
// encode meta value to bytes
v := MarshalListObj(metaObj)
//update meta, put item
err = b.Put(eMetaKey, v)
if err != nil {
return err
}
var eDataKey []byte
for _, value := range values {
//generate item key
if direc == LHeadDirection {
index--
eDataKey = e.RawListKey(key, index)
} else {
eDataKey = e.RawListKey(key, index)
index++
}
err = b.Put(eDataKey, value)
if err != nil {
return err
}
}
return nil
})
return int64(metaObj.Size), nil
}
func (e *boltListEngine) LPushX(key []byte, value []byte) (int64, error) {
return e.lPushWithDirec(LHeadDirection, key, value)
}
func (e *boltListEngine) LRange(key []byte, begin int64, end int64) ([][]byte, error) {
if len(key) == 0 {
return nil, ErrEmptyKey
}
if begin > end && (end > 0 || begin < 0) {
return nil, nil
}
var (
retSlice [][]byte
err error
)
metaObj, _, err := e.ListMetaObj(key)
if err != nil {
return nil, err
}
if metaObj == nil {
return nil, nil
}
if begin < 0 {
if begin < -int64(metaObj.Size) {
//set begin be first item index
begin = 0
} else {
begin = begin + int64(metaObj.Size)
}
} else {
if begin >= int64(metaObj.Size) {
return nil, nil
}
}
if end < 0 {
if end >= int64(metaObj.Size) {
//set stop be last item index
end = int64(metaObj.Size) - 1
}
}
if begin > end {
return nil, nil
}
keys := make([][]byte, end-begin+1)
retSlice = make([][]byte, end-begin+1)
for i := range keys {
keys[i] = e.RawListKey(key, metaObj.Head+uint64(begin)+uint64(i))
}
//use Read-only transaction of boltDB, may from Snapshot
e.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("boltListBucket"))
for i, DataKey := range keys {
retSlice[i] = b.Get(DataKey)
}
return nil
})
return retSlice, nil
}
func (e *boltListEngine) LRem(key []byte, count int64, value []byte) (int64, error) {
if len(key) == 0 {
return 0, ErrEmptyKey
}
eMetaKey := RawKeyPrefix(key)
metaObj, _, err := e.ListMetaObj(key)
if err != nil {
return 0, err
}
if metaObj == nil {
return 0, nil
}
//collect all nodes in current list key
//delete all dataKeys
nodesCollector := make([][]byte, metaObj.Size)
e.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("boltListBucket"))
for i := range nodesCollector {
eDataKey := e.RawListKey(key, metaObj.Head+uint64(i))
nodesCollector[i] = b.Get(eDataKey)
b.Delete(eDataKey)
}
return nil
})
//delete metaKey
e.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("boltListBucket"))
err := b.Delete(eMetaKey)
return err
})
//delete item as command
var deleteCount int64
retSlice := [][]byte{}
if count > 0 {
for _, node := range nodesCollector {
if deleteCount == count || bytes.Compare(node, value) != 0 {
retSlice = append(retSlice, node)
} else {
deleteCount++
}
}
} else if count < 0 {
count = - count
for i := len(nodesCollector) - 1; i >= 0; i-- {
node := nodesCollector[i]
if deleteCount == count || bytes.Compare(node, value) != 0 {
retSlice = append(retSlice, node)
} else {
deleteCount++
}
}
//reverse retSlice
util.ReverseByteMetrics(retSlice)
} else if count == 0 {
for _, node := range nodesCollector {
if bytes.Compare(node, value) != 0 {
retSlice = append(retSlice, node)
}
}
}
//recover the list into bolt
metaObj = e.newListMetaObj()
metaObj.Tail = metaObj.Tail + uint64(len(retSlice)) - 1
metaObj.Size = uint64(len(retSlice))
v := MarshalListObj(metaObj)
e.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("boltListBucket"))
err := b.Put(eMetaKey, v)
return err
})
e.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("boltListBucket"))
for i, node := range retSlice {
eDataKey := e.RawListKey(key, metaObj.Head+uint64(i))
err := b.Put(eDataKey, node)
if err != nil {
return err
}
}
return nil
})
return int64(metaObj.Size), nil
}
func (e *boltListEngine) LSet(key []byte, index int64, value []byte) error {
if len(key) == 0 {
return ErrEmptyKey
}
metaObj, _, err := e.ListMetaObj(key)
if err != nil {
return err
}
if metaObj == nil {
metaObj = e.newListMetaObj()
}
setErr := e.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("boltListBucket"))
eDataKey := e.RawListKey(key, metaObj.Head+uint64(index))
err := b.Put(eDataKey, value)
return err
})
return setErr
}
func (e *boltListEngine) LTrim(key []byte, begin int64, end int64) error {
if len(key) == 0 {
return ErrEmptyKey
}
eMetaKey := RawKeyPrefix(key)
metaObj, _, err := e.ListMetaObj(key)
if err != nil {
return err
}
if metaObj == nil {
metaObj = e.newListMetaObj()
}
updateErr := e.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("boltListBucket"))
var delKey bool
if begin < 0 {
if begin < -int64(metaObj.Size) {
begin = 0
} else {
begin = begin + int64(metaObj.Size)
}
} else {
if begin >= int64(metaObj.Size) {
delKey = true
}
}
if end < 0 {
if end < -int64(metaObj.Size) {
end = 0
} else {
end = end + int64(metaObj.Size)
}
} else {
if end >= int64(metaObj.Size) {
end = int64(metaObj.Size) - 1
}
}
if begin > end {
delKey = true
}
if delKey {
// delete meta key and all items
err = b.Delete(eMetaKey)
if err != nil {
return err
}
for i := begin; i < end; i++ {
eDataKey := e.RawListKey(key, metaObj.Head+uint64(i))
err = b.Delete(eDataKey)
if err != nil {
return err
}
}
} else {
head := metaObj.Head
size := metaObj.Size
metaObj.Head = metaObj.Head + uint64(begin)
metaObj.Tail = metaObj.Head + uint64(end) + 1
metaObj.Size = metaObj.Tail - metaObj.Head
metaValue := MarshalListObj(metaObj)
//update meta
err = b.Put(eMetaKey, metaValue)
if err != nil {
return err
}
var i int64
//delete front items
for i = 0; i < begin; i++ {
eDataKey := e.RawListKey(key, head+uint64(i))
err = b.Delete(eDataKey)
if err != nil {
return err
}
}
//delete backend items
for i = begin + 1; i < int64(size)-1; i++ {
eDataKey := e.RawListKey(key, head+uint64(i))
err = b.Delete(eDataKey)
if err != nil {
return err
}
}
}
return nil
})
return updateErr
}
func (e *boltListEngine) RPop(key []byte) ([]byte, error) {
return e.lPopWithDirec(key, LTailDirection)
}
func (e *boltListEngine) RPush(key []byte, values ...[]byte) (int64, error) {
return e.lPushWithDirec(LTailDirection, key, values...)
}
func (e boltListEngine) RPushX(key []byte, value []byte) (int64, error) {
return e.lPushWithDirec(LTailDirection, key, value)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wanttobeamaster/gridbase.git
git@gitee.com:wanttobeamaster/gridbase.git
wanttobeamaster
gridbase
gridbase
a9a2a47d54bb

搜索帮助