63 Star 181 Fork 3

Gitee 极速下载 / hyperledger-fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/hyperledger/fabric
克隆/下载
lockbased_txmgr.go 5.90 KB
一键复制 编辑 原始数据 按行查看 历史
yacovm 提交于 2018-02-08 15:22 . [FAB-8132] Fix typo in lockbased_txmgr.go
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package lockbasedtxmgr
import (
"sync"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator/valimpl"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/protos/common"
)
var logger = flogging.MustGetLogger("lockbasedtxmgr")
// LockBasedTxMgr a simple implementation of interface `txmgmt.TxMgr`.
// This implementation uses a read-write lock to prevent conflicts between transaction simulation and committing
type LockBasedTxMgr struct {
ledgerid string
db privacyenabledstate.DB
validator validator.Validator
batch *privacyenabledstate.UpdateBatch
currentBlock *common.Block
stateListeners ledger.StateListeners
commitRWLock sync.RWMutex
}
// NewLockBasedTxMgr constructs a new instance of NewLockBasedTxMgr
func NewLockBasedTxMgr(ledgerid string, db privacyenabledstate.DB, stateListeners ledger.StateListeners) *LockBasedTxMgr {
db.Open()
txmgr := &LockBasedTxMgr{ledgerid: ledgerid, db: db, stateListeners: stateListeners}
txmgr.validator = valimpl.NewStatebasedValidator(txmgr, db)
return txmgr
}
// GetLastSavepoint returns the block num recorded in savepoint,
// returns 0 if NO savepoint is found
func (txmgr *LockBasedTxMgr) GetLastSavepoint() (*version.Height, error) {
return txmgr.db.GetLatestSavePoint()
}
// NewQueryExecutor implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) NewQueryExecutor(txid string) (ledger.QueryExecutor, error) {
qe := newQueryExecutor(txmgr, txid)
txmgr.commitRWLock.RLock()
return qe, nil
}
// NewTxSimulator implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) NewTxSimulator(txid string) (ledger.TxSimulator, error) {
logger.Debugf("constructing new tx simulator")
s, err := newLockBasedTxSimulator(txmgr, txid)
if err != nil {
return nil, err
}
txmgr.commitRWLock.RLock()
return s, nil
}
// ValidateAndPrepare implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAndPvtData, doMVCCValidation bool) error {
block := blockAndPvtdata.Block
logger.Debugf("Validating new block with num trans = [%d]", len(block.Data.Data))
batch, err := txmgr.validator.ValidateAndPrepareBatch(blockAndPvtdata, doMVCCValidation)
if err != nil {
txmgr.clearCache()
return err
}
txmgr.currentBlock = block
txmgr.batch = batch
return txmgr.invokeNamespaceListeners(batch)
}
func (txmgr *LockBasedTxMgr) invokeNamespaceListeners(batch *privacyenabledstate.UpdateBatch) error {
namespaces := batch.PubUpdates.GetUpdatedNamespaces()
for _, namespace := range namespaces {
listener := txmgr.stateListeners[namespace]
if listener == nil {
continue
}
logger.Debugf("Invoking listener for state changes over namespace:%s", namespace)
updatesMap := batch.PubUpdates.GetUpdates(namespace)
var kvwrites []*kvrwset.KVWrite
for key, versionedValue := range updatesMap {
kvwrites = append(kvwrites, &kvrwset.KVWrite{Key: key, IsDelete: versionedValue.Value == nil, Value: versionedValue.Value})
}
if err := listener.HandleStateUpdates(txmgr.ledgerid, kvwrites); err != nil {
return err
}
}
return nil
}
// Shutdown implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) Shutdown() {
txmgr.db.Close()
}
// Commit implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) Commit() error {
// If statedb implementation needed bulk read optimization, cache might have been populated by
// ValidateAndPrepare(). Once the block is validated and committed, populated cache needs to
// be cleared.
defer txmgr.clearCache()
logger.Debugf("Committing updates to state database")
txmgr.commitRWLock.Lock()
defer txmgr.commitRWLock.Unlock()
logger.Debugf("Write lock acquired for committing updates to state database")
if txmgr.batch == nil {
panic("validateAndPrepare() method should have been called before calling commit()")
}
defer func() { txmgr.batch = nil }()
if err := txmgr.db.ApplyPrivacyAwareUpdates(txmgr.batch,
version.NewHeight(txmgr.currentBlock.Header.Number, uint64(len(txmgr.currentBlock.Data.Data)-1))); err != nil {
return err
}
logger.Debugf("Updates committed to state database")
return nil
}
// Rollback implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) Rollback() {
txmgr.batch = nil
// If statedb implementation needed bulk read optimization, cache might have been populated by
// ValidateAndPrepareBatch(). As the block commit is rollbacked, populated cache needs to
// be cleared now.
txmgr.clearCache()
}
// clearCache empty the cache maintained by the statedb implementation
func (txmgr *LockBasedTxMgr) clearCache() {
if txmgr.db.IsBulkOptimizable() {
txmgr.db.ClearCachedVersions()
}
}
// ShouldRecover implements method in interface kvledger.Recoverer
func (txmgr *LockBasedTxMgr) ShouldRecover(lastAvailableBlock uint64) (bool, uint64, error) {
savepoint, err := txmgr.GetLastSavepoint()
if err != nil {
return false, 0, err
}
if savepoint == nil {
return true, 0, nil
}
return savepoint.BlockNum != lastAvailableBlock, savepoint.BlockNum + 1, nil
}
// CommitLostBlock implements method in interface kvledger.Recoverer
func (txmgr *LockBasedTxMgr) CommitLostBlock(blockAndPvtdata *ledger.BlockAndPvtData) error {
block := blockAndPvtdata.Block
logger.Debugf("Constructing updateSet for the block %d", block.Header.Number)
if err := txmgr.ValidateAndPrepare(blockAndPvtdata, false); err != nil {
return err
}
logger.Debugf("Committing block %d to state database", block.Header.Number)
return txmgr.Commit()
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/mirrors/hyperledger-fabric.git
git@gitee.com:mirrors/hyperledger-fabric.git
mirrors
hyperledger-fabric
hyperledger-fabric
v1.1.0

搜索帮助

344bd9b3 5694891 D2dac590 5694891