6 Star 46 Fork 28

Hyperledger/fabric

Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
Clone or Download
gossip_service.go 11.35 KB
Copy Edit Raw Blame History
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"sync"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/deliverservice"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/gossip/api"
gossipCommon "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/election"
"github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/integration"
"github.com/hyperledger/fabric/gossip/state"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/protos/common"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/spf13/viper"
"google.golang.org/grpc"
)
var (
// gossipServiceInstance is the package-level gossip service returned by
// GetGossipService; in this file it is assigned only inside once.Do in
// InitGossipServiceCustomDeliveryFactory.
gossipServiceInstance *gossipServiceImpl
// once guards the one-time construction of gossipServiceInstance.
once sync.Once
)
// gossipSvc is a local name for gossip.Gossip, so that it can be embedded in
// gossipServiceImpl under an unexported field name.
type gossipSvc gossip.Gossip
// GossipService encapsulates gossip and state capabilities into a single interface
type GossipService interface {
gossip.Gossip
// NewConfigEventer creates a ConfigProcessor which the configtx.Manager can ultimately route config updates to
NewConfigEventer() ConfigProcessor
// InitializeChannel allocates the state provider and should be invoked once per channel per execution
InitializeChannel(chainID string, committer committer.Committer, endpoints []string)
// GetBlock returns the block with the given index for the given chain
GetBlock(chainID string, index uint64) *common.Block
// AddPayload appends a message payload for the given chain
AddPayload(chainID string, payload *proto.Payload) error
}
// DeliveryServiceFactory is a factory that creates and initializes delivery service instances
type DeliveryServiceFactory interface {
	// Service returns an instance of the delivery client built for the given
	// gossip service, endpoints and message crypto service, or an error if
	// the client could not be created
	Service(g GossipService, endpoints []string, mcs api.MessageCryptoService) (deliverclient.DeliverService, error)
}
// deliveryFactoryImpl is the DeliveryServiceFactory that InitGossipService
// passes to InitGossipServiceCustomDeliveryFactory; it carries no state.
type deliveryFactoryImpl struct {
}
// Service assembles a deliver client configuration from the supplied gossip
// service, endpoints and message crypto service, using the default connection
// and ABC factories, and returns the deliver service created from it.
func (*deliveryFactoryImpl) Service(g GossipService, endpoints []string, mcs api.MessageCryptoService) (deliverclient.DeliverService, error) {
	conf := &deliverclient.Config{
		Gossip:      g,
		CryptoSvc:   mcs,
		Endpoints:   endpoints,
		ABCFactory:  deliverclient.DefaultABCFactory,
		ConnFactory: deliverclient.DefaultConnectionFactory,
	}
	return deliverclient.NewDeliverService(conf)
}
// gossipServiceImpl is the GossipService implementation handed out by
// GetGossipService. It embeds the gossip component and keeps per-chain state.
type gossipServiceImpl struct {
// gossipSvc is the embedded gossip component.
gossipSvc
// chains maps a chain ID to the state provider created for it in InitializeChannel.
chains map[string]state.GossipStateProvider
// leaderElection maps a chain ID to its leader election service; it is
// populated in InitializeChannel when dynamic leader election is enabled.
leaderElection map[string]election.LeaderElectionService
// deliveryService is created on the first InitializeChannel call and stays
// nil if the factory could not create it.
deliveryService deliverclient.DeliverService
// deliveryFactory creates deliveryService.
deliveryFactory DeliveryServiceFactory
// lock is held by InitializeChannel, GetBlock, AddPayload and Stop.
lock sync.RWMutex
// idMapper resolves the PKI-ID of this peer's identity for leader election.
idMapper identity.Mapper
// mcs is handed to the state providers and to the delivery factory.
mcs api.MessageCryptoService
// peerIdentity is the identity this peer was initialized with.
peerIdentity []byte
// secAdv is used to look up the organization of this peer's identity.
secAdv api.SecurityAdvisor
}
// joinChannelMessage is an implementation of api.JoinChannelMessage.
type joinChannelMessage struct {
// seqNum is the value reported by SequenceNumber; configUpdated sets it
// from config.Sequence().
seqNum uint64
// members2AnchorPeers maps an organization (by MSP ID) to its anchor peers.
members2AnchorPeers map[string][]api.AnchorPeer
}
// SequenceNumber returns the sequence number stored in this join channel message
func (jcm *joinChannelMessage) SequenceNumber() uint64 {
return jcm.seqNum
}
// Members lists the organizations of the channel, i.e. every key of the
// organization-to-anchor-peers map. The order of the result is unspecified
// because it follows map iteration order.
func (jcm *joinChannelMessage) Members() []api.OrgIdentityType {
	orgs := make([]api.OrgIdentityType, 0, len(jcm.members2AnchorPeers))
	for name := range jcm.members2AnchorPeers {
		orgs = append(orgs, api.OrgIdentityType(name))
	}
	return orgs
}
// AnchorPeersOf looks up the anchor peers recorded for the given organization.
// An organization that is not part of the message yields a nil slice.
func (jcm *joinChannelMessage) AnchorPeersOf(org api.OrgIdentityType) []api.AnchorPeer {
	orgName := string(org)
	return jcm.members2AnchorPeers[orgName]
}
// logger is the package-level logger, obtained for util.LoggingServiceModule.
var logger = util.GetLogger(util.LoggingServiceModule, "")
// InitGossipService initializes the gossip service using the default delivery
// service factory. All arguments are forwarded unchanged to
// InitGossipServiceCustomDeliveryFactory.
func InitGossipService(peerIdentity []byte, endpoint string, s *grpc.Server, mcs api.MessageCryptoService,
	secAdv api.SecurityAdvisor, secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) {
	// TODO: Remove this.
	// TODO: Temporary work-around: force the gossip leader election module to
	// TODO: load its logger at startup, so that the flogging package registers
	// TODO: it in time to apply the log levels requested in the config.
	util.GetLogger(util.LoggingElectionModule, "")

	defaultFactory := &deliveryFactoryImpl{}
	InitGossipServiceCustomDeliveryFactory(
		peerIdentity, endpoint, s, defaultFactory,
		mcs, secAdv, secureDialOpts, bootPeers...,
	)
}
// InitGossipServiceCustomDeliveryFactory initializes the gossip service with a
// caller-supplied delivery service factory, which is useful for testing and
// mocking. The initialization body runs at most once per process; subsequent
// calls are no-ops. A non-empty "peer.gossip.endpoint" configuration value
// overrides the endpoint argument.
func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string, s *grpc.Server,
	factory DeliveryServiceFactory, mcs api.MessageCryptoService, secAdv api.SecurityAdvisor,
	secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) {
	once.Do(func() {
		configured := viper.GetString("peer.gossip.endpoint")
		if configured != "" {
			endpoint = configured
		}

		logger.Info("Initialize gossip with endpoint", endpoint, "and bootstrap set", bootPeers)

		mapper := identity.NewIdentityMapper(mcs, peerIdentity)
		// Named gossipComponent so that the local does not shadow the
		// imported gossip package.
		gossipComponent := integration.NewGossipComponent(peerIdentity, endpoint, s, secAdv,
			mcs, mapper, secureDialOpts, bootPeers...)

		svc := &gossipServiceImpl{
			gossipSvc:       gossipComponent,
			mcs:             mcs,
			secAdv:          secAdv,
			idMapper:        mapper,
			peerIdentity:    peerIdentity,
			deliveryFactory: factory,
			chains:          make(map[string]state.GossipStateProvider),
			leaderElection:  make(map[string]election.LeaderElectionService),
		}
		gossipServiceInstance = svc
	})
}
// GetGossipService returns the package-level gossip service instance.
// NOTE(review): gossipServiceInstance is a *gossipServiceImpl, so if this is
// called before the instance has been assigned, the returned interface wraps a
// nil pointer and does not compare equal to nil.
func GetGossipService() GossipService {
return gossipServiceInstance
}
// NewConfigEventer creates a ConfigProcessor which the configtx.Manager can ultimately route config updates to.
// The processor is built by newConfigEventer with this service as its argument.
func (g *gossipServiceImpl) NewConfigEventer() ConfigProcessor {
return newConfigEventer(g)
}
// InitializeChannel allocates the state provider for the given chain and should
// be invoked once per channel per execution.
//
// On the first invocation it also creates the delivery service through the
// configured factory. If the delivery service is available, block delivery is
// set up according to configuration:
//   - peer.gossip.useLeaderElection: a leader election service is started for
//     the chain, and delivery is started/stopped as leadership changes;
//   - peer.gossip.orgLeader: delivery from the ordering service is started
//     immediately;
//   - neither: this peer does not pull blocks from the ordering service.
//
// The two parameters are mutually exclusive; setting both to true is not
// defined, hence the peer panics and terminates.
func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committer.Committer, endpoints []string) {
	g.lock.Lock()
	defer g.lock.Unlock()

	// Initialize new state provider for given committer
	logger.Debug("Creating state provider for chainID", chainID)
	g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer, g.mcs)

	if g.deliveryService == nil {
		var err error
		// Hand the receiver to the factory rather than the package-level
		// singleton, so the delivery client is bound to this instance.
		g.deliveryService, err = g.deliveryFactory.Service(g, endpoints, g.mcs)
		if err != nil {
			logger.Warning("Cannot create delivery client, due to", err)
		}
	}

	// Delivery service might be nil only if it was not able to get connected
	// to the ordering service
	if g.deliveryService == nil {
		logger.Warning("Delivery client is down won't be able to pull blocks for chain", chainID)
		return
	}

	useLeaderElection := viper.GetBool("peer.gossip.useLeaderElection")
	isStaticOrgLeader := viper.GetBool("peer.gossip.orgLeader")
	if useLeaderElection && isStaticOrgLeader {
		logger.Panic("Setting both orgLeader and useLeaderElection to true isn't supported, aborting execution")
	}

	switch {
	case useLeaderElection:
		logger.Debug("Delivery uses dynamic leader election mechanism, channel", chainID)
		g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, committer))
	case isStaticOrgLeader:
		logger.Debug("This peer is configured to connect to ordering service for blocks delivery, channel", chainID)
		// Report a failure to start delivery instead of dropping it silently.
		if err := g.deliveryService.StartDeliverForChannel(chainID, committer); err != nil {
			logger.Error("Delivery service is not able to start blocks delivery for chain, due to", err)
		}
	default:
		logger.Debug("This peer is not configured to connect to ordering service for blocks delivery, channel", chainID)
	}
}
// configUpdated builds a joinChannelMessage from the given channel config and
// hands it to the gossip component via JoinChan. If the organization of this
// peer is not among the organizations of the channel, an error is logged and
// the channel is not joined.
func (g *gossipServiceImpl) configUpdated(config Config) {
	selfIdentity := api.PeerIdentityType(g.peerIdentity)
	myOrg := string(g.secAdv.OrgByPeerIdentity(selfIdentity))
	if !g.amIinChannel(myOrg, config) {
		logger.Error("Tried joining channel", config.ChainID(), "but our org(", myOrg,
			"), isn't among the orgs of the channel:", orgListFromConfig(config), ", aborting.")
		return
	}

	jcm := &joinChannelMessage{
		seqNum:              config.Sequence(),
		members2AnchorPeers: make(map[string][]api.AnchorPeer),
	}
	for _, appOrg := range config.Organizations() {
		logger.Debug(appOrg.MSPID(), "anchor peers:", appOrg.AnchorPeers())

		// Every organization gets an entry, even when it has no anchor peers,
		// so that it is still reported as a member of the channel.
		mspID := appOrg.MSPID()
		peers := []api.AnchorPeer{}
		for _, ap := range appOrg.AnchorPeers() {
			peers = append(peers, api.AnchorPeer{Host: ap.Host, Port: int(ap.Port)})
		}
		jcm.members2AnchorPeers[mspID] = peers
	}

	// The log text below is kept as in the original; what follows is the
	// actual channel join on the gossip component.
	logger.Debug("Creating state provider for chainID", config.ChainID())
	g.JoinChan(jcm, gossipCommon.ChainID(config.ChainID()))
}
// GetBlock returns the block with the given index from the state provider of
// the given chain. It returns nil when no state provider has been initialized
// for chainID, instead of panicking on the missing map entry.
func (g *gossipServiceImpl) GetBlock(chainID string, index uint64) *common.Block {
	g.lock.RLock()
	defer g.lock.RUnlock()

	provider, ok := g.chains[chainID]
	if !ok {
		// A missing key yields a nil interface; calling a method on it would panic.
		logger.Warning("No state provider found for chain", chainID, "cannot get block", index)
		return nil
	}
	return provider.GetBlock(index)
}
// unknownChainError reports an operation on a chain for which no state
// provider has been initialized; its value is the offending chain ID.
type unknownChainError string

