6 Star 47 Fork 28

Hyperledger/fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
stateleveldb.go 6.47 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package stateleveldb
import (
"bytes"
"errors"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/syndtr/goleveldb/leveldb/iterator"
)
// logger is the package-level logger, registered under the name "stateleveldb".
var logger = flogging.MustGetLogger("stateleveldb")
// compositeKeySep is the single 0x00 byte that constructCompositeKey places
// between the namespace and the key, and on which splitCompositeKey splits.
var compositeKeySep = []byte{0x00}
// lastKeyIndicator (0x01) replaces the trailing separator byte of the composite
// end key when GetStateRangeScanIterator is called with an empty endKey, so the
// upper bound of the scan becomes namespace+0x01.
var lastKeyIndicator = byte(0x01)
// savePointKey is the key under which ApplyUpdates stores the serialized height
// and from which GetLatestSavePoint reads it back.
// NOTE(review): its bytes equal constructCompositeKey("", "") - confirm that an
// empty namespace with an empty key can never be written.
var savePointKey = []byte{0x00}
// VersionedDBProvider implements interface VersionedDBProvider.
// It hands out named versioned state databases, all of which are obtained
// from a single leveldbhelper.Provider.
type VersionedDBProvider struct {
// dbProvider is the leveldb provider from which named DB handles are obtained.
dbProvider *leveldbhelper.Provider
}
// NewVersionedDBProvider instantiates VersionedDBProvider.
// The leveldb location is taken from ledgerconfig.GetStateLevelDBPath() and a
// leveldbhelper.Provider is created for that path.
func NewVersionedDBProvider() *VersionedDBProvider {
	path := ledgerconfig.GetStateLevelDBPath()
	logger.Debugf("constructing VersionedDBProvider dbPath=%s", path)
	conf := &leveldbhelper.Conf{DBPath: path}
	return &VersionedDBProvider{dbProvider: leveldbhelper.NewProvider(conf)}
}
// GetDBHandle gets the handle to a named database.
// The handle for dbName is fetched from the underlying provider and wrapped
// in a versionedDB; the returned error is always nil.
func (provider *VersionedDBProvider) GetDBHandle(dbName string) (statedb.VersionedDB, error) {
	handle := provider.dbProvider.GetDBHandle(dbName)
	return newVersionedDB(handle, dbName), nil
}
// Close closes the underlying db by delegating to the wrapped
// leveldbhelper.Provider.
func (provider *VersionedDBProvider) Close() {
provider.dbProvider.Close()
}
// versionedDB implements the VersionedDB interface on top of a leveldb handle.
type versionedDB struct {
// db is the leveldb handle through which all reads and writes go.
db *leveldbhelper.DBHandle
// dbName is the name this database was requested under; it is used in log
// messages (see ApplyUpdates).
dbName string
}
// newVersionedDB constructs an instance of versionedDB that wraps the given
// leveldb handle and remembers the database name.
func newVersionedDB(db *leveldbhelper.DBHandle, dbName string) *versionedDB {
	return &versionedDB{
		db:     db,
		dbName: dbName,
	}
}
// Open implements method in VersionedDB interface.
// It is a no-op and always returns nil.
func (vdb *versionedDB) Open() error {
// do nothing because shared db is used
return nil
}
// Close implements method in VersionedDB interface.
// It is a no-op; the underlying db is closed through VersionedDBProvider.Close.
func (vdb *versionedDB) Close() {
// do nothing because shared db is used
}
// ValidateKey implements method in VersionedDB interface.
// This implementation performs no checks: every key is accepted and nil is
// always returned.
func (vdb *versionedDB) ValidateKey(key string) error {
return nil
}
// BytesKeySuppoted implements method in VersionedDB interface.
// It always returns true for this implementation.
func (vdb *versionedDB) BytesKeySuppoted() bool {
return true
}
// GetState implements method in VersionedDB interface.
// It looks up the composite key built from namespace and key. A lookup error
// is returned as is; a missing entry yields (nil, nil); otherwise the stored
// bytes are decoded into a value and its version.
func (vdb *versionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
	logger.Debugf("GetState(). ns=%s, key=%s", namespace, key)
	raw, err := vdb.db.Get(constructCompositeKey(namespace, key))
	switch {
	case err != nil:
		return nil, err
	case raw == nil:
		// nothing stored under this key
		return nil, nil
	}
	value, ver := statedb.DecodeValue(raw)
	return &statedb.VersionedValue{Value: value, Version: ver}, nil
}
// GetVersion implements method in VersionedDB interface.
// It delegates to GetState and returns only the version of the result;
// (nil, nil) is returned when the key has no stored state.
func (vdb *versionedDB) GetVersion(namespace string, key string) (*version.Height, error) {
	vv, err := vdb.GetState(namespace, key)
	if err != nil || vv == nil {
		// err is nil in the "not found" case, so this covers both exits.
		return nil, err
	}
	return vv.Version, nil
}
// GetStateMultipleKeys implements method in VersionedDB interface.
// Each key is looked up individually through GetState, in the order given.
// The result has one entry per key (nil for keys with no stored state); the
// first lookup error aborts the call and is returned with a nil slice.
func (vdb *versionedDB) GetStateMultipleKeys(namespace string, keys []string) ([]*statedb.VersionedValue, error) {
	results := make([]*statedb.VersionedValue, 0, len(keys))
	for _, k := range keys {
		vv, err := vdb.GetState(namespace, k)
		if err != nil {
			return nil, err
		}
		results = append(results, vv)
	}
	return results, nil
}
// GetStateRangeScanIterator implements method in VersionedDB interface.
// startKey is inclusive
// endKey is exclusive
// An empty endKey leaves the scan open-ended within the namespace: the upper
// bound then becomes the namespace followed by lastKeyIndicator.
func (vdb *versionedDB) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) {
	lower := constructCompositeKey(namespace, startKey)
	upper := constructCompositeKey(namespace, endKey)
	if len(endKey) == 0 {
		// With no end key the composite key ends in the separator byte;
		// swap it for the last-key indicator.
		upper[len(upper)-1] = lastKeyIndicator
	}
	return newKVScanner(namespace, vdb.db.GetIterator(lower, upper)), nil
}
// ExecuteQuery implements method in VersionedDB interface.
// Queries are not supported by this implementation: both arguments are
// ignored and a nil iterator is returned together with an error.
func (vdb *versionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIterator, error) {
return nil, errors.New("ExecuteQuery not supported for leveldb")
}
// ApplyUpdates implements method in VersionedDB interface.
// Every update in batch is translated into a put (or a delete, when the
// updated value is nil) on the composite key, the given height is recorded
// under savePointKey, and everything is written as one leveldb batch.
// The error from the batch write, if any, is returned.
func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
	dbBatch := leveldbhelper.NewUpdateBatch()
	for _, ns := range batch.GetUpdatedNamespaces() {
		for k, vv := range batch.GetUpdates(ns) {
			compositeKey := constructCompositeKey(ns, k)
			logger.Debugf("Channel [%s]: Applying key(string)=[%s] key(bytes)=[%#v]", vdb.dbName, string(compositeKey), compositeKey)
			if vv.Value != nil {
				dbBatch.Put(compositeKey, statedb.EncodeValue(vv.Value, vv.Version))
				continue
			}
			// a nil value marks the key for deletion
			dbBatch.Delete(compositeKey)
		}
	}
	// The save point goes into the same batch as the state updates.
	dbBatch.Put(savePointKey, height.ToBytes())
	// Setting sync to true as a precaution, false may be an ok optimization after further testing.
	return vdb.db.WriteBatch(dbBatch, true)
}
// GetLatestSavePoint implements method in VersionedDB interface.
// It reads the bytes stored under savePointKey and decodes them into a
// height. A read error is returned as is; (nil, nil) is returned when no
// save point has been stored.
func (vdb *versionedDB) GetLatestSavePoint() (*version.Height, error) {
	raw, err := vdb.db.Get(savePointKey)
	if err != nil || raw == nil {
		// err is nil when the save point is simply absent.
		return nil, err
	}
	// The second return value of NewHeightFromBytes is intentionally discarded,
	// as in the original code.
	height, _ := version.NewHeightFromBytes(raw)
	return height, nil
}
// constructCompositeKey builds the leveldb key for a (namespace, key) pair as
// ns + compositeKeySep + key. A freshly allocated slice is returned on every
// call, so callers may modify it.
func constructCompositeKey(ns string, key string) []byte {
	buf := make([]byte, 0, len(ns)+len(compositeKeySep)+len(key))
	buf = append(buf, ns...)
	buf = append(buf, compositeKeySep...)
	buf = append(buf, key...)
	return buf
}
// splitCompositeKey is the inverse of constructCompositeKey: it cuts the
// composite key at the first compositeKeySep and returns the namespace and
// the key. Later separators stay part of the key. It panics if the composite
// key contains no separator at all.
func splitCompositeKey(compositeKey []byte) (string, string) {
	parts := bytes.SplitN(compositeKey, compositeKeySep, 2)
	ns, key := parts[0], parts[1]
	return string(ns), string(key)
}
// kvScanner walks a leveldb iterator and returns each entry as a versioned
// key-value pair (see Next). It must be released with Close.
type kvScanner struct {
// namespace is reported as the namespace of every result returned by Next.
namespace string
// dbItr is the underlying leveldb iterator over composite keys.
dbItr iterator.Iterator
}
// newKVScanner wraps dbItr in a kvScanner that reports results under the
// given namespace.
func newKVScanner(namespace string, dbItr iterator.Iterator) *kvScanner {
	return &kvScanner{
		namespace: namespace,
		dbItr:     dbItr,
	}
}
// Next advances the underlying leveldb iterator and returns the entry it is
// then positioned on as a *statedb.VersionedKV carrying the scanner's
// namespace, the key part of the composite key, and the decoded value and
// version.
//
// It returns (nil, nil) once the iterator is exhausted. If the iterator
// stopped because of an error, that error is returned instead of being
// reported as a normal end of results.
func (scanner *kvScanner) Next() (statedb.QueryResult, error) {
	if !scanner.dbItr.Next() {
		// Next() is false both at the end of the range and after a failure;
		// Error() tells the two apart, so a failed scan is not silently
		// truncated.
		if err := scanner.dbItr.Error(); err != nil {
			return nil, err
		}
		return nil, nil
	}
	dbKey := scanner.dbItr.Key()
	dbVal := scanner.dbItr.Value()
	// Decode from a private copy of the value rather than from the slice
	// handed out by the iterator.
	dbValCopy := make([]byte, len(dbVal))
	copy(dbValCopy, dbVal)
	// The namespace half of the composite key is dropped; the scanner's own
	// namespace is reported instead.
	_, key := splitCompositeKey(dbKey)
	value, ver := statedb.DecodeValue(dbValCopy)
	return &statedb.VersionedKV{
		CompositeKey:   statedb.CompositeKey{Namespace: scanner.namespace, Key: key},
		VersionedValue: statedb.VersionedValue{Value: value, Version: ver}}, nil
}
// Close releases the underlying leveldb iterator.
func (scanner *kvScanner) Close() {
scanner.dbItr.Release()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/hyperledger/fabric.git
git@gitee.com:hyperledger/fabric.git
hyperledger
fabric
fabric
v1.1.0-preview

搜索帮助