kv_ledger.go
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package kvledger
import (
"sync"
"time"
"github.com/hyperledger/fabric/common/flogging"
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/cceventmgmt"
"github.com/hyperledger/fabric/core/ledger/confighistory"
"github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping"
"github.com/hyperledger/fabric/core/ledger/kvledger/history/historydb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/ledgerstorage"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
"github.com/pkg/errors"
)
var logger = flogging.MustGetLogger("kvledger")
// kvLedger provides an implementation of `ledger.PeerLedger`.
// This implementation provides a key-value-based data model
type kvLedger struct {
ledgerID string
blockStore *ledgerstorage.Store
txtmgmt txmgr.TxMgr
historyDB historydb.HistoryDB
configHistoryRetriever ledger.ConfigHistoryRetriever
blockAPIsRWLock *sync.RWMutex
stats *ledgerStats
}
// newKVLedger constructs a new kvLedger
func newKVLedger(
ledgerID string,
blockStore *ledgerstorage.Store,
versionedDB privacyenabledstate.DB,
historyDB historydb.HistoryDB,
configHistoryMgr confighistory.Mgr,
stateListeners []ledger.StateListener,
bookkeeperProvider bookkeeping.Provider,
ccInfoProvider ledger.DeployedChaincodeInfoProvider,
stats *ledgerStats,
) (*kvLedger, error) {
logger.Debugf("Creating KVLedger ledgerID=%s: ", ledgerID)
// Create a kvLedger for this chain/ledger, which encapsulates the underlying
// id store, blockstore, txmgr (state database), and history database
l := &kvLedger{ledgerID: ledgerID, blockStore: blockStore, historyDB: historyDB, blockAPIsRWLock: &sync.RWMutex{}}
// TODO Move the function `GetChaincodeEventListener` to the ledger interface and
// this functionality of registering for events to the ledgermgmt package so that this
// is reused across other future ledger implementations
ccEventListener := versionedDB.GetChaincodeEventListener()
logger.Debugf("Register state db for chaincode lifecycle events: %t", ccEventListener != nil)
if ccEventListener != nil {
cceventmgmt.GetMgr().Register(ledgerID, ccEventListener)
}
btlPolicy := pvtdatapolicy.ConstructBTLPolicy(&collectionInfoRetriever{l, ccInfoProvider})
if err := l.initTxMgr(versionedDB, stateListeners, btlPolicy, bookkeeperProvider, ccInfoProvider); err != nil {
return nil, err
}
l.initBlockStore(btlPolicy)
//Recover both state DB and history DB if they are out of sync with block storage
if err := l.recoverDBs(); err != nil {
panic(errors.WithMessage(err, "error during state DB recovery"))
}
l.configHistoryRetriever = configHistoryMgr.GetRetriever(ledgerID, l)
info, err := l.GetBlockchainInfo()
if err != nil {
return nil, err
}
// initialize stats with the current height
stats.updateBlockchainHeight(info.Height)
l.stats = stats
return l, nil
}
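// Illustrative sketch (not part of the original file): basic reads through the
// ledger.PeerLedger interface that kvLedger implements. The helper below is
// hypothetical and only uses methods defined in this file.
func exampleReadChainInfo(lgr ledger.PeerLedger) error {
	info, err := lgr.GetBlockchainInfo()
	if err != nil {
		return err
	}
	if info.Height == 0 {
		return nil // empty chain, nothing to read
	}
	// Block numbers are zero-based, so the last block sits at height-1.
	block, err := lgr.GetBlockByNumber(info.Height - 1)
	if err != nil {
		return err
	}
	logger.Debugf("last block number=%d, tx count=%d", block.Header.Number, len(block.Data.Data))
	return nil
}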
func (l *kvLedger) initTxMgr(versionedDB privacyenabledstate.DB, stateListeners []ledger.StateListener,
btlPolicy pvtdatapolicy.BTLPolicy, bookkeeperProvider bookkeeping.Provider, ccInfoProvider ledger.DeployedChaincodeInfoProvider) error {
var err error
l.txtmgmt, err = lockbasedtxmgr.NewLockBasedTxMgr(l.ledgerID, versionedDB, stateListeners, btlPolicy, bookkeeperProvider, ccInfoProvider)
return err
}
func (l *kvLedger) initBlockStore(btlPolicy pvtdatapolicy.BTLPolicy) {
l.blockStore.Init(btlPolicy)
}
// Recover the state database and history database (if they exist)
// by recommitting the last valid blocks
func (l *kvLedger) recoverDBs() error {
logger.Debugf("Entering recoverDB()")
if err := l.syncStateAndHistoryDBWithBlockstore(); err != nil {
return err
}
if err := l.syncStateDBWithPvtdatastore(); err != nil {
return err
}
return nil
}
func (l *kvLedger) syncStateAndHistoryDBWithBlockstore() error {
// If there is no block in block storage, there is nothing to recover.
info, _ := l.blockStore.GetBlockchainInfo()
if info.Height == 0 {
logger.Debug("Block storage is empty.")
return nil
}
lastAvailableBlockNum := info.Height - 1
recoverables := []recoverable{l.txtmgmt, l.historyDB}
recoverers := []*recoverer{}
for _, recoverable := range recoverables {
recoverFlag, firstBlockNum, err := recoverable.ShouldRecover(lastAvailableBlockNum)
if err != nil {
return err
}
if recoverFlag {
recoverers = append(recoverers, &recoverer{firstBlockNum, recoverable})
}
}
if len(recoverers) == 0 {
return nil
}
if len(recoverers) == 1 {
return l.recommitLostBlocks(recoverers[0].firstBlockNum, lastAvailableBlockNum, recoverers[0].recoverable)
}
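// Worked example of the two-recoverer case below: if block storage holds blocks
// 0..9 (lastAvailableBlockNum=9), the state DB reports firstBlockNum=8, and the
// history DB reports firstBlockNum=5, then the history DB is the lagging DB.
// Blocks 5..7 are first recommitted to the history DB alone, and then blocks
// 8..9 are recommitted to both DBs.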
// both dbs need to be recovered
if recoverers[0].firstBlockNum > recoverers[1].firstBlockNum {
// swap (put the lagging DB at index 0)
recoverers[0], recoverers[1] = recoverers[1], recoverers[0]
}
if recoverers[0].firstBlockNum != recoverers[1].firstBlockNum {
// bring the lagging DB level with the other DB
if err := l.recommitLostBlocks(recoverers[0].firstBlockNum, recoverers[1].firstBlockNum-1,
recoverers[0].recoverable); err != nil {
return err
}
}
// bring both DBs up to the block storage height
return l.recommitLostBlocks(recoverers[1].firstBlockNum, lastAvailableBlockNum,
recoverers[0].recoverable, recoverers[1].recoverable)
}
func (l *kvLedger) syncStateDBWithPvtdatastore() error {
// TODO: So far, the design philosophy has been that the scope of block storage is
// limited to storing and retrieving block data with certain guarantees, while the statedb is
// for state management. The higher layer, 'kvledger', coordinates the acts between
// the two. However, maintaining the state of the consumption of blocks (i.e.,
// lastUpdatedOldBlockList for pvtstore reconciliation) within the private data block storage
// breaks that assumption. The knowledge of which blocks have been consumed for the purpose
// of state updates should not lie with the source (i.e., pvtdatastorage). A potential fix
// is mentioned in FAB-12731
blocksPvtData, err := l.blockStore.GetLastUpdatedOldBlocksPvtData()
if err != nil {
return err
}
if err := l.txtmgmt.RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtData); err != nil {
return err
}
if err := l.blockStore.ResetLastUpdatedOldBlocksList(); err != nil {
return err
}
return nil
}
// recommitLostBlocks retrieves blocks in the specified range and commits the write sets
// to the state DB, the history DB, or both
func (l *kvLedger) recommitLostBlocks(firstBlockNum uint64, lastBlockNum uint64, recoverables ...recoverable) error {
logger.Infof("Recommitting lost blocks - firstBlockNum=%d, lastBlockNum=%d, recoverables=%#v", firstBlockNum, lastBlockNum, recoverables)
var err error
var blockAndPvtdata *ledger.BlockAndPvtData
for blockNumber := firstBlockNum; blockNumber <= lastBlockNum; blockNumber++ {
if blockAndPvtdata, err = l.GetPvtDataAndBlockByNum(blockNumber, nil); err != nil {
return err
}
for _, r := range recoverables {
if err := r.CommitLostBlock(blockAndPvtdata); err != nil {
return err
}
}
}
logger.Infof("Recommitted lost blocks - firstBlockNum=%d, lastBlockNum=%d, recoverables=%#v", firstBlockNum, lastBlockNum, recoverables)
return nil
}
// GetTransactionByID retrieves a transaction by id
func (l *kvLedger) GetTransactionByID(txID string) (*peer.ProcessedTransaction, error) {
tranEnv, err := l.blockStore.RetrieveTxByID(txID)
if err != nil {
return nil, err
}
txVResult, err := l.blockStore.RetrieveTxValidationCodeByTxID(txID)
if err != nil {
return nil, err
}
processedTran := &peer.ProcessedTransaction{TransactionEnvelope: tranEnv, ValidationCode: int32(txVResult)}
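// The read lock is acquired and released immediately so that this call does not
// return while a block commit (which holds the write lock) is still in progress.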
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return processedTran, nil
}
// GetBlockchainInfo returns basic info about blockchain
func (l *kvLedger) GetBlockchainInfo() (*common.BlockchainInfo, error) {
bcInfo, err := l.blockStore.GetBlockchainInfo()
l.blockAPIsRWLock.RLock()
defer l.blockAPIsRWLock.RUnlock()
return bcInfo, err
}
// GetBlockByNumber returns the block at a given height.
// A blockNumber of math.MaxUint64 will return the last block
func (l *kvLedger) GetBlockByNumber(blockNumber uint64) (*common.Block, error) {
block, err := l.blockStore.RetrieveBlockByNumber(blockNumber)
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return block, err
}
// GetBlocksIterator returns an iterator that starts from `startBlockNumber` (inclusive).
// The iterator is a blocking iterator, i.e., it blocks until the next block becomes available in the ledger
// ResultsIterator contains type BlockHolder
func (l *kvLedger) GetBlocksIterator(startBlockNumber uint64) (commonledger.ResultsIterator, error) {
blkItr, err := l.blockStore.RetrieveBlocks(startBlockNumber)
if err != nil {
return nil, err
}
return &blocksItr{l.blockAPIsRWLock, blkItr}, nil
}
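// Illustrative sketch (hypothetical helper, not part of the original file):
// draining the blocking iterator returned by GetBlocksIterator. Next() blocks
// until the requested block is committed; Close() releases the iterator. The
// concrete result type is assumed here to be *common.Block.
func exampleIterateBlocks(lgr ledger.PeerLedger, startBlock uint64, count int) error {
	itr, err := lgr.GetBlocksIterator(startBlock)
	if err != nil {
		return err
	}
	defer itr.Close()
	for i := 0; i < count; i++ {
		res, err := itr.Next()
		if err != nil {
			return err
		}
		if block, ok := res.(*common.Block); ok {
			logger.Debugf("received block [%d]", block.Header.Number)
		}
	}
	return nil
}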
// GetBlockByHash returns a block given its hash
func (l *kvLedger) GetBlockByHash(blockHash []byte) (*common.Block, error) {
block, err := l.blockStore.RetrieveBlockByHash(blockHash)
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return block, err
}
// GetBlockByTxID returns the block that contains the given transaction
func (l *kvLedger) GetBlockByTxID(txID string) (*common.Block, error) {
block, err := l.blockStore.RetrieveBlockByTxID(txID)
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return block, err
}
func (l *kvLedger) GetTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) {
txValidationCode, err := l.blockStore.RetrieveTxValidationCodeByTxID(txID)
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return txValidationCode, err
}
//Prune prunes the blocks/transactions that satisfy the given policy
func (l *kvLedger) Prune(policy commonledger.PrunePolicy) error {
return errors.New("not yet implemented")
}
// NewTxSimulator returns new `ledger.TxSimulator`
func (l *kvLedger) NewTxSimulator(txid string) (ledger.TxSimulator, error) {
return l.txtmgmt.NewTxSimulator(txid)
}
// NewQueryExecutor gives handle to a query executor.
// A client can obtain more than one `QueryExecutor` for parallel execution.
// Any synchronization should be performed at the implementation level if required
func (l *kvLedger) NewQueryExecutor() (ledger.QueryExecutor, error) {
return l.txtmgmt.NewQueryExecutor(util.GenerateUUID())
}
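// Illustrative sketch (hypothetical helper): a point-in-time state read through a
// QueryExecutor. GetState and Done are part of the ledger.QueryExecutor interface;
// the namespace is the chaincode name.
func exampleReadState(lgr ledger.PeerLedger, ccName, key string) ([]byte, error) {
	qe, err := lgr.NewQueryExecutor()
	if err != nil {
		return nil, err
	}
	// Done releases the resources held by the query executor.
	defer qe.Done()
	return qe.GetState(ccName, key)
}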
// NewHistoryQueryExecutor gives handle to a history query executor.
// A client can obtain more than one `HistoryQueryExecutor` for parallel execution.
// Any synchronization should be performed at the implementation level if required
// Pass the ledger blockstore so that historical values can be looked up from the chain
func (l *kvLedger) NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error) {
return l.historyDB.NewHistoryQueryExecutor(l.blockStore)
}
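// Illustrative sketch (hypothetical helper): counting the recorded versions of a
// key via the history query executor. The iterator is assumed to follow the usual
// commonledger.ResultsIterator convention of returning a nil result at the end.
func exampleCountKeyHistory(lgr ledger.PeerLedger, ccName, key string) (int, error) {
	hqe, err := lgr.NewHistoryQueryExecutor()
	if err != nil {
		return 0, err
	}
	itr, err := hqe.GetHistoryForKey(ccName, key)
	if err != nil {
		return 0, err
	}
	defer itr.Close()
	count := 0
	for {
		res, err := itr.Next()
		if err != nil {
			return 0, err
		}
		if res == nil { // end of results
			return count, nil
		}
		count++
	}
}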
// CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation
func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) error {
var err error
block := pvtdataAndBlock.Block
blockNo := pvtdataAndBlock.Block.Header.Number
startBlockProcessing := time.Now()
logger.Debugf("[%s] Validating state for block [%d]", l.ledgerID, blockNo)
txstatsInfo, err := l.txtmgmt.ValidateAndPrepare(pvtdataAndBlock, true)
if err != nil {
return err
}
elapsedBlockProcessing := time.Since(startBlockProcessing)
startCommitBlockStorage := time.Now()
logger.Debugf("[%s] Committing block [%d] to storage", l.ledgerID, blockNo)
l.blockAPIsRWLock.Lock()
defer l.blockAPIsRWLock.Unlock()
if err = l.blockStore.CommitWithPvtData(pvtdataAndBlock); err != nil {
return err
}
elapsedCommitBlockStorage := time.Since(startCommitBlockStorage)
startCommitState := time.Now()
logger.Debugf("[%s] Committing block [%d] transactions to state database", l.ledgerID, blockNo)
if err = l.txtmgmt.Commit(); err != nil {
panic(errors.WithMessage(err, "error during commit to txmgr"))
}
elapsedCommitState := time.Since(startCommitState)
// History database could be written in parallel with state and/or async as a future optimization,
// although it has not been a bottleneck...no need to clutter the log with elapsed duration.
if ledgerconfig.IsHistoryDBEnabled() {
logger.Debugf("[%s] Committing block [%d] transactions to history database", l.ledgerID, blockNo)
if err := l.historyDB.Commit(block); err != nil {
panic(errors.WithMessage(err, "Error during commit to history db"))
}
}
elapsedCommitWithPvtData := time.Since(startBlockProcessing)
logger.Infof("[%s] Committed block [%d] with %d transaction(s) in %dms (state_validation=%dms block_commit=%dms state_commit=%dms)",
l.ledgerID, block.Header.Number, len(block.Data.Data),
elapsedCommitWithPvtData/time.Millisecond,
elapsedBlockProcessing/time.Millisecond,
elapsedCommitBlockStorage/time.Millisecond,
elapsedCommitState/time.Millisecond,
)
l.updateBlockStats(blockNo,
elapsedBlockProcessing,
elapsedCommitBlockStorage,
elapsedCommitState,
txstatsInfo,
)
return nil
}
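// Illustrative sketch (hypothetical helper): committing a block that carries no
// private data. In a running peer this path is driven by the committer after block
// validation; it is shown here only to outline the API shape of CommitWithPvtData.
func exampleCommitBlock(lgr ledger.PeerLedger, block *common.Block) error {
	return lgr.CommitWithPvtData(&ledger.BlockAndPvtData{Block: block})
}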
func (l *kvLedger) updateBlockStats(
blockNum uint64,
blockProcessingTime time.Duration,
blockstorageCommitTime time.Duration,
statedbCommitTime time.Duration,
txstatsInfo []*txmgr.TxStatInfo,
) {
l.stats.updateBlockchainHeight(blockNum + 1)
l.stats.updateBlockProcessingTime(blockProcessingTime)
l.stats.updateBlockstorageCommitTime(blockstorageCommitTime)
l.stats.updateStatedbCommitTime(statedbCommitTime)
l.stats.updateTransactionsStats(txstatsInfo)
}
// GetMissingPvtDataInfoForMostRecentBlocks returns the missing private data information for the
// most recent `maxBlock` blocks that are missing at least one private data item of an eligible collection.
func (l *kvLedger) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.MissingPvtDataInfo, error) {
return l.blockStore.GetMissingPvtDataInfoForMostRecentBlocks(maxBlock)
}
// GetPvtDataAndBlockByNum returns the block and the corresponding pvt data.
// The pvt data is filtered by the list of 'collections' supplied
func (l *kvLedger) GetPvtDataAndBlockByNum(blockNum uint64, filter ledger.PvtNsCollFilter) (*ledger.BlockAndPvtData, error) {
blockAndPvtdata, err := l.blockStore.GetPvtDataAndBlockByNum(blockNum, filter)
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return blockAndPvtdata, err
}
// GetPvtDataByNum returns only the pvt data corresponding to the given block number
// The pvt data is filtered by the list of 'collections' supplied
func (l *kvLedger) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) {
pvtdata, err := l.blockStore.GetPvtDataByNum(blockNum, filter)
l.blockAPIsRWLock.RLock()
l.blockAPIsRWLock.RUnlock()
return pvtdata, err
}
// PurgePrivateData removes private read-write sets generated by endorsers at block heights lower than
// the given maxBlockNumToRetain. In other words, it only retains private read-write sets
// that were generated at block height maxBlockNumToRetain or higher.
func (l *kvLedger) PurgePrivateData(maxBlockNumToRetain uint64) error {
return errors.New("not yet implemented")
}
// PrivateDataMinBlockNum returns the lowest retained endorsement block height
func (l *kvLedger) PrivateDataMinBlockNum() (uint64, error) {
return 0, errors.New("not yet implemented")
}
func (l *kvLedger) GetConfigHistoryRetriever() (ledger.ConfigHistoryRetriever, error) {
return l.configHistoryRetriever, nil
}
func (l *kvLedger) CommitPvtDataOfOldBlocks(pvtData []*ledger.BlockPvtData) ([]*ledger.PvtdataHashMismatch, error) {
logger.Debugf("[%s:] Comparing pvtData of [%d] old blocks against the hashes in transaction's rwset to find valid and invalid data",
l.ledgerID, len(pvtData))
validPvtData, hashMismatches, err := ConstructValidAndInvalidPvtData(pvtData, l.blockStore)
if err != nil {
return nil, err
}
logger.Debugf("[%s:] Committing pvtData of [%d] old blocks to the pvtdatastore", l.ledgerID, len(pvtData))
err = l.blockStore.CommitPvtDataOfOldBlocks(validPvtData)
if err != nil {
return nil, err
}
logger.Debugf("[%s:] Committing pvtData of [%d] old blocks to the stateDB", l.ledgerID, len(pvtData))
err = l.txtmgmt.RemoveStaleAndCommitPvtDataOfOldBlocks(validPvtData)
if err != nil {
return nil, err
}
logger.Debugf("[%s:] Clearing the bookkeeping information from pvtdatastore", l.ledgerID)
if err := l.blockStore.ResetLastUpdatedOldBlocksList(); err != nil {
return nil, err
}
return hashMismatches, nil
}
func (l *kvLedger) GetMissingPvtDataTracker() (ledger.MissingPvtDataTracker, error) {
return l, nil
}
// Close closes `KVLedger`
func (l *kvLedger) Close() {
l.blockStore.Shutdown()
l.txtmgmt.Shutdown()
}
type blocksItr struct {
blockAPIsRWLock *sync.RWMutex
blocksItr commonledger.ResultsIterator
}
func (itr *blocksItr) Next() (commonledger.QueryResult, error) {
block, err := itr.blocksItr.Next()
if err != nil {
return nil, err
}
itr.blockAPIsRWLock.RLock()
itr.blockAPIsRWLock.RUnlock()
return block, nil
}
func (itr *blocksItr) Close() {
itr.blocksItr.Close()
}
type collectionInfoRetriever struct {
ledger ledger.PeerLedger
infoProvider ledger.DeployedChaincodeInfoProvider
}
func (r *collectionInfoRetriever) CollectionInfo(chaincodeName, collectionName string) (*common.StaticCollectionConfig, error) {
qe, err := r.ledger.NewQueryExecutor()
if err != nil {
return nil, err
}
defer qe.Done()
return r.infoProvider.CollectionInfo(chaincodeName, collectionName, qe)
}