// Error implements the error interface.
func (e unknownChainError) Error() string {
	return "no state provider initialized for chain " + string(e)
}

// AddPayload appends a message payload to the state provider of the given
// chain. It returns an error when no state provider has been initialized for
// chainID, instead of panicking on the missing map entry; otherwise it returns
// whatever the state provider reports.
func (g *gossipServiceImpl) AddPayload(chainID string, payload *proto.Payload) error {
	g.lock.RLock()
	defer g.lock.RUnlock()

	provider, ok := g.chains[chainID]
	if !ok {
		// A missing key yields a nil interface; calling a method on it would panic.
		return unknownChainError(chainID)
	}
	return provider.AddPayload(payload)
}
// Stop shuts down the gossip service: every per-chain state provider, every
// leader election service, the gossip component itself and, if it was created,
// the delivery service. The service lock is held for the whole shutdown.
func (g *gossipServiceImpl) Stop() {
	g.lock.Lock()
	defer g.lock.Unlock()

	for chainID, provider := range g.chains {
		// Log the chain ID rather than dumping the provider object.
		logger.Info("Stopping chain", chainID)
		provider.Stop()
	}
	for chainID, electionService := range g.leaderElection {
		// Infof is required here: Info does not interpret format verbs.
		logger.Infof("Stopping leader election for %s", chainID)
		electionService.Stop()
	}
	g.gossipSvc.Stop()
	if g.deliveryService != nil {
		g.deliveryService.Stop()
	}
}
// newLeaderElectionComponent creates a leader election service for the given
// chain. The service is identified by the PKI-ID derived from this peer's
// identity and invokes callback with a bool argument.
func (g *gossipServiceImpl) newLeaderElectionComponent(chainID string, callback func(bool)) election.LeaderElectionService {
	pkiID := g.idMapper.GetPKIidOfCert(g.peerIdentity)
	channel := gossipCommon.ChainID(chainID)
	electionAdapter := election.NewAdapter(g, pkiID, channel)
	return election.NewLeaderElectionService(electionAdapter, string(pkiID), callback)
}
// amIinChannel reports whether myOrg appears among the MSP IDs of the
// organizations listed in the given channel config.
func (g *gossipServiceImpl) amIinChannel(myOrg string, config Config) bool {
	channelOrgs := orgListFromConfig(config)
	for i := range channelOrgs {
		if channelOrgs[i] == myOrg {
			return true
		}
	}
	return false
}
// onStatusChangeFactory returns a callback that takes the new leadership
// status for the given chain: when called with true it starts block delivery
// for the chain, when called with false it stops it. Failures are logged and
// not propagated.
func (g *gossipServiceImpl) onStatusChangeFactory(chainID string, committer blocksprovider.LedgerInfo) func(bool) {
	return func(isLeader bool) {
		if !isLeader {
			if err := g.deliveryService.StopDeliverForChannel(chainID); err != nil {
				logger.Error("Delivery service is not able to stop blocks delivery for chain, due to", err)
			}
			return
		}

		if err := g.deliveryService.StartDeliverForChannel(chainID, committer); err != nil {
			logger.Error("Delivery service is not able to start blocks delivery for chain, due to", err)
		}
	}
}
// orgListFromConfig collects the MSP ID of every organization in the given
// config. The result is nil when the config lists no organizations.
func orgListFromConfig(config Config) []string {
	var mspIDs []string
	orgs := config.Organizations()
	for key := range orgs {
		mspIDs = append(mspIDs, orgs[key].MSPID())
	}
	return mspIDs
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/hyperledger/fabric.git
git@gitee.com:hyperledger/fabric.git
hyperledger
fabric
fabric
v1.0.0-beta

Search