6 Star 44 Fork 25

Hyperledger / fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
peer.go 15.92 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package peer
import (
"errors"
"fmt"
"net"
"sync"
"github.com/hyperledger/fabric/common/config"
"github.com/hyperledger/fabric/common/configtx"
configtxapi "github.com/hyperledger/fabric/common/configtx/api"
configtxtest "github.com/hyperledger/fabric/common/configtx/test"
"github.com/hyperledger/fabric/common/flogging"
mockconfigtx "github.com/hyperledger/fabric/common/mocks/configtx"
mockpolicies "github.com/hyperledger/fabric/common/mocks/policies"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/committer/txvalidator"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/service"
"github.com/hyperledger/fabric/msp"
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/spf13/viper"
"google.golang.org/grpc"
)
// Package-level singletons shared by the functions in this file.
var (
	// peerLogger is the logger used throughout this package.
	peerLogger = flogging.MustGetLogger("peer")

	// peerServer is the gRPC server assigned by CreatePeerServer and
	// returned by GetPeerServer.
	peerServer comm.GRPCServer

	// rootCASupport is a singleton instance used to manage CAs for the peer
	// across channel config changes.
	rootCASupport = comm.GetCASupport()
)
// chainSupport bundles the per-chain resources used by this package: the
// configuration manager, the application configuration and the chain's ledger.
type chainSupport struct {
// Manager is the embedded configtx manager of the chain.
configtxapi.Manager
// Application is the embedded application configuration. It may be nil:
// createChain stores nil when no application config is reported, and
// MockCreateChain does not set it at all.
config.Application
// ledger is the peer ledger backing the chain.
ledger ledger.PeerLedger
}
// Ledger returns the peer ledger held by this chain support.
func (cs *chainSupport) Ledger() ledger.PeerLedger {
	return cs.ledger
}
// GetMSPIDs returns the application MSP IDs of chain cid by delegating to the
// package-level GetMSPIDs function; the receiver's own state is not consulted.
func (cs *chainSupport) GetMSPIDs(cid string) []string {
	ids := GetMSPIDs(cid)
	return ids
}
// chain is a local struct to manage the objects that belong to a single chain.
type chain struct {
// cs gives access to the chain's configuration and ledger.
cs *chainSupport
// cb is the cached current config block; SetCurrConfigBlock replaces it.
cb *common.Block
// committer is the ledger committer created for the chain by createChain.
// It is left unset by MockCreateChain.
committer committer.Committer
}
// chains is a local map of chainID->chainObject. Most accessors in this file
// take the embedded RWMutex (read lock for lookups, write lock for updates)
// before touching list.
var chains = struct {
sync.RWMutex
list map[string]*chain
}{list: make(map[string]*chain)}
// MockInitialize resets the chains registry for the test environment. It
// initializes the ledger management test environment, replaces the chain map
// with an empty one and installs a no-op chain initializer.
func MockInitialize() {
	ledgermgmt.InitializeTestEnv()
	// Assigning a fresh map is enough to drop every previously registered
	// chain; no intermediate nil assignment is needed.
	chains.list = make(map[string]*chain)
	chainInitializer = func(string) {}
}
var (
	// chainInitializer is the callback InitChain invokes with a chain ID.
	// It is set by Initialize and MockInitialize and may be nil.
	chainInitializer func(string)

	// mockMSPIDGetter, when non-nil, is used by GetMSPIDs instead of the
	// chain configuration. It is set through MockSetMSPIDGetter.
	mockMSPIDGetter func(string) []string
)
// MockSetMSPIDGetter installs mspIDGetter as the function GetMSPIDs uses to
// answer lookups instead of reading the chain configuration. Passing nil
// restores the regular lookup.
func MockSetMSPIDGetter(mspIDGetter func(string) []string) {
	mockMSPIDGetter = mspIDGetter
}
// Initialize sets up every chain the peer has in its persistent storage. It
// should be called at start up, once the ledger and gossip are ready.
//
// init is stored as the chain initializer and is run (through InitChain) for
// each chain that loads successfully. A ledger that cannot be opened, has no
// readable config block, or whose chain cannot be created is logged and
// skipped. Failing to list the ledger IDs panics.
func Initialize(init func(string)) {
	chainInitializer = init

	ledgermgmt.Initialize()
	ids, err := ledgermgmt.GetLedgerIDs()
	if err != nil {
		panic(fmt.Errorf("Error in initializing ledgermgmt: %s", err))
	}

	for _, cid := range ids {
		peerLogger.Infof("Loading chain %s", cid)

		l, err := ledgermgmt.OpenLedger(cid)
		if err != nil {
			peerLogger.Warningf("Failed to load ledger %s(%s)", cid, err)
			peerLogger.Debugf("Error while loading ledger %s with message %s. We continue to the next ledger rather than abort.", cid, err)
			continue
		}

		configBlock, err := getCurrConfigBlockFromLedger(l)
		if err != nil {
			peerLogger.Warningf("Failed to find config block on ledger %s(%s)", cid, err)
			peerLogger.Debugf("Error while looking for config block on ledger %s with message %s. We continue to the next ledger rather than abort.", cid, err)
			continue
		}

		// A chain is only created for a valid ledger that has a config block.
		if err := createChain(cid, l, configBlock); err != nil {
			peerLogger.Warningf("Failed to load chain %s(%s)", cid, err)
			peerLogger.Debugf("Error reloading chain %s with message %s. We continue to the next chain rather than abort.", cid, err)
			continue
		}

		InitChain(cid)
	}
}
// InitChain runs the registered chain initializer for chain cid. It is meant
// to be called after the peer has joined the chain, for example to deploy the
// system chaincodes. It does nothing when no initializer has been registered.
func InitChain(cid string) {
	if chainInitializer == nil {
		return
	}
	peerLogger.Debugf("Init chain %s", cid)
	chainInitializer(cid)
}
// getCurrConfigBlockFromLedger returns the most recent config block stored in
// ledger. It reads the last block of the chain, takes the index of the last
// config block from that block's metadata, and fetches the block at that
// index.
//
// An error is returned when the ledger reports a height of zero, or when any
// of the ledger lookups or the metadata extraction fails.
func getCurrConfigBlockFromLedger(ledger ledger.PeerLedger) (*common.Block, error) {
	peerLogger.Debugf("Getting config block")

	info, err := ledger.GetBlockchainInfo()
	if err != nil {
		return nil, err
	}
	// The last block number is Height-1. With a height of zero that
	// subtraction would wrap around (Height is presumably unsigned), so an
	// empty ledger is reported explicitly instead.
	if info.Height == 0 {
		return nil, errors.New("ledger has no blocks, cannot locate config block")
	}

	lastBlock, err := ledger.GetBlockByNumber(info.Height - 1)
	if err != nil {
		return nil, err
	}

	// The last block's metadata records where the most recent config block is.
	configBlockIndex, err := utils.GetLastConfigIndexFromBlock(lastBlock)
	if err != nil {
		return nil, err
	}

	configBlock, err := ledger.GetBlockByNumber(configBlockIndex)
	if err != nil {
		return nil, err
	}

	peerLogger.Debugf("Got config block[%d]", configBlockIndex)
	return configBlock, nil
}
// createChain creates a new chain object for chain cid and inserts it into
// the chains registry.
//
// cb is the config block of the chain; its first envelope is used to build
// the configtx manager. ledger is the already opened ledger of the chain.
//
// An error is returned when the envelope cannot be extracted, when the
// configtx manager cannot be built, or when the channel configuration lists
// no orderer address. In the last case the MSP manager has already been
// registered and the committer already created.
func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {
// The config envelope is the first (index 0) envelope of the config block.
envelopeConfig, err := utils.ExtractEnvelope(cb, 0)
if err != nil {
return err
}
configtxInitializer := configtx.NewInitializer()
gossipEventer := service.GetGossipService().NewConfigEventer()
// gossipCallbackWrapper forwards the configuration to gossip and then asks
// gossip to re-validate peer identities.
// NOTE(review): presumably invoked by the configtx manager whenever the
// config is applied - confirm against configtx.NewManagerImpl.
gossipCallbackWrapper := func(cm configtxapi.Manager) {
ac, ok := configtxInitializer.ApplicationConfig()
if !ok {
// TODO, handle a missing ApplicationConfig more gracefully
ac = nil
}
gossipEventer.ProcessConfigUpdate(&chainSupport{
Manager: cm,
Application: ac,
})
service.GetGossipService().SuspectPeers(func(identity api.PeerIdentityType) bool {
// TODO: this is a place-holder that would somehow make the MSP layer suspect
// that a given certificate is revoked, or its intermediate CA is revoked.
// In the meantime, before we have such an ability, we return true in order
// to suspect ALL identities in order to validate all of them.
return true
})
}
// trustedRootsCallbackWrapper refreshes the peer's trusted root CAs from
// the given configuration.
trustedRootsCallbackWrapper := func(cm configtxapi.Manager) {
updateTrustedRoots(cm)
}
configtxManager, err := configtx.NewManagerImpl(
envelopeConfig,
configtxInitializer,
[]func(cm configtxapi.Manager){gossipCallbackWrapper, trustedRootsCallbackWrapper},
)
if err != nil {
return err
}
// TODO remove once all references to mspmgmt are gone from peer code
mspmgmt.XXXSetMSPManager(cid, configtxManager.MSPManager())
// Store an explicit nil when no application config is reported, rather
// than whatever value accompanied ok == false.
ac, ok := configtxInitializer.ApplicationConfig()
if !ok {
ac = nil
}
cs := &chainSupport{
Manager: configtxManager,
Application: ac, // TODO, refactor as this is accessible through Manager
ledger: ledger,
}
// The callback handed to the committer records the given block as the
// chain's current config block.
// NOTE(review): presumably called only for config blocks - confirm against
// committer.NewLedgerCommitterReactive.
c := committer.NewLedgerCommitterReactive(ledger, txvalidator.NewTxValidator(cs), func(block *common.Block) error {
chainID, err := utils.GetChainIDFromBlock(block)
if err != nil {
return err
}
return SetCurrConfigBlock(block, chainID)
})
// A chain without any orderer endpoint is rejected.
ordererAddresses := configtxManager.ChannelConfig().OrdererAddresses()
if len(ordererAddresses) == 0 {
return errors.New("No ordering service endpoint provided in configuration block")
}
service.GetGossipService().InitializeChannel(cs.ChainID(), c, ordererAddresses)
// Register the chain under the write lock; the lock is released on return.
chains.Lock()
defer chains.Unlock()
chains.list[cid] = &chain{
cs: cs,
cb: cb,
committer: c,
}
return nil
}
// CreateChainFromBlock creates a new chain from the config block cb. The chain
// ID is read from the block, a ledger is created from the block, and the chain
// is then built and registered through createChain.
//
// It returns the error of the chain ID extraction or of createChain as is, and
// wraps a ledger creation failure in a descriptive error.
func CreateChainFromBlock(cb *common.Block) error {
	cid, err := utils.GetChainIDFromBlock(cb)
	if err != nil {
		return err
	}

	l, err := ledgermgmt.CreateLedger(cb)
	if err != nil {
		return fmt.Errorf("Cannot create ledger from genesis block, due to %s", err)
	}

	return createChain(cid, l, cb)
}
// MockCreateChain used for creating a ledger for a chain for tests
// without havin to join
func MockCreateChain(cid string) error {
var ledger ledger.PeerLedger
var err error
if ledger = GetLedger(cid); ledger == nil {
gb, _ := configtxtest.MakeGenesisBlock(cid)
if ledger, err = ledgermgmt.CreateLedger(gb); err != nil {
return err
}
}
// Here we need to mock also the policy manager
// in order for the ACL to be checked
initializer := mockconfigtx.Initializer{
Resources: mockconfigtx.Resources{
PolicyManagerVal: &mockpolicies.Manager{
Policy: &mockpolicies.Policy{},
},
},
PolicyProposerVal: &mockconfigtx.PolicyProposer{
Transactional: mockconfigtx.Transactional{},
},
ValueProposerVal: &mockconfigtx.ValueProposer{
Transactional: mockconfigtx.Transactional{},
},
}
manager := &mockconfigtx.Manager{
Initializer: initializer,
}
chains.Lock()
defer chains.Unlock()
chains.list[cid] = &chain{
cs: &chainSupport{
Manager: manager,
ledger: ledger},
}
return nil
}
// GetLedger returns the ledger of the chain identified by cid, or nil when no
// chain with that ID has been created. The registry is read under its read
// lock.
func GetLedger(cid string) ledger.PeerLedger {
	chains.RLock()
	defer chains.RUnlock()

	c, found := chains.list[cid]
	if !found {
		return nil
	}
	return c.cs.ledger
}
// GetPolicyManager returns the policy manager of the chain identified by cid,
// or nil when no chain with that ID has been created. The registry is read
// under its read lock.
func GetPolicyManager(cid string) policies.Manager {
	chains.RLock()
	defer chains.RUnlock()

	c, found := chains.list[cid]
	if !found {
		return nil
	}
	return c.cs.PolicyManager()
}
// GetCurrConfigBlock returns the config block cached for the chain identified
// by cid, or nil when no chain with that ID has been created. The registry is
// read under its read lock.
func GetCurrConfigBlock(cid string) *common.Block {
	chains.RLock()
	defer chains.RUnlock()

	c, found := chains.list[cid]
	if !found {
		return nil
	}
	return c.cb
}
// updateTrustedRoots refreshes the trusted root CAs of the peer server after
// a configuration change on the channel described by cm.
//
// Nothing beyond the debug log happens when the secure configuration cannot be
// read or TLS is disabled. Otherwise the roots of the channel are rebuilt, the
// application roots of all channels are merged with the statically configured
// client and server roots, and the result is installed as the client roots of
// the peer server (when one exists). A failure to install them is only logged.
func updateTrustedRoots(cm configtxapi.Manager) {
	// This is triggered on a per channel basis.
	peerLogger.Debugf("Updating trusted root authorities for channel %s", cm.ChainID())

	secureConfig, err := GetSecureConfig()
	if err != nil || !secureConfig.UseTLS {
		return
	}

	// First update the roots for this channel. This must happen before the
	// read lock below is taken, because it takes the write lock itself.
	buildTrustedRootsForChain(cm)

	rootCASupport.RLock()
	defer rootCASupport.RUnlock()

	// Collect the application roots recorded for every chain.
	trustedRoots := [][]byte{}
	for _, roots := range rootCASupport.AppRootCAsByChain {
		trustedRoots = append(trustedRoots, roots...)
	}
	// The statically configured root certs are trusted as well; appending an
	// empty list is a no-op.
	trustedRoots = append(trustedRoots, secureConfig.ClientRootCAs...)
	trustedRoots = append(trustedRoots, secureConfig.ServerRootCAs...)

	server := GetPeerServer()
	if server == nil {
		return
	}
	if setErr := server.SetClientRootCAs(trustedRoots); setErr != nil {
		msg := "Failed to update trusted roots for peer from latest config " +
			"block. This peer may not be able to communicate " +
			"with members of channel %s (%s)"
		peerLogger.Warningf(msg, cm.ChainID(), setErr)
	}
}
// buildTrustedRootsForChain recomputes the application and orderer root CA
// lists of the channel described by cm and stores them in rootCASupport.
//
// The TLS root and intermediate certs of every FABRIC-type MSP of the channel
// are collected. Certs of an MSP whose ID belongs to an application
// organization go to the application list, all others to the orderer list.
// When the MSPs cannot be retrieved the error is logged and the stored lists
// are left untouched. The rootCASupport write lock is held for the whole call.
func buildTrustedRootsForChain(cm configtxapi.Manager) {
	rootCASupport.Lock()
	defer rootCASupport.Unlock()

	appRootCAs := [][]byte{}
	ordererRootCAs := [][]byte{}

	// Set of the MSP IDs of the application organizations.
	appOrgMSPs := make(map[string]struct{})
	if ac, ok := cm.ApplicationConfig(); ok {
		for _, appOrg := range ac.Organizations() {
			appOrgMSPs[appOrg.MSPID()] = struct{}{}
		}
	}

	cid := cm.ChainID()
	peerLogger.Debugf("updating root CAs for channel [%s]", cid)

	msps, err := cm.MSPManager().GetMSPs()
	if err != nil {
		peerLogger.Errorf("Error getting root CAs for channel %s (%s)", cid, err)
		return
	}

	for id, m := range msps {
		// Only FABRIC MSPs contribute certs.
		if m.GetType() != msp.FABRIC {
			continue
		}

		_, isAppOrg := appOrgMSPs[id]
		// add files one cert under the list matching the MSP's role.
		add := func(cert []byte) {
			if isAppOrg {
				peerLogger.Debugf("adding app root CAs for MSP [%s]", id)
				appRootCAs = append(appRootCAs, cert)
				return
			}
			peerLogger.Debugf("adding orderer root CAs for MSP [%s]", id)
			ordererRootCAs = append(ordererRootCAs, cert)
		}

		for _, root := range m.GetTLSRootCerts() {
			add(root)
		}
		for _, intermediate := range m.GetTLSIntermediateCerts() {
			add(intermediate)
		}
	}

	rootCASupport.AppRootCAsByChain[cid] = appRootCAs
	rootCASupport.OrdererRootCAsByChain[cid] = ordererRootCAs
}
// GetMSPIDs returns the ID of each application MSP defined on chain cid.
//
// When a mock getter has been installed with MockSetMSPIDGetter its result is
// returned instead; this is used by tests that run without a proper join. nil
// is returned when the chain is unknown, has no chain support, or has no
// application config or organizations. The registry read lock is held for the
// whole call, including the call to the mock getter.
func GetMSPIDs(cid string) []string {
	chains.RLock()
	defer chains.RUnlock()

	if mockMSPIDGetter != nil {
		return mockMSPIDGetter(cid)
	}

	c, found := chains.list[cid]
	if !found || c == nil || c.cs == nil {
		return nil
	}

	ac, ok := c.cs.ApplicationConfig()
	if !ok || ac.Organizations() == nil {
		return nil
	}

	orgs := ac.Organizations()
	ids := make([]string, 0, len(orgs))
	for _, org := range orgs {
		ids = append(ids, org.MSPID())
	}
	return ids
}
// SetCurrConfigBlock stores block as the current config block of the chain
// identified by cid. It returns an error when no such chain exists on the
// peer. The registry is updated under its write lock.
func SetCurrConfigBlock(block *common.Block, cid string) error {
	chains.Lock()
	defer chains.Unlock()

	c, found := chains.list[cid]
	if !found {
		return fmt.Errorf("Chain %s doesn't exist on the peer", cid)
	}
	c.cb = block
	return nil
}
// NewPeerClientConnection returns a new grpc.ClientConn to the peer whose
// address is configured under the "peer.address" key.
func NewPeerClientConnection() (*grpc.ClientConn, error) {
	address := viper.GetString("peer.address")
	return NewPeerClientConnectionWithAddress(address)
}
// GetLocalIP returns the first non-loopback IPv4 address found among the
// interface addresses of the host, in textual form. It returns the empty
// string when the addresses cannot be listed or none qualifies.
func GetLocalIP() string {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return ""
	}

	for _, addr := range addrs {
		// Only IP networks that are not loopback are candidates.
		ipnet, isIPNet := addr.(*net.IPNet)
		if !isIPNet || ipnet.IP.IsLoopback() {
			continue
		}
		// Skip addresses that have no IPv4 form.
		if ipnet.IP.To4() == nil {
			continue
		}
		return ipnet.IP.String()
	}

	return ""
}
// NewPeerClientConnectionWithAddress returns a new grpc.ClientConn to the peer
// listening on peerAddress. When TLS is enabled the connection is created with
// the peer's TLS settings; otherwise it is created without TLS.
func NewPeerClientConnectionWithAddress(peerAddress string) (*grpc.ClientConn, error) {
	if !comm.TLSEnabled() {
		return comm.NewClientConnectionWithAddress(peerAddress, true, false, nil)
	}
	return comm.NewClientConnectionWithAddress(peerAddress, true, true, comm.InitTLSForPeer())
}
// GetChannelsInfo returns one ChannelInfo, carrying the channel ID, for every
// channel registered on this peer. The result is nil when no channel is
// registered, and its order is not defined because it follows map iteration.
func GetChannelsInfo() []*pb.ChannelInfo {
	var infos []*pb.ChannelInfo

	chains.RLock()
	defer chains.RUnlock()

	for id := range chains.list {
		infos = append(infos, &pb.ChannelInfo{ChannelId: id})
	}

	return infos
}
// NewChannelPolicyManagerGetter returns a new ChannelPolicyManagerGetter that
// looks policy managers up in the chains known to this peer.
func NewChannelPolicyManagerGetter() policies.ChannelPolicyManagerGetter {
	getter := &channelPolicyManagerGetter{}
	return getter
}
// channelPolicyManagerGetter is a stateless ChannelPolicyManagerGetter backed
// by GetPolicyManager.
type channelPolicyManagerGetter struct{}

