Fetch the repository succeeded.
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package discovery
import (
"encoding/asn1"
"encoding/hex"
"sync"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/protos/common"
"github.com/pkg/errors"
)
const (
defaultMaxCacheSize = 1000
defaultRetentionRatio = 0.75
)
var (
// asBytes is the function that is used to marshal common.SignedData to bytes
asBytes = asn1.Marshal
)
type acSupport interface {
// Eligible returns whether the given peer is eligible for receiving
// service from the discovery service for a given channel
EligibleForService(channel string, data common.SignedData) error
// ConfigSequence returns the configuration sequence of the given channel
ConfigSequence(channel string) uint64
}
type authCacheConfig struct {
enabled bool
// maxCacheSize is the maximum size of the cache, after which
// a purge takes place
maxCacheSize int
// purgeRetentionRatio is the % of entries that remain in the cache
// after the cache is purged due to overpopulation
purgeRetentionRatio float64
}
// authCache defines an interface that authenticates a request in a channel context,
// and does memoization of invocations
type authCache struct {
credentialCache map[string]*accessCache
acSupport
sync.RWMutex
conf authCacheConfig
}
func newAuthCache(s acSupport, conf authCacheConfig) *authCache {
return &authCache{
acSupport: s,
credentialCache: make(map[string]*accessCache),
conf: conf,
}
}
// Eligible returns whether the given peer is eligible for receiving
// service from the discovery service for a given channel
func (ac *authCache) EligibleForService(channel string, data common.SignedData) error {
if !ac.conf.enabled {
return ac.acSupport.EligibleForService(channel, data)
}
// Check whether we already have a cache for this channel
ac.RLock()
cache := ac.credentialCache[channel]
ac.RUnlock()
if cache == nil {
// Cache for given channel wasn't found, so create a new one
ac.Lock()
cache = ac.newAccessCache(channel)
// And store the cache instance.
ac.credentialCache[channel] = cache
ac.Unlock()
}
return cache.EligibleForService(data)
}
type accessCache struct {
sync.RWMutex
channel string
ac *authCache
lastSequence uint64
entries map[string]error
}
func (ac *authCache) newAccessCache(channel string) *accessCache {
return &accessCache{
channel: channel,
ac: ac,
entries: make(map[string]error),
}
}
func (cache *accessCache) EligibleForService(data common.SignedData) error {
key, err := signedDataToKey(data)
if err != nil {
logger.Warningf("Failed computing key of signed data: +%v", err)
return errors.Wrap(err, "failed computing key of signed data")
}
currSeq := cache.ac.acSupport.ConfigSequence(cache.channel)
if cache.isValid(currSeq) {
foundInCache, isEligibleErr := cache.lookup(key)
if foundInCache {
return isEligibleErr
}
} else {
cache.configChange(currSeq)
}
// Make sure the cache doesn't overpopulate.
// It might happen that it overgrows the maximum size due to concurrent
// goroutines waiting on the lock above, but that's acceptable.
cache.purgeEntriesIfNeeded()
// Compute the eligibility of the client for the service
err = cache.ac.acSupport.EligibleForService(cache.channel, data)
cache.Lock()
defer cache.Unlock()
// Check if the sequence hasn't changed since last time
if currSeq != cache.ac.acSupport.ConfigSequence(cache.channel) {
// The sequence at which we computed the eligibility might have changed,
// so we can't put it into the cache because a more fresh computation result
// might already be present in the cache by now, and we don't want to override it
// with a stale computation result, so just return the result.
return err
}
// Else, the eligibility of the client has been computed under the latest sequence,
// so store the computation result in the cache
cache.entries[key] = err
return err
}
func (cache *accessCache) isPurgeNeeded() bool {
cache.RLock()
defer cache.RUnlock()
return len(cache.entries)+1 > cache.ac.conf.maxCacheSize
}
func (cache *accessCache) purgeEntriesIfNeeded() {
if !cache.isPurgeNeeded() {
return
}
cache.Lock()
defer cache.Unlock()
maxCacheSize := cache.ac.conf.maxCacheSize
purgeRatio := cache.ac.conf.purgeRetentionRatio
entries2evict := maxCacheSize - int(purgeRatio*float64(maxCacheSize))
for key := range cache.entries {
if entries2evict == 0 {
return
}
entries2evict--
delete(cache.entries, key)
}
}
func (cache *accessCache) isValid(currSeq uint64) bool {
cache.RLock()
defer cache.RUnlock()
return currSeq == cache.lastSequence
}
func (cache *accessCache) configChange(currSeq uint64) {
cache.Lock()
defer cache.Unlock()
cache.lastSequence = currSeq
// Invalidate entries
cache.entries = make(map[string]error)
}
func (cache *accessCache) lookup(key string) (cacheHit bool, lookupResult error) {
cache.RLock()
defer cache.RUnlock()
lookupResult, cacheHit = cache.entries[key]
return
}
func signedDataToKey(data common.SignedData) (string, error) {
b, err := asBytes(data)
if err != nil {
return "", errors.Wrap(err, "failed marshaling signed data")
}
return hex.EncodeToString(util.ComputeSHA256(b)), nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。