1 Star 0 Fork 0

陈文甲/fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
kv_ledger.go 12.56 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package kvledger
import (
"errors"
"fmt"
"sync"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/common/flogging"
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/cceventmgmt"
"github.com/hyperledger/fabric/core/ledger/confighistory"
"github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping"
"github.com/hyperledger/fabric/core/ledger/kvledger/history/historydb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/ledgerstorage"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
)
var logger = flogging.MustGetLogger("kvledger")
// KVLedger provides an implementation of `ledger.PeerLedger`.
// This implementation provides a key-value based data model
type kvLedger struct {
ledgerID string
blockStore *ledgerstorage.Store
txtmgmt txmgr.TxMgr
historyDB historydb.HistoryDB
configHistoryRetriever ledger.ConfigHistoryRetriever
blockAPIsRWLock *sync.RWMutex
}
// NewKVLedger constructs new `KVLedger`
func newKVLedger(
ledgerID string,
blockStore *ledgerstorage.Store,
versionedDB privacyenabledstate.DB,
historyDB historydb.HistoryDB,
configHistoryMgr confighistory.Mgr,
stateListeners []ledger.StateListener,
bookkeeperProvider bookkeeping.Provider) (*kvLedger, error) {
logger.Debugf("Creating KVLedger ledgerID=%s: ", ledgerID)
stateListeners = append(stateListeners, configHistoryMgr)
// Create a kvLedger for this chain/ledger, which encasulates the underlying
// id store, blockstore, txmgr (state database), history database
l := &kvLedger{ledgerID: ledgerID, blockStore: blockStore, historyDB: historyDB, blockAPIsRWLock: &sync.RWMutex{}}
// TODO Move the function `GetChaincodeEventListener` to ledger interface and
// this functionality of regiserting for events to ledgermgmt package so that this
// is reused across other future ledger implementations
ccEventListener := versionedDB.GetChaincodeEventListener()
logger.Debugf("Register state db for chaincode lifecycle events: %t", ccEventListener != nil)
if ccEventListener != nil {
cceventmgmt.GetMgr().Register(ledgerID, ccEventListener)
}
btlPolicy := pvtdatapolicy.NewBTLPolicy(l)
if err := l.initTxMgr(versionedDB, stateListeners, btlPolicy, bookkeeperProvider); err != nil {
return nil, err
}
l.initBlockStore(btlPolicy)
//Recover both state DB and history DB if they are out of sync with block storage
if err := l.recoverDBs(); err != nil {
panic(fmt.Errorf(`Error during state DB recovery:%s`, err))
}
l.configHistoryRetriever = configHistoryMgr.GetRetriever(ledgerID, l)
return l, nil
}
func (l *kvLedger) initTxMgr(versionedDB privacyenabledstate.DB, stateListeners []ledger.StateListener,
btlPolicy pvtdatapolicy.BTLPolicy, bookkeeperProvider bookkeeping.Provider) error {
var err error
l.txtmgmt, err = lockbasedtxmgr.NewLockBasedTxMgr(l.ledgerID, versionedDB, stateListeners, btlPolicy, bookkeeperProvider)
return err
}
func (l *kvLedger) initBlockStore(btlPolicy pvtdatapolicy.BTLPolicy) {
l.blockStore.Init(btlPolicy)
}
//Recover the state database and history database (if exist)
//by recommitting last valid blocks
func (l *kvLedger) recoverDBs() error {
logger.Debugf("Entering recoverDB()")
//If there is no block in blockstorage, nothing to recover.
info, _ := l.blockStore.GetBlockchainInfo()
if info.Height == 0 {
logger.Debug("Block storage is empty.")
return nil
}
lastAvailableBlockNum := info.Height - 1
recoverables := []recoverable{l.txtmgmt, l.historyDB}
recoverers := []*recoverer{}
for _, recoverable := range recoverables {
recoverFlag, firstBlockNum, err := recoverable.ShouldRecover(lastAvailableBlockNum)
if err != nil {
return err
}
if recoverFlag {
recoverers = append(recoverers, &recoverer{firstBlockNum, recoverable})
}
}
if len(recoverers) == 0 {
return nil
}
if len(recoverers) == 1 {
return l.recommitLostBlocks(recoverers[0].firstBlockNum, lastAvailableBlockNum, recoverers[0].recoverable)
}
// both dbs need to be recovered
if recoverers[0].firstBlockNum > recoverers[1].firstBlockNum {
// swap (put the lagger db at 0 index)
recoverers[0], recoverers[1] = recoverers[1], recoverers[0]
}
if recoverers[0].firstBlockNum != recoverers[1].firstBlockNum {
// bring the lagger db equal to the other db
if err := l.recommitLostBlocks(recoverers[0].firstBlockNum, recoverers[1].firstBlockNum-1,
recoverers[0].recoverable); err != nil {
return err
}
}
// get both the db upto block storage
return l.recommitLostBlocks(recoverers[1].firstBlockNum, lastAvailableBlockNum,
recoverers[0].recoverable, recoverers[1].recoverable)
}
//recommitLostBlocks retrieves blocks in specified range and commit the write set to either
//state DB or history DB or both
func (l *kvLedger) recommitLostBlocks(firstBlockNum uint64, lastBlockNum uint64, recoverables ...recoverable) error {
var err error
var blockAndPvtdata *ledger.BlockAndPvtData
for blockNumber := firstBlockNum; blockNumber <= lastBlockNum; blockNumber++ {
if blockAndPvtdata, err = l.GetPvtDataAndBlockByNum(blockNumber, nil); err != nil {
return err
}
for _, r := range recoverables {
if err := r.CommitLostBlock(blockAndPvtdata); err != nil {
return err
}
}
}
return nil
}
// GetTransactionByID retrieves a transaction by id
func (l *kvLedger) GetTransactionByID(txID string) (*peer.ProcessedTransaction, error) {
tranEnv, err := l.blockStore.RetrieveTxByID(txID)
if err != nil {
return nil, err
}
txVResult, err := l.blockStore.RetrieveTxValidationCodeByTxID(txID)
if err != nil {
return nil, err
}
processedTran := &peer.ProcessedTransaction{TransactionEnvelope: tranEnv, ValidationCode: int32(txVResult)}
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return processedTran, nil
}
// GetBlockchainInfo returns basic info about blockchain
func (l *kvLedger) GetBlockchainInfo() (*common.BlockchainInfo, error) {
bcInfo, err := l.blockStore.GetBlockchainInfo()
l.blockAPIsRWLock.RLock()
defer l.blockAPIsRWLock.RUnlock()
return bcInfo, err
}
// GetBlockByNumber returns block at a given height
// blockNumber of math.MaxUint64 will return last block
func (l *kvLedger) GetBlockByNumber(blockNumber uint64) (*common.Block, error) {
block, err := l.blockStore.RetrieveBlockByNumber(blockNumber)
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return block, err
}
// GetBlocksIterator returns an iterator that starts from `startBlockNumber`(inclusive).
// The iterator is a blocking iterator i.e., it blocks till the next block gets available in the ledger
// ResultsIterator contains type BlockHolder
func (l *kvLedger) GetBlocksIterator(startBlockNumber uint64) (commonledger.ResultsIterator, error) {
blkItr, err := l.blockStore.RetrieveBlocks(startBlockNumber)
if err != nil {
return nil, err
}
return &blocksItr{l.blockAPIsRWLock, blkItr}, nil
}
// GetBlockByHash returns a block given it's hash
func (l *kvLedger) GetBlockByHash(blockHash []byte) (*common.Block, error) {
block, err := l.blockStore.RetrieveBlockByHash(blockHash)
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return block, err
}
// GetBlockByTxID returns a block which contains a transaction
func (l *kvLedger) GetBlockByTxID(txID string) (*common.Block, error) {
block, err := l.blockStore.RetrieveBlockByTxID(txID)
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return block, err
}
func (l *kvLedger) GetTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) {
txValidationCode, err := l.blockStore.RetrieveTxValidationCodeByTxID(txID)
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return txValidationCode, err
}
//Prune prunes the blocks/transactions that satisfy the given policy
func (l *kvLedger) Prune(policy commonledger.PrunePolicy) error {
return errors.New("Not yet implemented")
}
// NewTxSimulator returns new `ledger.TxSimulator`
func (l *kvLedger) NewTxSimulator(txid string) (ledger.TxSimulator, error) {
return l.txtmgmt.NewTxSimulator(txid)
}
// NewQueryExecutor gives handle to a query executor.
// A client can obtain more than one 'QueryExecutor's for parallel execution.
// Any synchronization should be performed at the implementation level if required
func (l *kvLedger) NewQueryExecutor() (ledger.QueryExecutor, error) {
return l.txtmgmt.NewQueryExecutor(util.GenerateUUID())
}
// NewHistoryQueryExecutor gives handle to a history query executor.
// A client can obtain more than one 'HistoryQueryExecutor's for parallel execution.
// Any synchronization should be performed at the implementation level if required
// Pass the ledger blockstore so that historical values can be looked up from the chain
func (l *kvLedger) NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error) {
return l.historyDB.NewHistoryQueryExecutor(l.blockStore)
}
// CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation
func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) error {
var err error
block := pvtdataAndBlock.Block
blockNo := pvtdataAndBlock.Block.Header.Number
logger.Debugf("Channel [%s]: Validating state for block [%d]", l.ledgerID, blockNo)
err = l.txtmgmt.ValidateAndPrepare(pvtdataAndBlock, true)
if err != nil {
return err
}
logger.Debugf("Channel [%s]: Committing block [%d] to storage", l.ledgerID, blockNo)
l.blockAPIsRWLock.Lock()
defer l.blockAPIsRWLock.Unlock()
if err = l.blockStore.CommitWithPvtData(pvtdataAndBlock); err != nil {
return err
}
logger.Infof("Channel [%s]: Committed block [%d] with %d transaction(s)", l.ledgerID, block.Header.Number, len(block.Data.Data))
logger.Debugf("Channel [%s]: Committing block [%d] transactions to state database", l.ledgerID, blockNo)
if err = l.txtmgmt.Commit(); err != nil {
panic(fmt.Errorf(`Error during commit to txmgr:%s`, err))
}
// History database could be written in parallel with state and/or async as a future optimization
if ledgerconfig.IsHistoryDBEnabled() {
logger.Debugf("Channel [%s]: Committing block [%d] transactions to history database", l.ledgerID, blockNo)
if err := l.historyDB.Commit(block); err != nil {
panic(fmt.Errorf(`Error during commit to history db:%s`, err))
}
}
return nil
}
// GetPvtDataAndBlockByNum returns the block and the corresponding pvt data.
// The pvt data is filtered by the list of 'collections' supplied
func (l *kvLedger) GetPvtDataAndBlockByNum(blockNum uint64, filter ledger.PvtNsCollFilter) (*ledger.BlockAndPvtData, error) {
blockAndPvtdata, err := l.blockStore.GetPvtDataAndBlockByNum(blockNum, filter)
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return blockAndPvtdata, err
}
// GetPvtDataByNum returns only the pvt data corresponding to the given block number
// The pvt data is filtered by the list of 'collections' supplied
func (l *kvLedger) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) {
pvtdata, err := l.blockStore.GetPvtDataByNum(blockNum, filter)
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return pvtdata, err
}
// Purge removes private read-writes set generated by endorsers at block height lesser than
// a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets
// that were generated at block height of maxBlockNumToRetain or higher.
func (l *kvLedger) PurgePrivateData(maxBlockNumToRetain uint64) error {
return fmt.Errorf("not yet implemented")
}
// PrivateDataMinBlockNum returns the lowest retained endorsement block height
func (l *kvLedger) PrivateDataMinBlockNum() (uint64, error) {
return 0, fmt.Errorf("not yet implemented")
}
func (l *kvLedger) GetConfigHistoryRetriever() (ledger.ConfigHistoryRetriever, error) {
return l.configHistoryRetriever, nil
}
// Close closes `KVLedger`
func (l *kvLedger) Close() {
l.blockStore.Shutdown()
l.txtmgmt.Shutdown()
}
type blocksItr struct {
blockAPIsRWLock *sync.RWMutex
blocksItr commonledger.ResultsIterator
}
func (itr *blocksItr) Next() (commonledger.QueryResult, error) {
block, err := itr.blocksItr.Next()
if err != nil {
return nil, err
}
itr.blockAPIsRWLock.RLock()
itr.blockAPIsRWLock.RUnlock()
return block, nil
}
func (itr *blocksItr) Close() {
itr.blocksItr.Close()
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/venjia/fabric.git
git@gitee.com:venjia/fabric.git
venjia
fabric
fabric
v1.2.0

搜索帮助