1 Star 0 Fork 0

妥協 / fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
pull.go 23.14 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package privdata
import (
"bytes"
"fmt"
"math"
"math/rand"
"sync"
"time"
"github.com/hyperledger/fabric/core/common/privdata"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/filter"
"github.com/hyperledger/fabric/gossip/metrics"
privdatacommon "github.com/hyperledger/fabric/gossip/privdata/common"
"github.com/hyperledger/fabric/gossip/util"
fcommon "github.com/hyperledger/fabric/protos/common"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/pkg/errors"
"go.uber.org/zap/zapcore"
)
// Timing parameters used while pulling private data from remote peers.
const (
	// membershipPollingBackoff is how long waitForMembership sleeps between
	// two consecutive polls of the channel membership.
	membershipPollingBackoff = time.Second
	// responseWaitTime is the timeout passed to pubSub.Subscribe for every
	// digest that is requested from a remote peer.
	responseWaitTime = 5 * time.Second
	// maxMembershipPollIterations is the number of membership polls
	// waitForMembership performs before it gives up and returns nil.
	maxMembershipPollIterations = 5
)
// Dig2PvtRWSetWithConfig maps a private data digest key to the private
// read-write sets (together with their collection config) that were
// retrieved for that digest.
type Dig2PvtRWSetWithConfig map[privdatacommon.DigKey]*util.PrivateRWSetWithConfig
// PrivateDataRetriever defines the API used to retrieve the private data
// requested by a remote peer.
type PrivateDataRetriever interface {
// CollectionRWSet returns the private read-write sets for the given digests,
// all of which belong to block blockNum, keyed by digest.
// The boolean result is consumed by createResponse as "was fetched from the
// ledger" and decides whether the latest collection config is consulted when
// checking the requester's eligibility.
CollectionRWSet(dig []*proto.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, bool, error)
}
// gossip defines the capabilities that the gossip module provides to the puller
type gossip interface {
// Send sends a message to remote peers
Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer)
// PeersOfChannel returns the NetworkMembers considered alive
// and also subscribed to the given channel
PeersOfChannel(common.ChainID) []discovery.NetworkMember
// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
// only peer identities that match the given criteria, and that have published their channel participation
PeerFilter(channel common.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error)
// Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate.
// If passThrough is false, the messages are processed by the gossip layer beforehand.
// If passThrough is true, the gossip layer doesn't intervene and the messages
// can be used to send a reply back to the sender.
// The puller calls Accept with passThrough set to true and uses only the second returned channel.
Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage)
}
// puller requests private data from remote peers of a channel and answers
// the private data requests it receives from them.
type puller struct {
// metrics records the durations of retrieving and pulling private data
metrics *metrics.PrivdataMetrics
// pubSub hands received private data elements, keyed by digest hash, to pending subscriptions
pubSub *util.PubSub
// stopChan makes the listen loop return once it can be received from
stopChan chan struct{}
// msgChan carries the private data messages accepted from the gossip layer
msgChan <-chan proto.ReceivedMessage
// channel is the name of the channel this puller serves
channel string
// cs is used to look up collection access policies and persistence configs
cs privdata.CollectionStore
// btlPullMargin is added to a peer's ledger height when deciding whether that peer
// has purged (or is about to purge) a piece of private data
btlPullMargin uint64
gossip
PrivateDataRetriever
CollectionAccessFactory
}
// NewPuller creates a new private data puller for the given channel.
// It registers with the gossip layer for the private data messages of that
// channel (in pass-through mode) and starts a goroutine that processes them.
func NewPuller(metrics *metrics.PrivdataMetrics, cs privdata.CollectionStore, g gossip,
	dataRetriever PrivateDataRetriever, factory CollectionAccessFactory, channel string, btlPullMargin uint64) *puller {
	p := &puller{
		channel:                 channel,
		btlPullMargin:           btlPullMargin,
		cs:                      cs,
		metrics:                 metrics,
		gossip:                  g,
		PrivateDataRetriever:    dataRetriever,
		CollectionAccessFactory: factory,
		pubSub:                  util.NewPubSub(),
		stopChan:                make(chan struct{}),
	}

	// Only private data messages that were sent on our own channel are of interest.
	isOwnPrivateDataMsg := func(o interface{}) bool {
		gossipMsg := o.(proto.ReceivedMessage).GetGossipMessage()
		return bytes.Equal(gossipMsg.Channel, []byte(p.channel)) && gossipMsg.IsPrivateDataMsg()
	}
	_, p.msgChan = p.Accept(isOwnPrivateDataMsg, true)

	go p.listen()
	return p
}
// listen dispatches incoming private data messages until either stopChan
// fires or a nil message is read from msgChan.
// A message carrying a private data response is passed to handleResponse and
// a message carrying a private data request is passed to handleRequest.
func (p *puller) listen() {
	for {
		select {
		case received := <-p.msgChan:
			// A nil message means the comm module stopped and
			// therefore closed this channel.
			if received == nil {
				return
			}
			if received.GetGossipMessage().GetPrivateRes() != nil {
				p.handleResponse(received)
			}
			if received.GetGossipMessage().GetPrivateReq() != nil {
				p.handleRequest(received)
			}
		case <-p.stopChan:
			return
		}
	}
}
// handleRequest answers a private data request: it responds to the sender
// with a channel-only message that echoes the request's nonce and carries the
// elements produced by createResponse.
func (p *puller) handleRequest(message proto.ReceivedMessage) {
	logger.Debug("Got", message.GetGossipMessage(), "from", message.GetConnectionInfo().Endpoint)

	nonce := message.GetGossipMessage().Nonce
	payload := &proto.RemotePvtDataResponse{
		Elements: p.createResponse(message),
	}
	reply := &proto.GossipMessage{
		Channel: []byte(p.channel),
		Tag:     proto.GossipMessage_CHAN_ONLY,
		Nonce:   nonce,
		Content: &proto.GossipMessage_PrivateRes{PrivateRes: payload},
	}
	message.Respond(reply)
}
// createResponse builds the list of private data elements to send back for
// the private data request carried by message.
// The requested digests are processed block by block: for every block the
// private read-write sets are retrieved (the time taken is recorded in the
// RetrieveDuration metric) and then narrowed down by filterNotEligible to the
// ones the requester may receive. A block whose retrieval fails is logged and skipped.
func (p *puller) createResponse(message proto.ReceivedMessage) []*proto.PvtDataElement {
	auth := message.GetConnectionInfo().Auth
	var elements []*proto.PvtDataElement
	requester := message.GetConnectionInfo().Endpoint
	defer func() {
		logger.Debug("Returning", requester, len(elements), "elements")
	}()

	request := message.GetGossipMessage().GetPrivateReq()
	// Digests are grouped by block number so each block is retrieved once.
	for blockNum, digests := range groupDigestsByBlockNum(request.Digests) {
		began := time.Now()
		rwSets, fromLedger, err := p.CollectionRWSet(digests, blockNum)
		p.metrics.RetrieveDuration.With("channel", p.channel).Observe(time.Since(began).Seconds())
		if err != nil {
			logger.Warningf("could not obtain private collection rwset for block %d, because of %s, continue...", blockNum, err)
			continue
		}

		// Identity and signature of the requester, used for the eligibility check.
		signer := fcommon.SignedData{
			Identity:  message.GetConnectionInfo().Identity,
			Data:      auth.SignedData,
			Signature: auth.Signature,
		}
		elements = append(elements, p.filterNotEligible(rwSets, fromLedger, signer, requester)...)
	}
	return elements
}
// groupDigestsByBlockNum partitions the given digests by their block sequence
// number, keeping the relative order of the digests within each block.
func groupDigestsByBlockNum(digests []*proto.PvtDataDigest) map[uint64][]*proto.PvtDataDigest {
	byBlock := map[uint64][]*proto.PvtDataDigest{}
	for i := range digests {
		seq := digests[i].BlockSeq
		byBlock[seq] = append(byBlock[seq], digests[i])
	}
	return byBlock
}
// handleResponse publishes every element of a private data response on the
// pubSub under the hash of the element's digest.
// Processing stops at the first element that has no digest or whose digest
// cannot be hashed; elements published before that point stay published.
func (p *puller) handleResponse(message proto.ReceivedMessage) {
	response := message.GetGossipMessage().GetPrivateRes()
	logger.Debug("Got", response, "from", message.GetConnectionInfo().Endpoint)

	for _, element := range response.Elements {
		digest := element.Digest
		if digest == nil {
			logger.Warning("Got nil digest from", message.GetConnectionInfo().Endpoint, "aborting")
			return
		}

		topic, err := digest.Hash()
		if err != nil {
			logger.Warning("Failed hashing digest from", message.GetConnectionInfo().Endpoint, "aborting")
			return
		}

		p.pubSub.Publish(topic, element)
	}
}
// waitForMembership polls the membership of the puller's channel until it is
// non-empty and returns it. Between polls it sleeps for
// membershipPollingBackoff; after maxMembershipPollIterations polls that all
// came back empty it returns nil.
func (p *puller) waitForMembership() []discovery.NetworkMember {
	for attempt := 1; ; attempt++ {
		if members := p.PeersOfChannel(common.ChainID(p.channel)); len(members) != 0 {
			return members
		}
		if attempt == maxMembershipPollIterations {
			return nil
		}
		time.Sleep(membershipPollingBackoff)
	}
}
// fetch pulls the private data for the digests in dig2src from remote peers.
// It first derives a routing filter pair for every digest (see computeFilters)
// and then delegates the actual pulling to fetchPrivateData.
func (p *puller) fetch(dig2src dig2sources) (*privdatacommon.FetchedPvtDataContainer, error) {
	routing, err := p.computeFilters(dig2src)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	return p.fetchPrivateData(routing)
}
// FetchReconciledItems pulls the private data for the digests in
// dig2collectionConfig from remote peers.
// It first derives a routing filter pair for every digest (see
// computeReconciliationFilters) and then delegates the actual pulling to
// fetchPrivateData.
func (p *puller) FetchReconciledItems(dig2collectionConfig privdatacommon.Dig2CollectionConfig) (*privdatacommon.FetchedPvtDataContainer, error) {
	routing, err := p.computeReconciliationFilters(dig2collectionConfig)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	return p.fetchPrivateData(routing)
}
// fetchPrivateData pulls the private data identified by the keys of
// dig2Filter from peers of the puller's channel.
//
// It waits for channel membership, keeps only the members that match at least
// one of the routing filters, and shuffles them. It then repeats the
// following while digests remain and there are still peers that have not been
// asked:
//   - digests that at least one member has purged (or will purge soon) are
//     recorded in PurgedElements and dropped from dig2Filter;
//   - the remaining digests are assigned to peers, requests are sent, and
//     the responses are gathered;
//   - every response with a non-empty payload removes its digest from
//     dig2Filter; all gathered responses are appended to AvailableElements.
//
// Note that dig2Filter is modified in place.
//
// It returns an error only when no member matches any filter; in all other
// cases it returns whatever was collected, which may be incomplete.
func (p *puller) fetchPrivateData(dig2Filter digestToFilterMapping) (*privdatacommon.FetchedPvtDataContainer, error) {
	// Get a list of peers per channel
	allFilters := dig2Filter.flattenFilterValues()
	members := p.waitForMembership()
	logger.Debug("Total members in channel:", members)
	members = filter.AnyMatch(members, allFilters...)
	logger.Debug("Total members that fit some digest:", members)
	if len(members) == 0 {
		logger.Warning("Do not know any peer in the channel(", p.channel, ") that matches the policies , aborting")
		return nil, errors.New("Empty membership")
	}
	members = randomizeMemberList(members)
	res := &privdatacommon.FetchedPvtDataContainer{}
	// Distribute requests to peers, and obtain subscriptions for all their messages
	// assignDigestsToPeers returns a map from a peer to the digests which we would ask it for
	var peer2digests peer2Digests

	// We expect all private RWSets represented as digests to be collected
	itemsLeftToCollect := len(dig2Filter)

	// As long as we still have some data to collect and new members to ask the data for:
	for itemsLeftToCollect > 0 && len(members) > 0 {
		purgedPvt := p.getPurgedCollections(members, dig2Filter)
		// Need to remove purged digest from mapping
		for _, dig := range purgedPvt {
			res.PurgedElements = append(res.PurgedElements, &proto.PvtDataDigest{
				TxId:       dig.TxId,
				BlockSeq:   dig.BlockSeq,
				SeqInBlock: dig.SeqInBlock,
				Namespace:  dig.Namespace,
				Collection: dig.Collection,
			})
			// remove digest so we won't even try to pull purged data
			delete(dig2Filter, dig)
			itemsLeftToCollect--
		}

		if itemsLeftToCollect == 0 {
			logger.Debug("No items left to collect")
			return res, nil
		}

		// members is narrowed down to the peers that were not selected in
		// this round, so every peer is asked at most once.
		peer2digests, members = p.assignDigestsToPeers(members, dig2Filter)
		if len(peer2digests) == 0 {
			// The message contains format verbs, hence the formatting variant
			// of the logger call is required for them to be expanded.
			logger.Warningf("No available peers for digests request, "+
				"cannot pull missing private data for following digests [%+v], peer membership: [%+v]",
				dig2Filter.digests(), members)
			return res, nil
		}

		logger.Debug("Matched", len(dig2Filter), "digests to", len(peer2digests), "peer(s)")
		subscriptions := p.scatterRequests(peer2digests)
		responses := p.gatherResponses(subscriptions)
		for _, resp := range responses {
			if len(resp.Payload) == 0 {
				logger.Debug("Got empty response for", resp.Digest)
				continue
			}
			delete(dig2Filter, privdatacommon.DigKey{
				TxId:       resp.Digest.TxId,
				BlockSeq:   resp.Digest.BlockSeq,
				SeqInBlock: resp.Digest.SeqInBlock,
				Namespace:  resp.Digest.Namespace,
				Collection: resp.Digest.Collection,
			})
			itemsLeftToCollect--
		}
		res.AvailableElements = append(res.AvailableElements, responses...)
	}
	return res, nil
}
// gatherResponses waits for all the given subscriptions concurrently and
// returns the private data elements they delivered. A subscription whose
// Listen call returns an error contributes nothing. For every delivered
// element the time elapsed since the start of the gathering is recorded in
// the PullDuration metric. The result is nil when nothing was delivered.
func (p *puller) gatherResponses(subscriptions []util.Subscription) []*proto.PvtDataElement {
	// The channel has room for one element per subscription, so no listener
	// ever blocks on sending.
	delivered := make(chan *proto.PvtDataElement, len(subscriptions))
	began := time.Now()

	var pending sync.WaitGroup
	for _, subscription := range subscriptions {
		pending.Add(1)
		go func(s util.Subscription) {
			defer pending.Done()
			item, err := s.Listen()
			if err != nil {
				return
			}
			delivered <- item.(*proto.PvtDataElement)
			p.metrics.PullDuration.With("channel", p.channel).Observe(time.Since(began).Seconds())
		}(subscription)
	}

	// Every listener has finished once Wait returns, so the channel can be
	// closed and drained without blocking.
	pending.Wait()
	close(delivered)

	var elements []*proto.PvtDataElement
	for element := range delivered {
		elements = append(elements, element)
	}
	return elements
}
// scatterRequests sends every peer in peersDigestMapping a private data
// request for the digests assigned to it and returns one subscription per
// requested digest. Each subscription is created with a timeout of
// responseWaitTime and before the request is sent. A digest that cannot be
// hashed is logged and gets no subscription.
func (p *puller) scatterRequests(peersDigestMapping peer2Digests) []util.Subscription {
	var subs []util.Subscription
	for target, wanted := range peersDigestMapping {
		request := &proto.GossipMessage{
			Tag:     proto.GossipMessage_CHAN_ONLY,
			Channel: []byte(p.channel),
			Nonce:   util.RandomUInt64(),
			Content: &proto.GossipMessage_PrivateReq{
				PrivateReq: &proto.RemotePvtDataRequest{
					Digests: digestsAsPointerSlice(wanted),
				},
			},
		}
		requested := request.GetPrivateReq().Digests

		// Subscribe first, send afterwards.
		for _, digest := range requested {
			topic, err := digest.Hash()
			if err != nil {
				// Shouldn't happen as we just built this message ourselves
				logger.Warning("Failed creating digest", err)
				continue
			}
			subs = append(subs, p.pubSub.Subscribe(topic, responseWaitTime))
		}

		logger.Debug("Sending", target.endpoint, "request", requested)
		p.Send(request, target.AsRemotePeer())
	}
	return subs
}
// peer2Digests maps a remote peer to the digests that are requested from it
type peer2Digests map[remotePeer][]proto.PvtDataDigest
// noneSelectedPeers holds the members that assignDigestsToPeers did not assign any digest to
type noneSelectedPeers []discovery.NetworkMember
// assignDigestsToPeers decides, for every digest in dig2Filter, which of the
// given members is asked for it: the first member matching the digest's
// preferredPeer filter or, if there is none, the first member matching its
// anyPeer filter. A digest that no member matches is left unassigned.
// It returns the resulting peer-to-digests mapping together with the members
// that were not assigned any digest.
func (p *puller) assignDigestsToPeers(members []discovery.NetworkMember, dig2Filter digestToFilterMapping) (peer2Digests, noneSelectedPeers) {
	if logger.IsEnabledFor(zapcore.DebugLevel) {
		logger.Debug("Matching", members, "to", dig2Filter.String())
	}

	assignment := make(map[remotePeer][]proto.PvtDataDigest)
	for key, routing := range dig2Filter {
		// Preferred peers take precedence over the rest of the collection.
		chosen := filter.First(members, routing.preferredPeer)
		if chosen == nil {
			logger.Debug("No preferred peer found for", key)
			chosen = filter.First(members, routing.anyPeer)
		}
		if chosen == nil {
			logger.Debug("No peer matches txID", key.TxId, "collection", key.Collection)
			continue
		}

		target := remotePeer{pkiID: string(chosen.PKIID), endpoint: chosen.Endpoint}
		assignment[target] = append(assignment[target], proto.PvtDataDigest{
			TxId:       key.TxId,
			BlockSeq:   key.BlockSeq,
			SeqInBlock: key.SeqInBlock,
			Namespace:  key.Namespace,
			Collection: key.Collection,
		})
	}

	// Collect the members that ended up without any digest.
	var unselected []discovery.NetworkMember
	for _, member := range members {
		candidate := remotePeer{endpoint: member.PreferredEndpoint(), pkiID: string(member.PKIid)}
		if _, selected := assignment[candidate]; selected {
			continue
		}
		unselected = append(unselected, member)
	}

	return assignment, unselected
}
// collectionRoutingFilter holds the two routing filters used to pick the peer
// that is asked for a digest
type collectionRoutingFilter struct {
// anyPeer is the fallback filter, tried when no member matches preferredPeer
anyPeer filter.RoutingFilter
// preferredPeer is the filter tried first when selecting a peer
preferredPeer filter.RoutingFilter
}
// digestToFilterMapping maps a digest key to the routing filters used to select a peer for it
type digestToFilterMapping map[privdatacommon.DigKey]collectionRoutingFilter
// flattenFilterValues returns all routing filters of the mapping in a single
// slice: for every digest its preferredPeer filter followed by its anyPeer
// filter. The result is nil for an empty mapping.
func (dig2f digestToFilterMapping) flattenFilterValues() []filter.RoutingFilter {
	var flat []filter.RoutingFilter
	for _, routing := range dig2f {
		flat = append(flat, routing.preferredPeer, routing.anyPeer)
	}
	return flat
}
// digests returns the keys of the mapping converted to PvtDataDigest values.
// The result is nil for an empty mapping.
func (dig2f digestToFilterMapping) digests() []proto.PvtDataDigest {
	var all []proto.PvtDataDigest
	for key := range dig2f {
		digest := proto.PvtDataDigest{
			TxId:       key.TxId,
			BlockSeq:   key.BlockSeq,
			SeqInBlock: key.SeqInBlock,
			Namespace:  key.Namespace,
			Collection: key.Collection,
		}
		all = append(all, digest)
	}
	return all
}
// String returns a string representation of the digestToFilterMapping: one
// "{collection: [txIDs]}" group per collection, listing the transaction IDs
// of the digests that belong to it.
func (dig2f digestToFilterMapping) String() string {
	txIDsByCollection := map[string][]string{}
	for key := range dig2f {
		txIDsByCollection[key.Collection] = append(txIDsByCollection[key.Collection], key.TxId)
	}

	var out bytes.Buffer
	for collection, txIDs := range txIDsByCollection {
		fmt.Fprintf(&out, "{%s: %v}", collection, txIDs)
	}
	return out.String()
}
// computeFilters builds the routing filters for every digest in dig2src:
//   - anyPeer selects peers according to the latest collection config of the
//     digest's namespace and collection;
//   - preferredPeer selects peers whose identity equals the endorser of one of
//     the endorsements listed for the digest.
// It fails if either filter cannot be created for some digest.
func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, error) {
	routing := make(map[privdatacommon.DigKey]collectionRoutingFilter)
	for digest, sources := range dig2src {
		collectionMembers, err := p.getLatestCollectionConfigRoutingFilter(digest.Namespace, digest.Collection)
		if err != nil {
			return nil, errors.WithStack(err)
		}

		// Per-iteration copy, so the closure below does not observe the
		// loop variable of a later iteration.
		endorsements := sources
		isEndorser := func(peerSignature api.PeerSignature) bool {
			identity := []byte(peerSignature.PeerIdentity)
			for _, endorsement := range endorsements {
				if bytes.Equal(endorsement.Endorser, identity) {
					return true
				}
			}
			return false
		}

		endorsers, err := p.PeerFilter(common.ChainID(p.channel), isEndorser)
		if err != nil {
			return nil, errors.WithStack(err)
		}

		routing[digest] = collectionRoutingFilter{
			preferredPeer: endorsers,
			anyPeer:       collectionMembers,
		}
	}
	return routing, nil
}
// computeReconciliationFilters builds the routing filters for every digest in
// dig2collectionConfig:
//   - anyPeer selects peers according to the latest collection config of the
//     digest's namespace and collection;
//   - preferredPeer selects peers that satisfy both the latest config and the
//     collection config that accompanies the digest (the original config).
// It fails if any of the required filters cannot be obtained.
func (p *puller) computeReconciliationFilters(dig2collectionConfig privdatacommon.Dig2CollectionConfig) (digestToFilterMapping, error) {
	routing := make(map[privdatacommon.DigKey]collectionRoutingFilter)
	for digest, originalConfig := range dig2collectionConfig {
		inLatestConfig, err := p.getLatestCollectionConfigRoutingFilter(digest.Namespace, digest.Collection)
		if err != nil {
			return nil, err
		}

		originalAccess, err := p.cs.AccessFilter(p.channel, originalConfig.MemberOrgsPolicy)
		if err != nil {
			return nil, err
		}
		if originalAccess == nil {
			return nil, errors.Errorf("Failed obtaining original collection filter for channel %s, config %s", p.channel, digest.Collection)
		}

		// Peers that were in the collection config while the missing data was created.
		inOriginalConfig, err := p.getMatchAllRoutingFilter(originalAccess)
		if err != nil {
			return nil, err
		}

		routing[digest] = collectionRoutingFilter{
			anyPeer: inLatestConfig,
			// Prefer peers that have been in the collection since the data was
			// created over ones that were added later: the assumption is that
			// the longer a peer is in the collection config, the likelier it
			// is to have the data.
			preferredPeer: func(member discovery.NetworkMember) bool {
				return inOriginalConfig(member) && inLatestConfig(member)
			},
		}
	}
	return routing, nil
}
// getLatestCollectionConfigRoutingFilter returns a routing filter that selects
// the peers satisfying the access filter of the collection access policy that
// the collection store currently returns for the given chaincode and
// collection on the puller's channel.
// It fails if the policy cannot be retrieved, if the policy has no access
// filter, or if the routing filter cannot be created.
func (p *puller) getLatestCollectionConfigRoutingFilter(chaincode string, collection string) (filter.RoutingFilter, error) {
	policy, err := p.cs.RetrieveCollectionAccessPolicy(fcommon.CollectionCriteria{
		Channel:    p.channel,
		Collection: collection,
		Namespace:  chaincode,
	})
	if err != nil {
		detail := fmt.Sprintf("failed obtaining collection policy for channel %s, chaincode %s, config %s", p.channel, chaincode, collection)
		return nil, errors.WithMessage(err, detail)
	}

	accessFilter := policy.AccessFilter()
	if accessFilter == nil {
		return nil, errors.Errorf("Failed obtaining collection filter for channel %s, chaincode %s, collection %s", p.channel, chaincode, collection)
	}

	routing, err := p.getMatchAllRoutingFilter(accessFilter)
	if err != nil {
		return nil, err
	}
	return routing, nil
}
// getMatchAllRoutingFilter turns the given collection access filter into a
// routing filter for the puller's channel: a peer is selected when filt
// accepts the signed data built from the peer's signature, identity and message.
func (p *puller) getMatchAllRoutingFilter(filt privdata.Filter) (filter.RoutingFilter, error) {
	satisfiesFilter := func(peerSignature api.PeerSignature) bool {
		signed := fcommon.SignedData{
			Signature: peerSignature.Signature,
			Identity:  peerSignature.PeerIdentity,
			Data:      peerSignature.Message,
		}
		return filt(signed)
	}
	return p.PeerFilter(common.ChainID(p.channel), satisfiesFilter)
}
// getPurgedCollections returns the digests of dig2Filter for which at least
// one of the given members matches the purged filter of that digest (see
// purgedFilter), i.e. has already purged the data or will purge it soon.
// A digest whose purged filter cannot be obtained is logged and skipped.
// The result is nil when no digest qualifies.
func (p *puller) getPurgedCollections(members []discovery.NetworkMember, dig2Filter digestToFilterMapping) []privdatacommon.DigKey {
	var res []privdatacommon.DigKey
	for dig := range dig2Filter {
		purged, err := p.purgedFilter(dig)
		if err != nil {
			// The message contains format verbs, hence the formatting variant
			// of the logger call is required for them to be expanded.
			logger.Debugf("Failed to obtain purged filter for digest %v, error %v", dig, err)
			continue
		}

		membersWithPurgedData := filter.AnyMatch(members, purged)
		// at least one peer already purged the data
		if len(membersWithPurgedData) > 0 {
			logger.Debugf("Private data on channel [%s], chaincode [%s], collection name [%s] for txID = [%s],"+
				"has been purged at peers [%v]", p.channel, dig.Namespace,
				dig.Collection, dig.TxId, membersWithPurgedData)
			res = append(res, dig)
		}
	}
	return res
}
// purgedFilter returns a routing filter that reports whether a peer has
// purged, or is about to purge, the private data identified by dig.
//
// The filter rejects a peer that published no properties and rejects every
// peer when the collection's block-to-live is zero (data never expires).
// Otherwise a peer matches when its ledger height plus btlPullMargin has
// reached the block number at which the data expires (dig.BlockSeq plus the
// block-to-live); both sums saturate instead of overflowing.
//
// It fails if the collection persistence config cannot be retrieved.
func (p *puller) purgedFilter(dig privdatacommon.DigKey) (filter.RoutingFilter, error) {
	cc := fcommon.CollectionCriteria{
		Channel:    p.channel,
		TxId:       dig.TxId,
		Collection: dig.Collection,
		Namespace:  dig.Namespace,
	}

	persistence, err := p.cs.RetrieveCollectionPersistenceConfigs(cc)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	hasPurged := func(peer discovery.NetworkMember) bool {
		if peer.Properties == nil {
			logger.Debugf("No properties provided for peer %s", peer.Endpoint)
			return false
		}

		// BTL equals to zero has semantic of never expires
		if persistence.BlockToLive() == uint64(0) {
			return false
		}

		// Saturating additions guard against overflow.
		expiresAt := addWithOverflow(dig.BlockSeq, persistence.BlockToLive())
		heightWithMargin := addWithOverflow(peer.Properties.LedgerHeight, p.btlPullMargin)

		if heightWithMargin < expiresAt {
			return false
		}
		logger.Debugf("skipping peer [%s], since pvt for channel [%s], txID = [%s], "+
			"collection [%s] has been purged or will soon be purged, BTL=[%d]",
			peer.Endpoint, p.channel, cc.TxId, cc.Collection, persistence.BlockToLive())
		return true
	}
	return hasPurged, nil
}
// filterNotEligible converts the retrieved read-write sets into private data
// elements, keeping only those the requester (identified by signedData) is
// eligible to receive.
//
// Entries that are nil or hold no read-write sets are skipped. When
// shouldCheckLatestConfig is set, the requester is first checked against the
// latest collection config; if that check is not performed or does not grant
// access, the requester is checked against the access policy derived from the
// collection config stored with the read-write sets. endpoint is used for
// logging only. The result is nil when nothing is eligible.
func (p *puller) filterNotEligible(dig2rwSets Dig2PvtRWSetWithConfig, shouldCheckLatestConfig bool, signedData fcommon.SignedData, endpoint string) []*proto.PvtDataElement {
	var eligible []*proto.PvtDataElement
	for key, found := range dig2rwSets {
		if found == nil {
			logger.Errorf("No private rwset for [%s] channel, chaincode [%s], collection [%s], txID = [%s] is available, skipping...",
				p.channel, key.Namespace, key.Collection, key.TxId)
			continue
		}

		logger.Debug("Found", len(found.RWSet), "for TxID", key.TxId, ", collection", key.Collection, "for", endpoint)
		if len(found.RWSet) == 0 {
			continue
		}

		allowed := shouldCheckLatestConfig && p.isEligibleByLatestConfig(p.channel, key.Collection, key.Namespace, signedData)
		if !allowed {
			// Fall back to the collection config that came with the data.
			policy, err := p.AccessPolicy(found.CollectionConfig, p.channel)
			if err != nil {
				logger.Debug("No policy found for channel", p.channel, ", collection", key.Collection, "txID", key.TxId, ":", err, "skipping...")
				continue
			}
			accessFilter := policy.AccessFilter()
			if accessFilter == nil {
				logger.Debug("Collection ", key.Collection, " has no access filter, txID", key.TxId, "skipping...")
				continue
			}
			allowed = accessFilter(signedData)
		}

		if !allowed {
			logger.Debug("Peer", endpoint, "isn't eligible for txID", key.TxId, "at collection", key.Collection)
			continue
		}

		digest := &proto.PvtDataDigest{
			TxId:       key.TxId,
			BlockSeq:   key.BlockSeq,
			Collection: key.Collection,
			Namespace:  key.Namespace,
			SeqInBlock: key.SeqInBlock,
		}
		eligible = append(eligible, &proto.PvtDataElement{
			Digest:  digest,
			Payload: util.PrivateRWSets(found.RWSet...),
		})
	}
	return eligible
}
// isEligibleByLatestConfig reports whether the identity behind signedData
// satisfies the access filter of the collection access policy that the
// collection store currently returns for the given channel, chaincode and
// collection.
// It returns false when the policy cannot be retrieved or when the policy
// provides no access filter.
func (p *puller) isEligibleByLatestConfig(channel string, collection string, chaincode string, signedData fcommon.SignedData) bool {
	cc := fcommon.CollectionCriteria{
		Channel:    channel,
		Collection: collection,
		Namespace:  chaincode,
	}

	latestCollectionConfig, err := p.cs.RetrieveCollectionAccessPolicy(cc)
	if err != nil {
		return false
	}

	collectionFilter := latestCollectionConfig.AccessFilter()
	if collectionFilter == nil {
		// Calling a nil filter would panic. Without a filter nobody can be
		// vouched for, so report the requester as not eligible.
		logger.Debug("Collection ", collection, " has no access filter in the latest config of channel", channel)
		return false
	}
	return collectionFilter(signedData)
}
// randomizeMemberList returns a new slice holding the given members in a
// random order; the input slice is left untouched.
// Note that it re-seeds the global math/rand source with the current time on
// every call.
func randomizeMemberList(members []discovery.NetworkMember) []discovery.NetworkMember {
	rand.Seed(time.Now().UnixNano())

	order := rand.Perm(len(members))
	shuffled := make([]discovery.NetworkMember, 0, len(members))
	for _, from := range order {
		shuffled = append(shuffled, members[from])
	}
	return shuffled
}
// digestsAsPointerSlice returns a slice of pointers to copies of the given
// digests, in the same order. Every pointer refers to its own copy, so the
// result aliases neither the input slice nor a shared loop variable.
func digestsAsPointerSlice(digests []proto.PvtDataDigest) []*proto.PvtDataDigest {
	pointers := make([]*proto.PvtDataDigest, 0, len(digests))
	for i := range digests {
		// A fresh variable per iteration gives every pointer a distinct address.
		copied := digests[i]
		pointers = append(pointers, &copied)
	}
	return pointers
}
// remotePeer identifies a peer by its endpoint and PKI-ID. The PKI-ID is kept
// as a string, and the type is used as the key of peer2Digests.
type remotePeer struct {
// endpoint is the endpoint of the peer
endpoint string
// pkiID is the PKI-ID of the peer, converted to a string
pkiID string
}
// AsRemotePeer converts this remotePeer to a newly allocated comm.RemotePeer
// that carries the same endpoint and PKI-ID.
func (rp remotePeer) AsRemotePeer() *comm.RemotePeer {
	converted := comm.RemotePeer{
		Endpoint: rp.endpoint,
		PKIID:    common.PKIidType(rp.pkiID),
	}
	return &converted
}
// addWithOverflow returns a + b, saturating at math.MaxUint64 instead of
// wrapping around when the sum does not fit into a uint64.
func addWithOverflow(a uint64, b uint64) uint64 {
	// a + b overflows exactly when a exceeds the headroom left above b.
	if a > math.MaxUint64-b {
		return math.MaxUint64
	}
	return a + b
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/liurenhao/fabric.git
git@gitee.com:liurenhao/fabric.git
liurenhao
fabric
fabric
v1.4.2

搜索帮助

344bd9b3 5694891 D2dac590 5694891