6 Star 44 Fork 25

Hyperledger / fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
state_based_validator.go 9.18 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statebasedval
import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric/protos/peer"
putils "github.com/hyperledger/fabric/protos/utils"
)
var logger = flogging.MustGetLogger("statevalidator")
// Validator validates a tx against the latest committed state
// and preceding valid transactions with in the same block
type Validator struct {
db statedb.VersionedDB
}
// NewValidator constructs StateValidator
func NewValidator(db statedb.VersionedDB) *Validator {
return &Validator{db}
}
//validate endorser transaction
func (v *Validator) validateEndorserTX(envBytes []byte, doMVCCValidation bool, updates *statedb.UpdateBatch) (*rwsetutil.TxRwSet, peer.TxValidationCode, error) {
// extract actions from the envelope message
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
return nil, peer.TxValidationCode_NIL_TXACTION, nil
}
//preparation for extracting RWSet from transaction
txRWSet := &rwsetutil.TxRwSet{}
// Get the Result from the Action
// and then Unmarshal it into a TxReadWriteSet using custom unmarshalling
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
return nil, peer.TxValidationCode_INVALID_OTHER_REASON, nil
}
txResult := peer.TxValidationCode_VALID
//mvccvalidation, may invalidate transaction
if doMVCCValidation {
if txResult, err = v.validateTx(txRWSet, updates); err != nil {
return nil, txResult, err
} else if txResult != peer.TxValidationCode_VALID {
txRWSet = nil
}
}
return txRWSet, txResult, err
}
// ValidateAndPrepareBatch implements method in Validator interface
func (v *Validator) ValidateAndPrepareBatch(block *common.Block, doMVCCValidation bool) (*statedb.UpdateBatch, error) {
logger.Debugf("New block arrived for validation:%#v, doMVCCValidation=%t", block, doMVCCValidation)
updates := statedb.NewUpdateBatch()
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))
// Committer validator has already set validation flags based on well formed tran checks
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
// Precaution in case committer validator has not added validation flags yet
if len(txsFilter) == 0 {
txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
for txIndex, envBytes := range block.Data.Data {
if txsFilter.IsInvalid(txIndex) {
// Skiping invalid transaction
logger.Warningf("Block [%d] Transaction index [%d] marked as invalid by committer. Reason code [%d]",
block.Header.Number, txIndex, txsFilter.Flag(txIndex))
continue
}
env, err := putils.GetEnvelopeFromBlock(envBytes)
if err != nil {
return nil, err
}
payload, err := putils.GetPayload(env)
if err != nil {
return nil, err
}
chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return nil, err
}
txType := common.HeaderType(chdr.Type)
if txType != common.HeaderType_ENDORSER_TRANSACTION {
logger.Debugf("Skipping mvcc validation for Block [%d] Transaction index [%d] because, the transaction type is [%s]",
block.Header.Number, txIndex, txType)
continue
}
txRWSet, txResult, err := v.validateEndorserTX(envBytes, doMVCCValidation, updates)
if err != nil {
return nil, err
}
txsFilter.SetFlag(txIndex, txResult)
//txRWSet != nil => t is valid
if txRWSet != nil {
committingTxHeight := version.NewHeight(block.Header.Number, uint64(txIndex))
addWriteSetToBatch(txRWSet, committingTxHeight, updates)
txsFilter.SetFlag(txIndex, peer.TxValidationCode_VALID)
}
if txsFilter.IsValid(txIndex) {
logger.Debugf("Block [%d] Transaction index [%d] TxId [%s] marked as valid by state validator",
block.Header.Number, txIndex, chdr.TxId)
} else {
logger.Warningf("Block [%d] Transaction index [%d] TxId [%s] marked as invalid by state validator. Reason code [%d]",
block.Header.Number, txIndex, chdr.TxId, txsFilter.Flag(txIndex))
}
}
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
return updates, nil
}
func addWriteSetToBatch(txRWSet *rwsetutil.TxRwSet, txHeight *version.Height, batch *statedb.UpdateBatch) {
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
for _, kvWrite := range nsRWSet.KvRwSet.Writes {
if kvWrite.IsDelete {
batch.Delete(ns, kvWrite.Key, txHeight)
} else {
batch.Put(ns, kvWrite.Key, kvWrite.Value, txHeight)
}
}
}
}
func (v *Validator) validateTx(txRWSet *rwsetutil.TxRwSet, updates *statedb.UpdateBatch) (peer.TxValidationCode, error) {
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
if valid, err := v.validateReadSet(ns, nsRWSet.KvRwSet.Reads, updates); !valid || err != nil {
if err != nil {
return peer.TxValidationCode(-1), err
}
return peer.TxValidationCode_MVCC_READ_CONFLICT, nil
}
if valid, err := v.validateRangeQueries(ns, nsRWSet.KvRwSet.RangeQueriesInfo, updates); !valid || err != nil {
if err != nil {
return peer.TxValidationCode(-1), err
}
return peer.TxValidationCode_PHANTOM_READ_CONFLICT, nil
}
}
return peer.TxValidationCode_VALID, nil
}
func (v *Validator) validateReadSet(ns string, kvReads []*kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
for _, kvRead := range kvReads {
if valid, err := v.validateKVRead(ns, kvRead, updates); !valid || err != nil {
return valid, err
}
}
return true, nil
}
// validateKVRead performs mvcc check for a key read during transaction simulation.
// i.e., it checks whether a key/version combination is already updated in the statedb (by an already committed block)
// or in the updates (by a preceding valid transaction in the current block)
func (v *Validator) validateKVRead(ns string, kvRead *kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
if updates.Exists(ns, kvRead.Key) {
return false, nil
}
versionedValue, err := v.db.GetState(ns, kvRead.Key)
if err != nil {
return false, err
}
var committedVersion *version.Height
if versionedValue != nil {
committedVersion = versionedValue.Version
}
if !version.AreSame(committedVersion, rwsetutil.NewVersion(kvRead.Version)) {
logger.Debugf("Version mismatch for key [%s:%s]. Committed version = [%s], Version in readSet [%s]",
ns, kvRead.Key, committedVersion, kvRead.Version)
return false, nil
}
return true, nil
}
func (v *Validator) validateRangeQueries(ns string, rangeQueriesInfo []*kvrwset.RangeQueryInfo, updates *statedb.UpdateBatch) (bool, error) {
for _, rqi := range rangeQueriesInfo {
if valid, err := v.validateRangeQuery(ns, rqi, updates); !valid || err != nil {
return valid, err
}
}
return true, nil
}
// validateRangeQuery performs a phatom read check i.e., it
// checks whether the results of the range query are still the same when executed on the
// statedb (latest state as of last committed block) + updates (prepared by the writes of preceding valid transactions
// in the current block and yet to be committed as part of group commit at the end of the validation of the block)
func (v *Validator) validateRangeQuery(ns string, rangeQueryInfo *kvrwset.RangeQueryInfo, updates *statedb.UpdateBatch) (bool, error) {
logger.Debugf("validateRangeQuery: ns=%s, rangeQueryInfo=%s", ns, rangeQueryInfo)
// If during simulation, the caller had not exhausted the iterator so
// rangeQueryInfo.EndKey is not actual endKey given by the caller in the range query
// but rather it is the last key seen by the caller and hence the combinedItr should include the endKey in the results.
includeEndKey := !rangeQueryInfo.ItrExhausted
combinedItr, err := newCombinedIterator(v.db, updates,
ns, rangeQueryInfo.StartKey, rangeQueryInfo.EndKey, includeEndKey)
if err != nil {
return false, err
}
defer combinedItr.Close()
var validator rangeQueryValidator
if rangeQueryInfo.GetReadsMerkleHashes() != nil {
logger.Debug(`Hashing results are present in the range query info hence, initiating hashing based validation`)
validator = &rangeQueryHashValidator{}
} else {
logger.Debug(`Hashing results are not present in the range query info hence, initiating raw KVReads based validation`)
validator = &rangeQueryResultsValidator{}
}
validator.init(rangeQueryInfo, combinedItr)
return validator.validate()
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/hyperledger/fabric.git
git@gitee.com:hyperledger/fabric.git
hyperledger
fabric
fabric
v1.0.6

搜索帮助

344bd9b3 5694891 D2dac590 5694891