代码拉取完成,页面将自动刷新
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statebasedval
import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric/protos/peer"
putils "github.com/hyperledger/fabric/protos/utils"
)
var logger = flogging.MustGetLogger("statevalidator")
// Validator validates a tx against the latest committed state
// and preceding valid transactions with in the same block
type Validator struct {
db statedb.VersionedDB
}
// NewValidator constructs StateValidator
func NewValidator(db statedb.VersionedDB) *Validator {
return &Validator{db}
}
//validate endorser transaction
func (v *Validator) validateEndorserTX(envBytes []byte, doMVCCValidation bool, updates *statedb.UpdateBatch) (*rwsetutil.TxRwSet, peer.TxValidationCode, error) {
// extract actions from the envelope message
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
return nil, peer.TxValidationCode_NIL_TXACTION, nil
}
//preparation for extracting RWSet from transaction
txRWSet := &rwsetutil.TxRwSet{}
// Get the Result from the Action
// and then Unmarshal it into a TxReadWriteSet using custom unmarshalling
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
return nil, peer.TxValidationCode_INVALID_OTHER_REASON, nil
}
txResult := peer.TxValidationCode_VALID
//mvccvalidation, may invalidate transaction
if doMVCCValidation {
if txResult, err = v.validateTx(txRWSet, updates); err != nil {
return nil, txResult, err
} else if txResult != peer.TxValidationCode_VALID {
txRWSet = nil
}
}
return txRWSet, txResult, err
}
// ValidateAndPrepareBatch implements method in Validator interface
func (v *Validator) ValidateAndPrepareBatch(block *common.Block, doMVCCValidation bool) (*statedb.UpdateBatch, error) {
logger.Debugf("New block arrived for validation:%#v, doMVCCValidation=%t", block, doMVCCValidation)
updates := statedb.NewUpdateBatch()
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))
// Committer validator has already set validation flags based on well formed tran checks
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
// Precaution in case committer validator has not added validation flags yet
if len(txsFilter) == 0 {
txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
for txIndex, envBytes := range block.Data.Data {
if txsFilter.IsInvalid(txIndex) {
// Skiping invalid transaction
logger.Warningf("Block [%d] Transaction index [%d] marked as invalid by committer. Reason code [%d]",
block.Header.Number, txIndex, txsFilter.Flag(txIndex))
continue
}
env, err := putils.GetEnvelopeFromBlock(envBytes)
if err != nil {
return nil, err
}
payload, err := putils.GetPayload(env)
if err != nil {
return nil, err
}
chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return nil, err
}
txType := common.HeaderType(chdr.Type)
if txType != common.HeaderType_ENDORSER_TRANSACTION {
logger.Debugf("Skipping mvcc validation for Block [%d] Transaction index [%d] because, the transaction type is [%s]",
block.Header.Number, txIndex, txType)
continue
}
txRWSet, txResult, err := v.validateEndorserTX(envBytes, doMVCCValidation, updates)
if err != nil {
return nil, err
}
txsFilter.SetFlag(txIndex, txResult)
//txRWSet != nil => t is valid
if txRWSet != nil {
committingTxHeight := version.NewHeight(block.Header.Number, uint64(txIndex))
addWriteSetToBatch(txRWSet, committingTxHeight, updates)
txsFilter.SetFlag(txIndex, peer.TxValidationCode_VALID)
}
if txsFilter.IsValid(txIndex) {
logger.Debugf("Block [%d] Transaction index [%d] TxId [%s] marked as valid by state validator",
block.Header.Number, txIndex, chdr.TxId)
} else {
logger.Warningf("Block [%d] Transaction index [%d] TxId [%s] marked as invalid by state validator. Reason code [%d]",
block.Header.Number, txIndex, chdr.TxId, txsFilter.Flag(txIndex))
}
}
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
return updates, nil
}
func addWriteSetToBatch(txRWSet *rwsetutil.TxRwSet, txHeight *version.Height, batch *statedb.UpdateBatch) {
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
for _, kvWrite := range nsRWSet.KvRwSet.Writes {
if kvWrite.IsDelete {
batch.Delete(ns, kvWrite.Key, txHeight)
} else {
batch.Put(ns, kvWrite.Key, kvWrite.Value, txHeight)
}
}
}
}
func (v *Validator) validateTx(txRWSet *rwsetutil.TxRwSet, updates *statedb.UpdateBatch) (peer.TxValidationCode, error) {
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
if valid, err := v.validateReadSet(ns, nsRWSet.KvRwSet.Reads, updates); !valid || err != nil {
if err != nil {
return peer.TxValidationCode(-1), err
}
return peer.TxValidationCode_MVCC_READ_CONFLICT, nil
}
if valid, err := v.validateRangeQueries(ns, nsRWSet.KvRwSet.RangeQueriesInfo, updates); !valid || err != nil {
if err != nil {
return peer.TxValidationCode(-1), err
}
return peer.TxValidationCode_PHANTOM_READ_CONFLICT, nil
}
}
return peer.TxValidationCode_VALID, nil
}
func (v *Validator) validateReadSet(ns string, kvReads []*kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
for _, kvRead := range kvReads {
if valid, err := v.validateKVRead(ns, kvRead, updates); !valid || err != nil {
return valid, err
}
}
return true, nil
}
// validateKVRead performs mvcc check for a key read during transaction simulation.
// i.e., it checks whether a key/version combination is already updated in the statedb (by an already committed block)
// or in the updates (by a preceding valid transaction in the current block)
func (v *Validator) validateKVRead(ns string, kvRead *kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
if updates.Exists(ns, kvRead.Key) {
return false, nil
}
versionedValue, err := v.db.GetState(ns, kvRead.Key)
if err != nil {
return false, nil
}
var committedVersion *version.Height
if versionedValue != nil {
committedVersion = versionedValue.Version
}
if !version.AreSame(committedVersion, rwsetutil.NewVersion(kvRead.Version)) {
logger.Debugf("Version mismatch for key [%s:%s]. Committed version = [%s], Version in readSet [%s]",
ns, kvRead.Key, committedVersion, kvRead.Version)
return false, nil
}
return true, nil
}
func (v *Validator) validateRangeQueries(ns string, rangeQueriesInfo []*kvrwset.RangeQueryInfo, updates *statedb.UpdateBatch) (bool, error) {
for _, rqi := range rangeQueriesInfo {
if valid, err := v.validateRangeQuery(ns, rqi, updates); !valid || err != nil {
return valid, err
}
}
return true, nil
}
// validateRangeQuery performs a phatom read check i.e., it
// checks whether the results of the range query are still the same when executed on the
// statedb (latest state as of last committed block) + updates (prepared by the writes of preceding valid transactions
// in the current block and yet to be committed as part of group commit at the end of the validation of the block)
func (v *Validator) validateRangeQuery(ns string, rangeQueryInfo *kvrwset.RangeQueryInfo, updates *statedb.UpdateBatch) (bool, error) {
logger.Debugf("validateRangeQuery: ns=%s, rangeQueryInfo=%s", ns, rangeQueryInfo)
// If during simulation, the caller had not exhausted the iterator so
// rangeQueryInfo.EndKey is not actual endKey given by the caller in the range query
// but rather it is the last key seen by the caller and hence the combinedItr should include the endKey in the results.
includeEndKey := !rangeQueryInfo.ItrExhausted
combinedItr, err := newCombinedIterator(v.db, updates,
ns, rangeQueryInfo.StartKey, rangeQueryInfo.EndKey, includeEndKey)
if err != nil {
return false, err
}
defer combinedItr.Close()
var validator rangeQueryValidator
if rangeQueryInfo.GetReadsMerkleHashes() != nil {
logger.Debug(`Hashing results are present in the range query info hence, initiating hashing based validation`)
validator = &rangeQueryHashValidator{}
} else {
logger.Debug(`Hashing results are not present in the range query info hence, initiating raw KVReads based validation`)
validator = &rangeQueryResultsValidator{}
}
validator.init(rangeQueryInfo, combinedItr)
return validator.validate()
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。