6 Star 44 Fork 25

Hyperledger / fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
gossip_service.go 11.73 KB
一键复制 编辑 原始数据 按行查看 历史
Gari Singh 提交于 2017-07-30 08:40 . [FAB-5497] Create 1.0.1 fabric release
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
	"fmt"
	"sync"

	"github.com/hyperledger/fabric/core/committer"
	"github.com/hyperledger/fabric/core/deliverservice"
	"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
	"github.com/hyperledger/fabric/gossip/api"
	gossipCommon "github.com/hyperledger/fabric/gossip/common"
	"github.com/hyperledger/fabric/gossip/election"
	"github.com/hyperledger/fabric/gossip/gossip"
	"github.com/hyperledger/fabric/gossip/identity"
	"github.com/hyperledger/fabric/gossip/integration"
	"github.com/hyperledger/fabric/gossip/state"
	"github.com/hyperledger/fabric/gossip/util"
	"github.com/hyperledger/fabric/protos/common"
	proto "github.com/hyperledger/fabric/protos/gossip"
	"github.com/spf13/viper"
	"google.golang.org/grpc"
)
var (
// gossipServiceInstance is the package-wide gossip service. It is assigned
// inside once.Do by InitGossipServiceCustomDeliveryFactory and handed out
// by GetGossipService.
gossipServiceInstance *gossipServiceImpl
// once ensures the initialization of gossipServiceInstance runs a single time.
once sync.Once
)
// gossipSvc is a named type over gossip.Gossip. gossipServiceImpl embeds it,
// which gives the embedded field the name gossipSvc.
type gossipSvc gossip.Gossip
// GossipService encapsulates gossip and state capabilities into a single interface
type GossipService interface {
gossip.Gossip
// NewConfigEventer creates a ConfigProcessor which the configtx.Manager can ultimately route config updates to
NewConfigEventer() ConfigProcessor
// InitializeChannel allocates the state provider and should be invoked once per channel per execution
InitializeChannel(chainID string, committer committer.Committer, endpoints []string)
// GetBlock returns the block with the given index for the given chain
GetBlock(chainID string, index uint64) *common.Block
// AddPayload appends the message payload for the given chain
AddPayload(chainID string, payload *proto.Payload) error
}
// DeliveryServiceFactory is a factory that creates and initializes a delivery
// service instance. It exists as an interface so that a custom implementation
// can be supplied to InitGossipServiceCustomDeliveryFactory.
type DeliveryServiceFactory interface {
// Service returns an instance of the delivery client built for the given
// gossip service, endpoints and message crypto service.
Service(g GossipService, endpoints []string, msc api.MessageCryptoService) (deliverclient.DeliverService, error)
}
// deliveryFactoryImpl is the stateless DeliveryServiceFactory that
// InitGossipService passes to InitGossipServiceCustomDeliveryFactory.
type deliveryFactoryImpl struct {
}
// Service builds a delivery client for the given gossip service, ordering
// endpoints and message crypto service, using the default connection and
// ABC factories of the deliverclient package.
func (*deliveryFactoryImpl) Service(g GossipService, endpoints []string, mcs api.MessageCryptoService) (deliverclient.DeliverService, error) {
	conf := &deliverclient.Config{
		Endpoints:   endpoints,
		Gossip:      g,
		CryptoSvc:   mcs,
		ConnFactory: deliverclient.DefaultConnectionFactory,
		ABCFactory:  deliverclient.DefaultABCFactory,
	}
	return deliverclient.NewDeliverService(conf)
}
// gossipServiceImpl is the GossipService implementation held in
// gossipServiceInstance. It combines the gossip component with per-chain
// state providers, per-chain leader election and a delivery service.
type gossipServiceImpl struct {
// gossipSvc is the embedded gossip component; its methods are promoted.
gossipSvc
// chains maps a chain ID to its state provider; entries are added by InitializeChannel.
chains map[string]state.GossipStateProvider
// leaderElection maps a chain ID to its leader election service; entries are
// added by InitializeChannel when peer.gossip.useLeaderElection is set.
leaderElection map[string]election.LeaderElectionService
// deliveryService is created on demand by InitializeChannel through
// deliveryFactory and stays nil if that creation fails.
deliveryService deliverclient.DeliverService
// deliveryFactory creates deliveryService.
deliveryFactory DeliveryServiceFactory
// lock is held by InitializeChannel and Stop (write) and by GetBlock,
// AddPayload and the leadership yield callback (read).
lock sync.RWMutex
// idMapper is used to derive this peer's PKI-ID from peerIdentity.
idMapper identity.Mapper
// mcs is the message crypto service handed to state providers and the delivery factory.
mcs api.MessageCryptoService
// peerIdentity is the identity of this peer, as passed at initialization.
peerIdentity []byte
// secAdv is used to resolve the organization of this peer from peerIdentity.
secAdv api.SecurityAdvisor
}
// joinChannelMessage is an implementation of api.JoinChannelMessage.
type joinChannelMessage struct {
// seqNum is the value returned by SequenceNumber; configUpdated fills it
// from the channel config's Sequence().
seqNum uint64
// members2AnchorPeers maps an organization (keyed by MSP ID in configUpdated)
// to the anchor peers of that organization.
members2AnchorPeers map[string][]api.AnchorPeer
}
// SequenceNumber returns the sequence number stored in this join channel message.
func (jcm *joinChannelMessage) SequenceNumber() uint64 {
return jcm.seqNum
}
// Members lists the organizations of the channel, i.e. the keys of
// members2AnchorPeers. The order of the result follows map iteration and is
// therefore unspecified.
func (jcm *joinChannelMessage) Members() []api.OrgIdentityType {
	orgs := make([]api.OrgIdentityType, 0, len(jcm.members2AnchorPeers))
	for name := range jcm.members2AnchorPeers {
		orgs = append(orgs, api.OrgIdentityType(name))
	}
	return orgs
}
// AnchorPeersOf returns the anchor peers of the given organization.
// The result is nil when the organization has no entry in members2AnchorPeers.
func (jcm *joinChannelMessage) AnchorPeersOf(org api.OrgIdentityType) []api.AnchorPeer {
return jcm.members2AnchorPeers[string(org)]
}
// logger is the package logger, obtained for the gossip service logging module.
var logger = util.GetLogger(util.LoggingServiceModule, "")
// InitGossipService initializes the gossip service singleton with the default
// delivery service factory. All arguments are forwarded unchanged to
// InitGossipServiceCustomDeliveryFactory, whose error is returned.
func InitGossipService(peerIdentity []byte, endpoint string, s *grpc.Server, mcs api.MessageCryptoService,
	secAdv api.SecurityAdvisor, secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) error {
	// TODO: Remove this. It is a temporary work-around that makes the gossip
	// leader election module load its logger at startup, so that the flogging
	// package registers this logger in time to set the log levels requested
	// in the config.
	util.GetLogger(util.LoggingElectionModule, "")

	defaultFactory := &deliveryFactoryImpl{}
	return InitGossipServiceCustomDeliveryFactory(peerIdentity, endpoint, s, defaultFactory,
		mcs, secAdv, secureDialOpts, bootPeers...)
}
// InitGossipServiceCustomDeliveryFactory initializes the gossip service
// singleton with a caller-supplied delivery service factory, which is useful
// for testing and mocking.
//
// The initialization body runs at most once per process (it is guarded by a
// sync.Once); later calls do nothing and return nil. When the configuration
// key peer.gossip.endpoint is non-empty it overrides the endpoint argument.
//
// It returns the error reported while creating the gossip component. In that
// case the singleton is left unset instead of being populated with a service
// that has no gossip component behind it.
func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string, s *grpc.Server,
	factory DeliveryServiceFactory, mcs api.MessageCryptoService, secAdv api.SecurityAdvisor,
	secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) error {
	var err error

	once.Do(func() {
		if overrideEndpoint := viper.GetString("peer.gossip.endpoint"); overrideEndpoint != "" {
			endpoint = overrideEndpoint
		}

		logger.Info("Initialize gossip with endpoint", endpoint, "and bootstrap set", bootPeers)

		idMapper := identity.NewIdentityMapper(mcs, peerIdentity)

		// Named gossipComponent so that the gossip package is not shadowed.
		var gossipComponent gossip.Gossip
		gossipComponent, err = integration.NewGossipComponent(peerIdentity, endpoint, s, secAdv,
			mcs, idMapper, secureDialOpts, bootPeers...)
		if err != nil {
			// NOTE(review): assumes the component returned alongside a non-nil
			// error is not usable - confirm against integration.NewGossipComponent.
			return
		}

		gossipServiceInstance = &gossipServiceImpl{
			mcs:             mcs,
			gossipSvc:       gossipComponent,
			chains:          make(map[string]state.GossipStateProvider),
			leaderElection:  make(map[string]election.LeaderElectionService),
			deliveryFactory: factory,
			idMapper:        idMapper,
			peerIdentity:    peerIdentity,
			secAdv:          secAdv,
		}
	})

	return err
}
// GetGossipService returns the gossip service singleton stored in
// gossipServiceInstance.
//
// NOTE(review): gossipServiceInstance is a *gossipServiceImpl, so when it has
// not been assigned yet the returned interface wraps a nil pointer and does
// not compare equal to nil; callers cannot detect "not initialized" with a
// plain nil check on the result.
func GetGossipService() GossipService {
return gossipServiceInstance
}
// NewConfigEventer creates a ConfigProcessor which the configtx.Manager can ultimately route config updates to.
// It delegates to newConfigEventer, passing this service as the receiver of the updates.
func (g *gossipServiceImpl) NewConfigEventer() ConfigProcessor {
return newConfigEventer(g)
}
// InitializeChannel allocates the state provider for chainID and should be
// invoked once per channel per execution.
//
// If no delivery service exists yet, one is created through the delivery
// factory for this service instance and the given endpoints. When that fails
// the channel is still registered, but no block delivery is started.
//
// How blocks are pulled from the ordering service is chosen by two
// configuration keys:
//   - peer.gossip.useLeaderElection: start a leader election service for the
//     channel; delivery is started or stopped from its status callback.
//   - peer.gossip.orgLeader: start delivery for the channel immediately.
//
// The two keys are mutually exclusive; setting both to true is not defined,
// hence the peer panics and terminates. With neither set, this peer does not
// connect to the ordering service for the channel.
func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committer.Committer, endpoints []string) {
	g.lock.Lock()
	defer g.lock.Unlock()

	// Initialize new state provider for given committer
	logger.Debug("Creating state provider for chainID", chainID)
	g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer, g.mcs)

	if g.deliveryService == nil {
		var err error
		// Hand the receiver to the factory rather than the package-level
		// singleton, so an instance that is not the singleton is wired to itself.
		g.deliveryService, err = g.deliveryFactory.Service(g, endpoints, g.mcs)
		if err != nil {
			logger.Warning("Cannot create delivery client, due to", err)
		}
	}

	// Delivery service might be nil only if it was not able to get connected
	// to the ordering service
	if g.deliveryService == nil {
		logger.Warning("Delivery client is down won't be able to pull blocks for chain", chainID)
		return
	}

	leaderElection := viper.GetBool("peer.gossip.useLeaderElection")
	isStaticOrgLeader := viper.GetBool("peer.gossip.orgLeader")

	switch {
	case leaderElection && isStaticOrgLeader:
		logger.Panic("Setting both orgLeader and useLeaderElection to true isn't supported, aborting execution")
	case leaderElection:
		logger.Debug("Delivery uses dynamic leader election mechanism, channel", chainID)
		g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, committer))
	case isStaticOrgLeader:
		logger.Debug("This peer is configured to connect to ordering service for blocks delivery, channel", chainID)
		// A static leader never yields, hence the no-op callback. The start
		// error is reported instead of being silently dropped.
		if err := g.deliveryService.StartDeliverForChannel(chainID, committer, func() {}); err != nil {
			logger.Error("Delivery service is not able to start blocks delivery for chain, due to", err)
		}
	default:
		logger.Debug("This peer is not configured to connect to ordering service for blocks delivery, channel", chainID)
	}
}
// configUpdated constructs a joinChannelMessage from the given channel
// configuration and sends it to the gossipSvc via JoinChan.
//
// The message carries the config sequence number and, for every organization
// in the config, its MSP ID mapped to its anchor peers (an empty list when it
// has none). If the organization of this peer is not among the organizations
// of the channel, an error is logged and the channel is not joined.
func (g *gossipServiceImpl) configUpdated(config Config) {
	myOrg := string(g.secAdv.OrgByPeerIdentity(api.PeerIdentityType(g.peerIdentity)))
	if !g.amIinChannel(myOrg, config) {
		logger.Error("Tried joining channel", config.ChainID(), "but our org(", myOrg, "), isn't "+
			"among the orgs of the channel:", orgListFromConfig(config), ", aborting.")
		return
	}

	jcm := &joinChannelMessage{seqNum: config.Sequence(), members2AnchorPeers: map[string][]api.AnchorPeer{}}
	for _, appOrg := range config.Organizations() {
		mspID := appOrg.MSPID()
		anchors := appOrg.AnchorPeers()
		logger.Debug(mspID, "anchor peers:", anchors)

		// Always store a non-nil slice so that every organization has an entry,
		// even when it declares no anchor peers.
		peers := make([]api.AnchorPeer, 0, len(anchors))
		for _, ap := range anchors {
			peers = append(peers, api.AnchorPeer{
				Host: ap.Host,
				Port: int(ap.Port),
			})
		}
		jcm.members2AnchorPeers[mspID] = peers
	}

	// Hand the membership information over to gossip for this channel.
	logger.Debug("Joining gossip to channel", config.ChainID())
	g.JoinChan(jcm, gossipCommon.ChainID(config.ChainID()))
}
// GetBlock returns the block with the given index from the state provider of
// the given chain.
//
// It returns nil when no state provider has been registered for chainID
// (InitializeChannel was not called for it), rather than panicking on the
// missing map entry.
func (g *gossipServiceImpl) GetBlock(chainID string, index uint64) *common.Block {
	g.lock.RLock()
	defer g.lock.RUnlock()

	provider, ok := g.chains[chainID]
	if !ok {
		logger.Warning("No state provider for chain", chainID, ", cannot get block", index)
		return nil
	}
	return provider.GetBlock(index)
}
// AddPayload hands the message payload to the state provider of the given
// chain and returns that provider's result.
//
// It returns an error when no state provider has been registered for chainID
// (InitializeChannel was not called for it), rather than panicking on the
// missing map entry.
func (g *gossipServiceImpl) AddPayload(chainID string, payload *proto.Payload) error {
	g.lock.RLock()
	defer g.lock.RUnlock()

	provider, ok := g.chains[chainID]
	if !ok {
		return fmt.Errorf("no state provider for chain %q", chainID)
	}
	return provider.AddPayload(payload)
}
// Stop shuts the service down while holding the write lock: first every
// per-chain state provider, then every per-chain leader election service,
// then the gossip component, and finally the delivery service if one was
// created.
func (g *gossipServiceImpl) Stop() {
	g.lock.Lock()
	defer g.lock.Unlock()

	for chainID, provider := range g.chains {
		// Log the chain ID, not the provider value itself.
		logger.Info("Stopping chain", chainID)
		provider.Stop()
	}

	for chainID, electionService := range g.leaderElection {
		logger.Infof("Stopping leader election for %s", chainID)
		electionService.Stop()
	}

	g.gossipSvc.Stop()

	if g.deliveryService != nil {
		g.deliveryService.Stop()
	}
}
// newLeaderElectionComponent creates the leader election service for the
// given channel. The service is built on an election adapter over this gossip
// service, is identified by this peer's PKI-ID, and reports through callback.
func (g *gossipServiceImpl) newLeaderElectionComponent(chainID string, callback func(bool)) election.LeaderElectionService {
	pkiID := g.idMapper.GetPKIidOfCert(g.peerIdentity)
	electionAdapter := election.NewAdapter(g, pkiID, gossipCommon.ChainID(chainID))
	electionSvc := election.NewLeaderElectionService(electionAdapter, string(pkiID), callback)
	return electionSvc
}
// amIinChannel reports whether myOrg appears among the MSP IDs of the
// organizations listed in the given channel config.
func (g *gossipServiceImpl) amIinChannel(myOrg string, config Config) bool {
	channelOrgs := orgListFromConfig(config)
	for i := range channelOrgs {
		if channelOrgs[i] == myOrg {
			return true
		}
	}
	return false
}
// onStatusChangeFactory returns the leadership status callback for the given
// channel. Called with true, the callback starts block delivery for the
// channel; called with false, it stops it. Failures of either operation are
// logged and not propagated.
func (g *gossipServiceImpl) onStatusChangeFactory(chainID string, committer blocksprovider.LedgerInfo) func(bool) {
	return func(isLeader bool) {
		if !isLeader {
			logger.Info("Renounced leadership, stopping delivery service for channel", chainID)
			stopErr := g.deliveryService.StopDeliverForChannel(chainID)
			if stopErr != nil {
				logger.Error("Delivery service is not able to stop blocks delivery for chain, due to", stopErr)
			}
			return
		}

		// relinquish looks up this channel's election service under the read
		// lock and calls Yield on it after releasing the lock. It is handed to
		// the delivery service, which decides when to invoke it.
		relinquish := func() {
			g.lock.RLock()
			electionSvc := g.leaderElection[chainID]
			g.lock.RUnlock()
			electionSvc.Yield()
		}

		logger.Info("Elected as a leader, starting delivery service for channel", chainID)
		startErr := g.deliveryService.StartDeliverForChannel(chainID, committer, relinquish)
		if startErr != nil {
			logger.Error("Delivery service is not able to start blocks delivery for chain, due to", startErr)
		}
	}
}
// orgListFromConfig collects the MSP ID of every organization in the given
// config. The result is nil when the config lists no organizations.
func orgListFromConfig(config Config) []string {
	var mspIDs []string
	for _, org := range config.Organizations() {
		id := org.MSPID()
		mspIDs = append(mspIDs, id)
	}
	return mspIDs
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/hyperledger/fabric.git
git@gitee.com:hyperledger/fabric.git
hyperledger
fabric
fabric
v1.0.6

搜索帮助

344bd9b3 5694891 D2dac590 5694891