1 Star 0 Fork 0

peter/fabric

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
peer.go 16.71 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peer
import (
"errors"
"fmt"
"net"
"sync"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/config"
"github.com/hyperledger/fabric/common/configtx"
configtxapi "github.com/hyperledger/fabric/common/configtx/api"
configtxtest "github.com/hyperledger/fabric/common/configtx/test"
"github.com/hyperledger/fabric/common/flogging"
mockconfigtx "github.com/hyperledger/fabric/common/mocks/configtx"
mockpolicies "github.com/hyperledger/fabric/common/mocks/policies"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/committer/txvalidator"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/service"
"github.com/hyperledger/fabric/msp"
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/protos/common"
mspprotos "github.com/hyperledger/fabric/protos/msp"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/spf13/viper"
"google.golang.org/grpc"
)
var peerLogger = flogging.MustGetLogger("peer")
var peerServer comm.GRPCServer
// singleton instance to manage CAs for the peer across channel config changes
var rootCASupport = comm.GetCASupport()
type chainSupport struct {
configtxapi.Manager
config.Application
ledger ledger.PeerLedger
}
func (cs *chainSupport) Ledger() ledger.PeerLedger {
return cs.ledger
}
func (cs *chainSupport) GetMSPIDs(cid string) []string {
return GetMSPIDs(cid)
}
// chain is a local struct to manage objects in a chain
type chain struct {
cs *chainSupport
cb *common.Block
committer committer.Committer
}
// chains is a local map of chainID->chainObject
var chains = struct {
sync.RWMutex
list map[string]*chain
}{list: make(map[string]*chain)}
//MockInitialize resets chains for test env
func MockInitialize() {
ledgermgmt.InitializeTestEnv()
chains.list = nil
chains.list = make(map[string]*chain)
chainInitializer = func(string) { return }
}
var chainInitializer func(string)
var mockMSPIDGetter func(string) []string
func MockSetMSPIDGetter(mspIDGetter func(string) []string) {
mockMSPIDGetter = mspIDGetter
}
// Initialize sets up any chains that the peer has from the persistence. This
// function should be called at the start up when the ledger and gossip
// ready
func Initialize(init func(string)) {
chainInitializer = init
var cb *common.Block
var ledger ledger.PeerLedger
ledgermgmt.Initialize()
ledgerIds, err := ledgermgmt.GetLedgerIDs()
if err != nil {
panic(fmt.Errorf("Error in initializing ledgermgmt: %s", err))
}
for _, cid := range ledgerIds {
peerLogger.Infof("Loading chain %s", cid)
if ledger, err = ledgermgmt.OpenLedger(cid); err != nil {
peerLogger.Warningf("Failed to load ledger %s(%s)", cid, err)
peerLogger.Debugf("Error while loading ledger %s with message %s. We continue to the next ledger rather than abort.", cid, err)
continue
}
if cb, err = getCurrConfigBlockFromLedger(ledger); err != nil {
peerLogger.Warningf("Failed to find config block on ledger %s(%s)", cid, err)
peerLogger.Debugf("Error while looking for config block on ledger %s with message %s. We continue to the next ledger rather than abort.", cid, err)
continue
}
// Create a chain if we get a valid ledger with config block
if err = createChain(cid, ledger, cb); err != nil {
peerLogger.Warningf("Failed to load chain %s(%s)", cid, err)
peerLogger.Debugf("Error reloading chain %s with message %s. We continue to the next chain rather than abort.", cid, err)
continue
}
InitChain(cid)
}
}
// Take care to initialize chain after peer joined, for example deploys system CCs
func InitChain(cid string) {
if chainInitializer != nil {
// Initialize chaincode, namely deploy system CC
peerLogger.Debugf("Init chain %s", cid)
chainInitializer(cid)
}
}
func getCurrConfigBlockFromLedger(ledger ledger.PeerLedger) (*common.Block, error) {
peerLogger.Debugf("Getting config block")
// get last block. Last block number is Height-1
blockchainInfo, err := ledger.GetBlockchainInfo()
if err != nil {
return nil, err
}
lastBlock, err := ledger.GetBlockByNumber(blockchainInfo.Height - 1)
if err != nil {
return nil, err
}
// get most recent config block location from last block metadata
configBlockIndex, err := utils.GetLastConfigIndexFromBlock(lastBlock)
if err != nil {
return nil, err
}
// get most recent config block
configBlock, err := ledger.GetBlockByNumber(configBlockIndex)
if err != nil {
return nil, err
}
peerLogger.Debugf("Got config block[%d]", configBlockIndex)
return configBlock, nil
}
// createChain creates a new chain object and insert it into the chains
func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {
envelopeConfig, err := utils.ExtractEnvelope(cb, 0)
if err != nil {
return err
}
configtxInitializer := configtx.NewInitializer()
gossipEventer := service.GetGossipService().NewConfigEventer()
gossipCallbackWrapper := func(cm configtxapi.Manager) {
gossipEventer.ProcessConfigUpdate(&chainSupport{
Manager: cm,
Application: configtxInitializer.ApplicationConfig(),
})
service.GetGossipService().SuspectPeers(func(identity api.PeerIdentityType) bool {
// TODO: this is a place-holder that would somehow make the MSP layer suspect
// that a given certificate is revoked, or its intermediate CA is revoked.
// In the meantime, before we have such an ability, we return true in order
// to suspect ALL identities in order to validate all of them.
return true
})
}
trustedRootsCallbackWrapper := func(cm configtxapi.Manager) {
updateTrustedRoots(cm)
}
configtxManager, err := configtx.NewManagerImpl(
envelopeConfig,
configtxInitializer,
[]func(cm configtxapi.Manager){gossipCallbackWrapper, trustedRootsCallbackWrapper},
)
if err != nil {
return err
}
// TODO remove once all references to mspmgmt are gone from peer code
mspmgmt.XXXSetMSPManager(cid, configtxManager.MSPManager())
cs := &chainSupport{
Manager: configtxManager,
Application: configtxManager.ApplicationConfig(), // TODO, refactor as this is accessible through Manager
ledger: ledger,
}
c := committer.NewLedgerCommitterReactive(ledger, txvalidator.NewTxValidator(cs), func(block *common.Block) error {
chainID, err := utils.GetChainIDFromBlock(block)
if err != nil {
return err
}
return SetCurrConfigBlock(block, chainID)
})
ordererAddresses := configtxManager.ChannelConfig().OrdererAddresses()
if len(ordererAddresses) == 0 {
return errors.New("No ordering service endpoint provided in configuration block")
}
service.GetGossipService().InitializeChannel(cs.ChainID(), c, ordererAddresses)
chains.Lock()
defer chains.Unlock()
chains.list[cid] = &chain{
cs: cs,
cb: cb,
committer: c,
}
return nil
}
// CreateChainFromBlock creates a new chain from config block
func CreateChainFromBlock(cb *common.Block) error {
cid, err := utils.GetChainIDFromBlock(cb)
if err != nil {
return err
}
var l ledger.PeerLedger
if l, err = ledgermgmt.CreateLedger(cb); err != nil {
return fmt.Errorf("Cannot create ledger from genesis block, due to %s", err)
}
return createChain(cid, l, cb)
}
// MockCreateChain used for creating a ledger for a chain for tests
// without havin to join
func MockCreateChain(cid string) error {
var ledger ledger.PeerLedger
var err error
if ledger = GetLedger(cid); ledger == nil {
gb, _ := configtxtest.MakeGenesisBlock(cid)
if ledger, err = ledgermgmt.CreateLedger(gb); err != nil {
return err
}
}
// Here we need to mock also the policy manager
// in order for the ACL to be checked
initializer := mockconfigtx.Initializer{
Resources: mockconfigtx.Resources{
PolicyManagerVal: &mockpolicies.Manager{
Policy: &mockpolicies.Policy{},
},
},
PolicyProposerVal: &mockconfigtx.PolicyProposer{
Transactional: mockconfigtx.Transactional{},
},
ValueProposerVal: &mockconfigtx.ValueProposer{
Transactional: mockconfigtx.Transactional{},
},
}
manager := &mockconfigtx.Manager{
Initializer: initializer,
}
chains.Lock()
defer chains.Unlock()
chains.list[cid] = &chain{
cs: &chainSupport{
Manager: manager,
ledger: ledger},
}
return nil
}
// GetLedger returns the ledger of the chain with chain ID. Note that this
// call returns nil if chain cid has not been created.
func GetLedger(cid string) ledger.PeerLedger {
chains.RLock()
defer chains.RUnlock()
if c, ok := chains.list[cid]; ok {
return c.cs.ledger
}
return nil
}
// GetPolicyManager returns the policy manager of the chain with chain ID. Note that this
// call returns nil if chain cid has not been created.
func GetPolicyManager(cid string) policies.Manager {
chains.RLock()
defer chains.RUnlock()
if c, ok := chains.list[cid]; ok {
return c.cs.PolicyManager()
}
return nil
}
// GetCurrConfigBlock returns the cached config block of the specified chain.
// Note that this call returns nil if chain cid has not been created.
func GetCurrConfigBlock(cid string) *common.Block {
chains.RLock()
defer chains.RUnlock()
if c, ok := chains.list[cid]; ok {
return c.cb
}
return nil
}
// updates the trusted roots for the peer based on updates to channels
func updateTrustedRoots(cm configtxapi.Manager) {
// this is triggered on per channel basis so first update the roots for the channel
peerLogger.Debugf("Updating trusted root authorities for channel %s", cm.ChainID())
var secureConfig comm.SecureServerConfig
var err error
// only run is TLS is enabled
secureConfig, err = GetSecureConfig()
if err == nil && secureConfig.UseTLS {
buildTrustedRootsForChain(cm)
// now iterate over all roots for all app and orderer chains
trustedRoots := [][]byte{}
rootCASupport.RLock()
defer rootCASupport.RUnlock()
for _, roots := range rootCASupport.AppRootCAsByChain {
trustedRoots = append(trustedRoots, roots...)
}
// also need to append statically configured root certs
if len(secureConfig.ClientRootCAs) > 0 {
trustedRoots = append(trustedRoots, secureConfig.ClientRootCAs...)
}
if len(secureConfig.ServerRootCAs) > 0 {
trustedRoots = append(trustedRoots, secureConfig.ServerRootCAs...)
}
server := GetPeerServer()
// now update the client roots for the peerServer
if server != nil {
err := server.SetClientRootCAs(trustedRoots)
if err != nil {
msg := "Failed to update trusted roots for peer from latest config " +
"block. This peer may not be able to communicate " +
"with members of channel %s (%s)"
peerLogger.Warningf(msg, cm.ChainID(), err)
}
}
}
}
// populates the appRootCAs and orderRootCAs maps by getting the
// root and intermediate certs for all msps associated with the MSPManager
func buildTrustedRootsForChain(cm configtxapi.Manager) {
rootCASupport.Lock()
defer rootCASupport.Unlock()
appRootCAs := [][]byte{}
ordererRootCAs := [][]byte{}
appOrgMSPs := make(map[string]struct{})
//loop through app orgs and build map of MSPIDs
for _, appOrg := range cm.ApplicationConfig().Organizations() {
appOrgMSPs[appOrg.MSPID()] = struct{}{}
}
cid := cm.ChainID()
peerLogger.Debugf("updating root CAs for channel [%s]", cid)
msps, err := cm.MSPManager().GetMSPs()
if err != nil {
peerLogger.Errorf("Error getting root CAs for channel %s (%s)", cid, err)
}
if err == nil {
for k, v := range msps {
// check to see if this is a FABRIC MSP
if v.GetType() == msp.FABRIC {
for _, root := range v.GetRootCerts() {
sid, err := root.Serialize()
if err == nil {
id := &mspprotos.SerializedIdentity{}
err = proto.Unmarshal(sid, id)
if err == nil {
// check to see of this is an app org MSP
if _, ok := appOrgMSPs[k]; ok {
peerLogger.Debugf("adding app root CAs for MSP [%s]", k)
appRootCAs = append(appRootCAs, id.IdBytes)
} else {
peerLogger.Debugf("adding orderer root CAs for MSP [%s]", k)
ordererRootCAs = append(ordererRootCAs, id.IdBytes)
}
}
}
}
for _, intermediate := range v.GetIntermediateCerts() {
sid, err := intermediate.Serialize()
if err == nil {
id := &mspprotos.SerializedIdentity{}
err = proto.Unmarshal(sid, id)
if err == nil {
// check to see of this is an app org MSP
if _, ok := appOrgMSPs[k]; ok {
peerLogger.Debugf("adding app root CAs for MSP [%s]", k)
appRootCAs = append(appRootCAs, id.IdBytes)
} else {
peerLogger.Debugf("adding orderer root CAs for MSP [%s]", k)
ordererRootCAs = append(ordererRootCAs, id.IdBytes)
}
}
}
}
}
}
rootCASupport.AppRootCAsByChain[cid] = appRootCAs
rootCASupport.OrdererRootCAsByChain[cid] = ordererRootCAs
}
}
// GetMSPIDs returns the ID of each application MSP defined on this chain
func GetMSPIDs(cid string) []string {
chains.RLock()
defer chains.RUnlock()
//if mock is set, use it to return MSPIDs
//used for tests without a proper join
if mockMSPIDGetter != nil {
return mockMSPIDGetter(cid)
}
if c, ok := chains.list[cid]; ok {
if c == nil || c.cs == nil ||
c.cs.ApplicationConfig() == nil ||
c.cs.ApplicationConfig().Organizations() == nil {
return nil
}
orgs := c.cs.ApplicationConfig().Organizations()
toret := make([]string, len(orgs))
i := 0
for _, org := range orgs {
toret[i] = org.MSPID()
i++
}
return toret
}
return nil
}
// SetCurrConfigBlock sets the current config block of the specified chain
func SetCurrConfigBlock(block *common.Block, cid string) error {
chains.Lock()
defer chains.Unlock()
if c, ok := chains.list[cid]; ok {
c.cb = block
return nil
}
return fmt.Errorf("Chain %s doesn't exist on the peer", cid)
}
// NewPeerClientConnection Returns a new grpc.ClientConn to the configured local PEER.
func NewPeerClientConnection() (*grpc.ClientConn, error) {
return NewPeerClientConnectionWithAddress(viper.GetString("peer.address"))
}
// GetLocalIP returns the non loopback local IP of the host
func GetLocalIP() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
return ""
}
for _, address := range addrs {
// check the address type and if it is not a loopback then display it
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return ipnet.IP.String()
}
}
}
return ""
}
// NewPeerClientConnectionWithAddress Returns a new grpc.ClientConn to the configured local PEER.
func NewPeerClientConnectionWithAddress(peerAddress string) (*grpc.ClientConn, error) {
if comm.TLSEnabled() {
return comm.NewClientConnectionWithAddress(peerAddress, true, true, comm.InitTLSForPeer())
}
return comm.NewClientConnectionWithAddress(peerAddress, true, false, nil)
}
// GetChannelsInfo returns an array with information about all channels for
// this peer
func GetChannelsInfo() []*pb.ChannelInfo {
// array to store metadata for all channels
var channelInfoArray []*pb.ChannelInfo
chains.RLock()
defer chains.RUnlock()
for key := range chains.list {
channelInfo := &pb.ChannelInfo{ChannelId: key}
// add this specific chaincode's metadata to the array of all chaincodes
channelInfoArray = append(channelInfoArray, channelInfo)
}
return channelInfoArray
}
// NewChannelPolicyManagerGetter returns a new instance of ChannelPolicyManagerGetter
func NewChannelPolicyManagerGetter() policies.ChannelPolicyManagerGetter {
return &channelPolicyManagerGetter{}
}
type channelPolicyManagerGetter struct{}
func (c *channelPolicyManagerGetter) Manager(channelID string) (policies.Manager, bool) {
policyManager := GetPolicyManager(channelID)
return policyManager, policyManager != nil
}
// CreatePeerServer creates an instance of comm.GRPCServer
// This server is used for peer communications
func CreatePeerServer(listenAddress string,
secureConfig comm.SecureServerConfig) (comm.GRPCServer, error) {
var err error
peerServer, err = comm.NewGRPCServer(listenAddress, secureConfig)
if err != nil {
peerLogger.Errorf("Failed to create peer server (%s)", err)
return nil, err
}
return peerServer, nil
}
// GetPeerServer returns the peer server instance
func GetPeerServer() comm.GRPCServer {
return peerServer
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/peter_code_git/fabric.git
git@gitee.com:peter_code_git/fabric.git
peter_code_git
fabric
fabric
v1.0.0-beta

搜索帮助