1 Star 0 Fork 0

zhuchance / kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
bolt_store.go 4.80 KB
一键复制 编辑 原始数据 按行查看 历史
package raftboltdb
import (
"errors"
"github.com/boltdb/bolt"
"github.com/hashicorp/raft"
)
const (
// dbFileMode is the permission mode passed to bolt.Open for the db file
// (0600: read/write for the owner only). This is only used if the
// database file does not exist and needs to be created.
dbFileMode = 0600
)
var (
// Bucket names we perform transactions in: dbLogs is the bucket the
// log methods (GetLog, StoreLogs, DeleteRange, ...) operate on, and
// dbConf is the bucket used by the key/value methods Set and Get.
dbLogs = []byte("logs")
dbConf = []byte("conf")
// ErrKeyNotFound is the error returned by Get when the given key does
// not exist.
ErrKeyNotFound = errors.New("not found")
)
// BoltStore provides access to BoltDB for Raft to store and retrieve
// log entries. It also provides key/value storage, and can be used as
// a LogStore and StableStore. Log entries are kept in the "logs" bucket
// and key/value pairs in the "conf" bucket of the same database file.
type BoltStore struct {
// conn is the underlying handle to the db.
conn *bolt.DB
// path is the path to the Bolt database file, as given to NewBoltStore.
path string
}
// NewBoltStore opens the Bolt database file at path and returns a BoltStore
// backed by it. Before the store is returned, the buckets it works with are
// created if they are missing.
func NewBoltStore(path string) (*BoltStore, error) {
	db, err := bolt.Open(path, dbFileMode, nil)
	if err != nil {
		return nil, err
	}

	s := &BoltStore{conn: db, path: path}

	// Bucket setup failed: release the handle on a best-effort basis and
	// report the setup error to the caller.
	err = s.initialize()
	if err != nil {
		s.Close()
		return nil, err
	}
	return s, nil
}
// initialize creates the logs and conf buckets, if they do not exist yet,
// inside a single write transaction.
func (b *BoltStore) initialize() error {
	tx, err := b.conn.Begin(true)
	if err != nil {
		return err
	}
	// Deferred so that every early return abandons the transaction.
	defer tx.Rollback()

	// Same creation order as the bucket declarations: logs first, then conf.
	for _, name := range [][]byte{dbLogs, dbConf} {
		if _, err := tx.CreateBucketIfNotExists(name); err != nil {
			return err
		}
	}
	return tx.Commit()
}
// Close is used to gracefully close the DB connection. It returns
// whatever error closing the underlying Bolt handle reports.
func (b *BoltStore) Close() error {
return b.conn.Close()
}
// FirstIndex returns the first known index from the Raft log. It returns
// 0 with a nil error when the cursor yields no key, i.e. the log is empty.
// A non-nil error is returned only if the read transaction cannot be started.
func (b *BoltStore) FirstIndex() (uint64, error) {
	tx, err := b.conn.Begin(false)
	if err != nil {
		return 0, err
	}
	defer tx.Rollback()

	// Only the key is needed; the value under the cursor is ignored.
	first, _ := tx.Bucket(dbLogs).Cursor().First()
	if first == nil {
		return 0, nil
	}
	return bytesToUint64(first), nil
}
// LastIndex returns the last known index from the Raft log. It returns
// 0 with a nil error when the cursor yields no key, i.e. the log is empty.
// A non-nil error is returned only if the read transaction cannot be started.
func (b *BoltStore) LastIndex() (uint64, error) {
	tx, err := b.conn.Begin(false)
	if err != nil {
		return 0, err
	}
	defer tx.Rollback()

	// Only the key is needed; the value under the cursor is ignored.
	last, _ := tx.Bucket(dbLogs).Cursor().Last()
	if last == nil {
		return 0, nil
	}
	return bytesToUint64(last), nil
}
// GetLog looks up the entry stored under idx in the logs bucket and decodes
// it into log. It returns raft.ErrLogNotFound when nothing is stored at that
// index, or the decoding error if the stored bytes cannot be decoded.
func (b *BoltStore) GetLog(idx uint64, log *raft.Log) error {
	tx, err := b.conn.Begin(false)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	encoded := tx.Bucket(dbLogs).Get(uint64ToBytes(idx))
	if encoded == nil {
		return raft.ErrLogNotFound
	}
	// Decoding happens before the deferred Rollback ends the transaction.
	return decodeMsgPack(encoded, log)
}
// StoreLog is used to store a single raft log. It delegates to StoreLogs
// with a one-element slice.
func (b *BoltStore) StoreLog(log *raft.Log) error {
return b.StoreLogs([]*raft.Log{log})
}
// StoreLogs is used to store a set of raft logs. Each log is encoded and
// written under its index in the logs bucket, all within one write
// transaction. An encoding or write error on any entry returns before
// Commit, leaving the deferred Rollback to discard the whole batch.
func (b *BoltStore) StoreLogs(logs []*raft.Log) error {
	tx, err := b.conn.Begin(true)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// The bucket is the same for every entry, so look it up once instead
	// of once per log.
	bucket := tx.Bucket(dbLogs)
	for _, log := range logs {
		key := uint64ToBytes(log.Index)
		val, err := encodeMsgPack(log)
		if err != nil {
			return err
		}
		if err := bucket.Put(key, val.Bytes()); err != nil {
			return err
		}
	}
	return tx.Commit()
}
// DeleteRange removes every log whose index lies between min and max, both
// ends included, inside a single write transaction.
func (b *BoltStore) DeleteRange(min, max uint64) error {
	start := uint64ToBytes(min)

	tx, err := b.conn.Begin(true)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	cur := tx.Bucket(dbLogs).Cursor()
	// Walk forward from the seek position for min and stop at the end of
	// the bucket or at the first index beyond max.
	key, _ := cur.Seek(start)
	for key != nil && bytesToUint64(key) <= max {
		if err := cur.Delete(); err != nil {
			return err
		}
		key, _ = cur.Next()
	}
	return tx.Commit()
}
// Set stores the value v under key k in the conf bucket, which is kept
// separate from the raft log, and commits the write before returning.
func (b *BoltStore) Set(k, v []byte) error {
	tx, err := b.conn.Begin(true)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	err = tx.Bucket(dbConf).Put(k, v)
	if err != nil {
		return err
	}
	return tx.Commit()
}
// Get fetches the value stored under key k in the conf bucket. It returns
// ErrKeyNotFound when the bucket has no value for k.
func (b *BoltStore) Get(k []byte) ([]byte, error) {
	tx, err := b.conn.Begin(false)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	stored := tx.Bucket(dbConf).Get(k)
	if stored == nil {
		return nil, ErrKeyNotFound
	}
	// Return a fresh copy rather than the slice obtained from the bucket.
	out := append([]byte{}, stored...)
	return out, nil
}
// SetUint64 is like Set, but handles uint64 values. The value is converted
// with uint64ToBytes and then stored through Set.
func (b *BoltStore) SetUint64(key []byte, val uint64) error {
return b.Set(key, uint64ToBytes(val))
}
// GetUint64 reads the value stored under key through Get and converts it to
// a uint64. Any error from Get, including ErrKeyNotFound, is returned as-is
// together with a zero value.
func (b *BoltStore) GetUint64(key []byte) (uint64, error) {
	raw, err := b.Get(key)
	if err != nil {
		return 0, err
	}
	// NOTE(review): presumably the value was written by SetUint64; confirm
	// how bytesToUint64 handles values of a different width.
	num := bytesToUint64(raw)
	return num, nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.2.5-beta.0

搜索帮助

344bd9b3 5694891 D2dac590 5694891