// Manager returns the policy manager of channel channelID and true, or nil and
// false when GetPolicyManager has no manager for that channel.
func (c *channelPolicyManagerGetter) Manager(channelID string) (policies.Manager, bool) {
	if pm := GetPolicyManager(channelID); pm != nil {
		return pm, true
	}
	return nil, false
}
// CreatePeerServer creates the comm.GRPCServer used for peer communications,
// listening on listenAddress with the given secure configuration, and stores
// it in the package-level peerServer returned by GetPeerServer.
//
// On failure the error is logged and returned together with a nil server.
// Note that peerServer is overwritten with the result of the creation attempt
// even when that attempt fails.
func CreatePeerServer(listenAddress string,
	secureConfig comm.SecureServerConfig) (comm.GRPCServer, error) {

	srv, err := comm.NewGRPCServer(listenAddress, secureConfig)
	peerServer = srv
	if err != nil {
		peerLogger.Errorf("Failed to create peer server (%s)", err)
		return nil, err
	}
	return peerServer, nil
}
// GetPeerServer returns the peer server instance stored by CreatePeerServer.
// The result is nil when no server has been created yet.
func GetPeerServer() comm.GRPCServer {
	return peerServer
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/hyperledger/fabric.git
git@gitee.com:hyperledger/fabric.git
hyperledger
fabric
fabric
v1.0.0

搜索帮助

344bd9b3 5694891 D2dac590 5694891