3 Star 13 Fork 4

赵春 / fabric-gm

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
validator.go 17.82 KB
一键复制 编辑 原始数据 按行查看 历史
赵春 提交于 2022-02-22 14:31 . 基础版本作成
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553
/*
Copyright IBM Corp. 2016 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package txvalidator
import (
"context"
"time"
"gitee.com/zhaochuninhefei/fabric-gm/bccsp"
"gitee.com/zhaochuninhefei/fabric-gm/common/channelconfig"
"gitee.com/zhaochuninhefei/fabric-gm/common/configtx"
commonerrors "gitee.com/zhaochuninhefei/fabric-gm/common/errors"
"gitee.com/zhaochuninhefei/fabric-gm/common/flogging"
"gitee.com/zhaochuninhefei/fabric-gm/common/policies"
"gitee.com/zhaochuninhefei/fabric-gm/core/committer/txvalidator/plugin"
"gitee.com/zhaochuninhefei/fabric-gm/core/committer/txvalidator/v20/plugindispatcher"
"gitee.com/zhaochuninhefei/fabric-gm/core/common/validation"
"gitee.com/zhaochuninhefei/fabric-gm/core/ledger"
"gitee.com/zhaochuninhefei/fabric-gm/internal/pkg/txflags"
"gitee.com/zhaochuninhefei/fabric-gm/msp"
"gitee.com/zhaochuninhefei/fabric-gm/protoutil"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
mspprotos "github.com/hyperledger/fabric-protos-go/msp"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/pkg/errors"
)
// Semaphore provides to the validator means for synchronisation
type Semaphore interface {
// Acquire implements semaphore-like acquire semantics
Acquire(ctx context.Context) error
// Release implements semaphore-like release semantics
Release()
}
// ChannelResources provides access to channel artefacts or
// functions to interact with them
type ChannelResources interface {
// MSPManager returns the MSP manager for this channel
MSPManager() msp.MSPManager
// Apply attempts to apply a configtx to become the new config
Apply(configtx *common.ConfigEnvelope) error
// GetMSPIDs returns the IDs for the application MSPs
// that have been defined in the channel
GetMSPIDs() []string
// Capabilities defines the capabilities for the application portion of this channel
Capabilities() channelconfig.ApplicationCapabilities
}
// LedgerResources provides access to ledger artefacts or
// functions to interact with them
type LedgerResources interface {
// GetTransactionByID retrieves a transaction by id
GetTransactionByID(txID string) (*peer.ProcessedTransaction, error)
// NewQueryExecutor gives handle to a query executor.
// A client can obtain more than one 'QueryExecutor's for parallel execution.
// Any synchronization should be performed at the implementation level if required
NewQueryExecutor() (ledger.QueryExecutor, error)
}
// Dispatcher is an interface to decouple tx validator
// and plugin dispatcher
type Dispatcher interface {
// Dispatch invokes the appropriate validation plugin for the supplied transaction in the block
Dispatch(seq int, payload *common.Payload, envBytes []byte, block *common.Block) (error, peer.TxValidationCode)
}
//go:generate mockery -dir . -name ChannelResources -case underscore -output mocks/
//go:generate mockery -dir . -name LedgerResources -case underscore -output mocks/
//go:generate mockery -dir . -name Dispatcher -case underscore -output mocks/
//go:generate mockery -dir . -name QueryExecutor -case underscore -output mocks/
// QueryExecutor is the local interface that used to generate mocks for foreign interface.
type QueryExecutor interface {
ledger.QueryExecutor
}
//go:generate mockery -dir . -name ChannelPolicyManagerGetter -case underscore -output mocks/
// ChannelPolicyManagerGetter is the local interface that used to generate mocks for foreign interface.
type ChannelPolicyManagerGetter interface {
policies.ChannelPolicyManagerGetter
}
//go:generate mockery -dir . -name PolicyManager -case underscore -output mocks/
type PolicyManager interface {
policies.Manager
}
//go:generate mockery -dir plugindispatcher/ -name CollectionResources -case underscore -output mocks/
// TxValidator is the implementation of Validator interface, keeps
// reference to the ledger to enable tx simulation
// and execution of plugins
type TxValidator struct {
ChannelID string
Semaphore Semaphore
ChannelResources ChannelResources
LedgerResources LedgerResources
Dispatcher Dispatcher
CryptoProvider bccsp.BCCSP
}
var logger = flogging.MustGetLogger("committer.txvalidator")
type blockValidationRequest struct {
block *common.Block
d []byte
tIdx int
}
type blockValidationResult struct {
tIdx int
validationCode peer.TxValidationCode
err error
txid string
}
// NewTxValidator creates new transactions validator
func NewTxValidator(
channelID string,
sem Semaphore,
cr ChannelResources,
ler LedgerResources,
lcr plugindispatcher.LifecycleResources,
cor plugindispatcher.CollectionResources,
pm plugin.Mapper,
channelPolicyManagerGetter policies.ChannelPolicyManagerGetter,
cryptoProvider bccsp.BCCSP,
) *TxValidator {
// Encapsulates interface implementation
pluginValidator := plugindispatcher.NewPluginValidator(pm, ler, &dynamicDeserializer{cr: cr}, &dynamicCapabilities{cr: cr}, channelPolicyManagerGetter, cor)
return &TxValidator{
ChannelID: channelID,
Semaphore: sem,
ChannelResources: cr,
LedgerResources: ler,
Dispatcher: plugindispatcher.New(channelID, cr, ler, lcr, pluginValidator),
CryptoProvider: cryptoProvider,
}
}
func (v *TxValidator) chainExists(chain string) bool {
// TODO: implement this function!
return true
}
// Validate performs the validation of a block. The validation
// of each transaction in the block is performed in parallel.
// The approach is as follows: the committer thread starts the
// tx validation function in a goroutine (using a semaphore to cap
// the number of concurrent validating goroutines). The committer
// thread then reads results of validation (in orderer of completion
// of the goroutines) from the results channel. The goroutines
// perform the validation of the txs in the block and enqueue the
// validation result in the results channel. A few note-worthy facts:
// 1) to keep the approach simple, the committer thread enqueues
// all transactions in the block and then moves on to reading the
// results.
// 2) for parallel validation to work, it is important that the
// validation function does not change the state of the system.
// Otherwise the order in which validation is perform matters
// and we have to resort to sequential validation (or some locking).
// This is currently true, because the only function that affects
// state is when a config transaction is received, but they are
// guaranteed to be alone in the block. If/when this assumption
// is violated, this code must be changed.
func (v *TxValidator) Validate(block *common.Block) error {
var err error
var errPos int
startValidation := time.Now() // timer to log Validate block duration
logger.Debugf("[%s] START Block Validation for block [%d]", v.ChannelID, block.Header.Number)
// Initialize trans as valid here, then set invalidation reason code upon invalidation below
txsfltr := txflags.New(len(block.Data.Data))
// array of txids
txidArray := make([]string, len(block.Data.Data))
results := make(chan *blockValidationResult)
go func() {
for tIdx, d := range block.Data.Data {
// ensure that we don't have too many concurrent validation workers
v.Semaphore.Acquire(context.Background())
go func(index int, data []byte) {
defer v.Semaphore.Release()
v.validateTx(&blockValidationRequest{
d: data,
block: block,
tIdx: index,
}, results)
}(tIdx, d)
}
}()
logger.Debugf("expecting %d block validation responses", len(block.Data.Data))
// now we read responses in the order in which they come back
for i := 0; i < len(block.Data.Data); i++ {
res := <-results
if res.err != nil {
// if there is an error, we buffer its value, wait for
// all workers to complete validation and then return
// the error from the first tx in this block that returned an error
logger.Debugf("got terminal error %s for idx %d", res.err, res.tIdx)
if err == nil || res.tIdx < errPos {
err = res.err
errPos = res.tIdx
}
} else {
// if there was no error, we set the txsfltr and we set the
// txsChaincodeNames and txsUpgradedChaincodes maps
logger.Debugf("got result for idx %d, code %d", res.tIdx, res.validationCode)
txsfltr.SetFlag(res.tIdx, res.validationCode)
if res.validationCode == peer.TxValidationCode_VALID {
txidArray[res.tIdx] = res.txid
}
}
}
// if we're here, all workers have completed the validation.
// If there was an error we return the error from the first
// tx in this block that returned an error
if err != nil {
return err
}
// we mark invalid any transaction that has a txid
// which is equal to that of a previous tx in this block
markTXIdDuplicates(txidArray, txsfltr)
// make sure no transaction has skipped validation
err = v.allValidated(txsfltr, block)
if err != nil {
return err
}
// Initialize metadata structure
protoutil.InitBlockMetadata(block)
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsfltr
elapsedValidation := time.Since(startValidation) / time.Millisecond // duration in ms
logger.Infof("[%s] Validated block [%d] in %dms", v.ChannelID, block.Header.Number, elapsedValidation)
return nil
}
// allValidated returns error if some of the validation flags have not been set
// during validation
func (v *TxValidator) allValidated(txsfltr txflags.ValidationFlags, block *common.Block) error {
for id, f := range txsfltr {
if peer.TxValidationCode(f) == peer.TxValidationCode_NOT_VALIDATED {
return errors.Errorf("transaction %d in block %d has skipped validation", id, block.Header.Number)
}
}
return nil
}
func markTXIdDuplicates(txids []string, txsfltr txflags.ValidationFlags) {
txidMap := make(map[string]struct{})
for id, txid := range txids {
if txid == "" {
continue
}
_, in := txidMap[txid]
if in {
logger.Error("Duplicate txid", txid, "found, skipping")
txsfltr.SetFlag(id, peer.TxValidationCode_DUPLICATE_TXID)
} else {
txidMap[txid] = struct{}{}
}
}
}
func (v *TxValidator) validateTx(req *blockValidationRequest, results chan<- *blockValidationResult) {
block := req.block
d := req.d
tIdx := req.tIdx
txID := ""
if d == nil {
results <- &blockValidationResult{
tIdx: tIdx,
}
return
}
if env, err := protoutil.GetEnvelopeFromBlock(d); err != nil {
logger.Warningf("Error getting tx from block: %+v", err)
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: peer.TxValidationCode_INVALID_OTHER_REASON,
}
return
} else if env != nil {
// validate the transaction: here we check that the transaction
// is properly formed, properly signed and that the security
// chain binding proposal to endorsements to tx holds. We do
// NOT check the validity of endorsements, though. That's a
// job for the validation plugins
logger.Debugf("[%s] validateTx starts for block %p env %p txn %d", v.ChannelID, block, env, tIdx)
defer logger.Debugf("[%s] validateTx completes for block %p env %p txn %d", v.ChannelID, block, env, tIdx)
var payload *common.Payload
var err error
var txResult peer.TxValidationCode
if payload, txResult = validation.ValidateTransaction(env, v.CryptoProvider); txResult != peer.TxValidationCode_VALID {
logger.Errorf("Invalid transaction with index %d", tIdx)
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: txResult,
}
return
}
chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Warningf("Could not unmarshal channel header, err %s, skipping", err)
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: peer.TxValidationCode_INVALID_OTHER_REASON,
}
return
}
channel := chdr.ChannelId
logger.Debugf("Transaction is for channel %s", channel)
if !v.chainExists(channel) {
logger.Errorf("Dropping transaction for non-existent channel %s", channel)
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: peer.TxValidationCode_TARGET_CHAIN_NOT_FOUND,
}
return
}
if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
txID = chdr.TxId
// Check duplicate transactions
erroneousResultEntry := v.checkTxIdDupsLedger(tIdx, chdr, v.LedgerResources)
if erroneousResultEntry != nil {
results <- erroneousResultEntry
return
}
// Validate tx with plugins
logger.Debug("Validating transaction with plugins")
err, cde := v.Dispatcher.Dispatch(tIdx, payload, d, block)
if err != nil {
logger.Errorf("Dispatch for transaction txId = %s returned error: %s", txID, err)
switch err.(type) {
case *commonerrors.VSCCExecutionFailureError:
results <- &blockValidationResult{
tIdx: tIdx,
err: err,
}
return
case *commonerrors.VSCCInfoLookupFailureError:
results <- &blockValidationResult{
tIdx: tIdx,
err: err,
}
return
default:
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: cde,
}
return
}
}
} else if common.HeaderType(chdr.Type) == common.HeaderType_CONFIG {
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
if err != nil {
err = errors.WithMessage(err, "error unmarshalling config which passed initial validity checks")
logger.Criticalf("%+v", err)
results <- &blockValidationResult{
tIdx: tIdx,
err: err,
}
return
}
if err := v.ChannelResources.Apply(configEnvelope); err != nil {
err = errors.WithMessage(err, "error validating config which passed initial validity checks")
logger.Criticalf("%+v", err)
results <- &blockValidationResult{
tIdx: tIdx,
err: err,
}
return
}
logger.Debugf("config transaction received for chain %s", channel)
} else {
logger.Warningf("Unknown transaction type [%s] in block number [%d] transaction index [%d]",
common.HeaderType(chdr.Type), block.Header.Number, tIdx)
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: peer.TxValidationCode_UNKNOWN_TX_TYPE,
}
return
}
if _, err := proto.Marshal(env); err != nil {
logger.Warningf("Cannot marshal transaction: %s", err)
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: peer.TxValidationCode_MARSHAL_TX_ERROR,
}
return
}
// Succeeded to pass down here, transaction is valid
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: peer.TxValidationCode_VALID,
txid: txID,
}
return
} else {
logger.Warning("Nil tx from block")
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: peer.TxValidationCode_NIL_ENVELOPE,
}
return
}
}
// CheckTxIdDupsLedger returns a vlockValidationResult enhanced with the respective
// error codes if and only if there is transaction with the same transaction identifier
// in the ledger or no decision can be made for whether such transaction exists;
// the function returns nil if it has ensured that there is no such duplicate, such
// that its consumer can proceed with the transaction processing
func (v *TxValidator) checkTxIdDupsLedger(tIdx int, chdr *common.ChannelHeader, ldgr LedgerResources) *blockValidationResult {
// Retrieve the transaction identifier of the input header
txID := chdr.TxId
// Look for a transaction with the same identifier inside the ledger
_, err := ldgr.GetTransactionByID(txID)
switch err.(type) {
case nil:
// invalid case, returned error is nil. It means that there is already a tx in the ledger with the same id
logger.Error("Duplicate transaction found, ", txID, ", skipping")
return &blockValidationResult{
tIdx: tIdx,
validationCode: peer.TxValidationCode_DUPLICATE_TXID,
}
case ledger.NotFoundInIndexErr:
// valid case, returned error is of type NotFoundInIndexErr.
// It means that no tx with the same id is found in the ledger
return nil
default:
// invalid case, returned error is not of type NotFoundInIndexErr.
// It means that we could not verify whether a tx with the supplied id is in the ledger
logger.Errorf("Ledger failure while attempting to detect duplicate status for txid %s: %s", txID, err)
return &blockValidationResult{
tIdx: tIdx,
err: err,
}
}
}
type dynamicDeserializer struct {
cr ChannelResources
}
func (ds *dynamicDeserializer) DeserializeIdentity(serializedIdentity []byte) (msp.Identity, error) {
return ds.cr.MSPManager().DeserializeIdentity(serializedIdentity)
}
func (ds *dynamicDeserializer) IsWellFormed(identity *mspprotos.SerializedIdentity) error {
return ds.cr.MSPManager().IsWellFormed(identity)
}
type dynamicCapabilities struct {
cr ChannelResources
}
func (ds *dynamicCapabilities) ACLs() bool {
return ds.cr.Capabilities().ACLs()
}
func (ds *dynamicCapabilities) CollectionUpgrade() bool {
return ds.cr.Capabilities().CollectionUpgrade()
}
func (ds *dynamicCapabilities) ForbidDuplicateTXIdInBlock() bool {
return ds.cr.Capabilities().ForbidDuplicateTXIdInBlock()
}
func (ds *dynamicCapabilities) KeyLevelEndorsement() bool {
return ds.cr.Capabilities().KeyLevelEndorsement()
}
func (ds *dynamicCapabilities) MetadataLifecycle() bool {
// This capability no longer exists and should not be referenced in validation anyway
return false
}
func (ds *dynamicCapabilities) PrivateChannelData() bool {
return ds.cr.Capabilities().PrivateChannelData()
}
func (ds *dynamicCapabilities) StorePvtDataOfInvalidTx() bool {
return ds.cr.Capabilities().StorePvtDataOfInvalidTx()
}
func (ds *dynamicCapabilities) Supported() error {
return ds.cr.Capabilities().Supported()
}
func (ds *dynamicCapabilities) V1_1Validation() bool {
return ds.cr.Capabilities().V1_1Validation()
}
func (ds *dynamicCapabilities) V1_2Validation() bool {
return ds.cr.Capabilities().V1_2Validation()
}
func (ds *dynamicCapabilities) V1_3Validation() bool {
return ds.cr.Capabilities().V1_3Validation()
}
func (ds *dynamicCapabilities) V2_0Validation() bool {
return ds.cr.Capabilities().V2_0Validation()
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/zhaochuninhefei/fabric-gm.git
git@gitee.com:zhaochuninhefei/fabric-gm.git
zhaochuninhefei
fabric-gm
fabric-gm
v0.0.1

搜索帮助

344bd9b3 5694891 D2dac590 5694891