63 Star 181 Fork 3

Gitee 极速下载/hyperledger-fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/hyperledger/fabric
克隆/下载
dataretriever.go 9.25 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package privdata
import (
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/transientstore"
"github.com/hyperledger/fabric/gossip/privdata/common"
"github.com/hyperledger/fabric/gossip/util"
gossip2 "github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/pkg/errors"
)
// StorageDataRetriever defines an API to retrieve private data from the storage.
type StorageDataRetriever interface {
// CollectionRWSet retrieves the private data relevant to the given digests,
// if available. It returns the retrieved data (nil when nothing could be
// retrieved), a bool which is true if the data was fetched from the ledger
// and false if it was fetched from the transient store, and an error.
CollectionRWSet(dig []*gossip2.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, bool, error)
}
//go:generate mockery -dir . -name DataStore -case underscore -output mocks/
//go:generate mockery -dir ../../core/transientstore/ -name RWSetScanner -case underscore -output mocks/
//go:generate mockery -dir ../../core/ledger/ -name ConfigHistoryRetriever -case underscore -output mocks/
// DataStore defines the set of APIs needed to get private data
// from the underlying data store.
type DataStore interface {
// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
// RWSets persisted from different endorsers (via Gossip).
GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (transientstore.RWSetScanner, error)
// GetPvtDataByNum returns a slice of the private data from the ledger
// for the given block, restricted by the filter, which indicates a map of
// collections and namespaces of private data to retrieve.
GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error)
// GetConfigHistoryRetriever returns the ConfigHistoryRetriever.
GetConfigHistoryRetriever() (ledger.ConfigHistoryRetriever, error)
// LedgerHeight returns the recent block sequence number (the ledger height).
LedgerHeight() (uint64, error)
}
// dataRetriever is the StorageDataRetriever implementation returned by
// NewDataRetriever; it answers every request through the wrapped DataStore.
type dataRetriever struct {
// store is the backing data store used for ledger and transient store reads.
store DataStore
}
// NewDataRetriever builds a StorageDataRetriever that serves all of its
// requests from the supplied DataStore.
func NewDataRetriever(store DataStore) StorageDataRetriever {
	retriever := new(dataRetriever)
	retriever.store = store
	return retriever
}
// CollectionRWSet looks up the private data matching the given digests.
//
// When the ledger height is above blockNum the data is read from the ledger
// and the returned bool is true. Otherwise each digest is looked up
// individually in the transient store and the returned bool is false; digests
// whose lookup fails are logged and left out of the result instead of failing
// the whole call. An error is returned if the ledger height cannot be read or
// the ledger lookup fails.
func (dr *dataRetriever) CollectionRWSet(digests []*gossip2.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, bool, error) {
	height, err := dr.store.LedgerHeight()
	if err != nil {
		// Without the ledger height we cannot pick a data source, so give up.
		return nil, false, errors.Wrap(err, "wasn't able to read ledger height")
	}

	if height > blockNum {
		// The block is already below the ledger height, so its private data
		// might be available in the ledger.
		ledgerData, ledgerErr := dr.fromLedger(digests, blockNum)
		return ledgerData, true, ledgerErr
	}

	// The ledger height is equal to or below the requested block sequence
	// number: fall back to the transient store.
	logger.Debug("Current ledger height ", height, "is below requested block sequence number",
		blockNum, "retrieving private data from transient store")

	collected := make(Dig2PvtRWSetWithConfig)
	for _, digest := range digests {
		nsFilter := map[string]ledger.PvtCollFilter{
			digest.Namespace: map[string]bool{
				digest.Collection: true,
			},
		}

		rwSetWithConfig, lookupErr := dr.fromTransientStore(digest, nsFilter)
		if lookupErr != nil {
			logger.Errorf("couldn't read from transient store private read-write set, "+
				"digest %+v, because of %s", digest, lookupErr)
			continue
		}

		key := common.DigKey{
			Namespace:  digest.Namespace,
			Collection: digest.Collection,
			TxId:       digest.TxId,
			BlockSeq:   digest.BlockSeq,
			SeqInBlock: digest.SeqInBlock,
		}
		collected[key] = rwSetWithConfig
	}
	return collected, false, nil
}
// fromLedger reads the private data of block blockNum from the ledger and
// returns, for every digest, the matching private read-write sets together
// with the collection config in effect for that digest.
//
// A single ledger query is issued with a filter covering every
// namespace/collection pair mentioned in digests; the result is then matched
// against each digest by namespace, collection and sequence-in-block.
//
// It returns an error (and a nil map) if the ledger query fails, if the config
// history retriever cannot be obtained, or if no collection config can be
// resolved for any one of the digests.
func (dr *dataRetriever) fromLedger(digests []*gossip2.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, error) {
	// Build the namespace -> collections filter for the ledger query.
	filter := make(map[string]ledger.PvtCollFilter)
	for _, dig := range digests {
		if _, ok := filter[dig.Namespace]; !ok {
			filter[dig.Namespace] = make(ledger.PvtCollFilter)
		}
		filter[dig.Namespace][dig.Collection] = true
	}

	pvtData, err := dr.store.GetPvtDataByNum(blockNum, filter)
	if err != nil {
		return nil, errors.Errorf("wasn't able to obtain private data, block sequence number %d, due to %s", blockNum, err)
	}

	results := make(Dig2PvtRWSetWithConfig)
	for _, dig := range digests {
		pvtRWSetWithConfig := &util.PrivateRWSetWithConfig{}
		for _, data := range pvtData {
			if data.WriteSet == nil {
				logger.Warning("Received nil write set for collection tx in block", data.SeqInBlock, "block number", blockNum)
				continue
			}

			// private data doesn't hold rwsets for namespace and collection or
			// belongs to different transaction
			if !data.Has(dig.Namespace, dig.Collection) || data.SeqInBlock != dig.SeqInBlock {
				continue
			}

			pvtRWSet := dr.extractPvtRWsets(data.WriteSet.NsPvtRwset, dig.Namespace, dig.Collection)
			pvtRWSetWithConfig.RWSet = append(pvtRWSetWithConfig.RWSet, pvtRWSet...)
		}

		confHistoryRetriever, err := dr.store.GetConfigHistoryRetriever()
		if err != nil {
			return nil, errors.Errorf("cannot obtain configuration history retriever, for collection <%s>"+
				" txID <%s> block sequence number <%d> due to <%s>", dig.Collection, dig.TxId, dig.BlockSeq, err)
		}

		configInfo, err := confHistoryRetriever.MostRecentCollectionConfigBelow(dig.BlockSeq, dig.Namespace)
		if err != nil {
			// Wrap rather than replace the error so the underlying cause is
			// not lost; the original message is kept as the prefix.
			return nil, errors.Wrapf(err, "cannot find recent collection config update below block sequence = %d,"+
				" collection name = <%s> for chaincode <%s>", dig.BlockSeq, dig.Collection, dig.Namespace)
		}

		if configInfo == nil {
			return nil, errors.Errorf("no collection config update below block sequence = <%d>"+
				" collection name = <%s> for chaincode <%s> is available ", dig.BlockSeq, dig.Collection, dig.Namespace)
		}

		configs := extractCollectionConfig(configInfo.CollectionConfig, dig.Collection)
		if configs == nil {
			return nil, errors.Errorf("no collection config was found for collection <%s>"+
				" namespace <%s> txID <%s>", dig.Collection, dig.Namespace, dig.TxId)
		}

		pvtRWSetWithConfig.CollectionConfig = configs
		results[common.DigKey{
			Namespace:  dig.Namespace,
			Collection: dig.Collection,
			TxId:       dig.TxId,
			BlockSeq:   dig.BlockSeq,
			SeqInBlock: dig.SeqInBlock,
		}] = pvtRWSetWithConfig
	}

	return results, nil
}
// fromTransientStore collects, for a single digest, the private read-write
// sets held in the transient store under the digest's transaction ID.
//
// The transient store may hold several entries for one transaction; the
// read-write sets of every usable entry are appended to the result. The
// collection config placed in the result is taken from the usable entry with
// the highest EndorsedAt value (on ties, the entry iterated last wins).
// Entries that carry no simulation results, no private rwset, or no config for
// the digest's namespace/collection are logged and skipped.
//
// It returns an error if the iterator cannot be created or fails while
// advancing. If no usable entry exists the returned value is non-nil but has
// no read-write sets and no collection config.
func (dr *dataRetriever) fromTransientStore(dig *gossip2.PvtDataDigest, filter map[string]ledger.PvtCollFilter) (*util.PrivateRWSetWithConfig, error) {
	results := &util.PrivateRWSetWithConfig{}
	it, err := dr.store.GetTxPvtRWSetByTxid(dig.TxId, filter)
	if err != nil {
		return nil, errors.Errorf("was not able to retrieve private data from transient store, namespace <%s>"+
			", collection name %s, txID <%s>, due to <%s>", dig.Namespace, dig.Collection, dig.TxId, err)
	}
	defer it.Close()

	// Highest EndorsedAt seen so far; decides which entry's config is kept.
	maxEndorsedAt := uint64(0)
	for {
		res, err := it.NextWithConfig()
		if err != nil {
			return nil, errors.Errorf("error getting next element out of private data iterator, namespace <%s>"+
				", collection name <%s>, txID <%s>, due to <%s>", dig.Namespace, dig.Collection, dig.TxId, err)
		}
		if res == nil {
			// A nil element marks the end of the iteration.
			return results, nil
		}

		rws := res.PvtSimulationResultsWithConfig
		if rws == nil {
			logger.Debug("Skipping nil PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight)
			continue
		}

		txPvtRWSet := rws.PvtRwset
		if txPvtRWSet == nil {
			logger.Debug("Skipping empty PvtRwset of PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight)
			continue
		}

		colConfigs, found := rws.CollectionConfigs[dig.Namespace]
		if !found {
			// Report the collection name, not the namespace a second time.
			logger.Error("No collection config was found for chaincode", dig.Namespace, "collection name",
				dig.Collection, "txID", dig.TxId)
			continue
		}

		configs := extractCollectionConfig(colConfigs, dig.Collection)
		if configs == nil {
			logger.Error("No collection config was found for collection", dig.Collection,
				"namespace", dig.Namespace, "txID", dig.TxId)
			continue
		}

		pvtRWSet := dr.extractPvtRWsets(txPvtRWSet.NsPvtRwset, dig.Namespace, dig.Collection)
		if rws.EndorsedAt >= maxEndorsedAt {
			maxEndorsedAt = rws.EndorsedAt
			results.CollectionConfig = configs
		}
		results.RWSet = append(results.RWSet, pvtRWSet...)
	}
}
// extractPvtRWsets picks, out of the given namespace read-write sets, the raw
// rwsets that belong to the requested namespace and collection. Entries for
// other namespaces or collections are reported at debug level and skipped.
// The returned slice is never nil; it is empty when nothing matches.
func (dr *dataRetriever) extractPvtRWsets(pvtRWSets []*rwset.NsPvtReadWriteSet, namespace string, collectionName string) []util.PrivateRWSet {
	matched := []util.PrivateRWSet{}

	for _, nsRWSet := range pvtRWSets {
		// Only the requested namespace is of interest.
		if nsRWSet.Namespace != namespace {
			logger.Debug("Received private data namespace ", nsRWSet.Namespace, " instead of ", namespace, " skipping...")
			continue
		}

		for _, collRWSet := range nsRWSet.CollectionPvtRwset {
			if collRWSet.CollectionName == collectionName {
				// Found the wanted collection: keep its rwset.
				matched = append(matched, collRWSet.Rwset)
			} else {
				logger.Debug("Received private data collection ", collRWSet.CollectionName, " instead of ", collectionName, " skipping...")
			}
		}
	}

	return matched
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/mirrors/hyperledger-fabric.git
git@gitee.com:mirrors/hyperledger-fabric.git
mirrors
hyperledger-fabric
hyperledger-fabric
v1.4.6

搜索帮助

344bd9b3 5694891 D2dac590 5694891