1 Star 0 Fork 0

peter/fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
start.go 19.95 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package node
import (
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"os/signal"
"path/filepath"
"strconv"
"syscall"
"time"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/deliver"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/localmsp"
"github.com/hyperledger/fabric/common/viperutil"
"github.com/hyperledger/fabric/core"
"github.com/hyperledger/fabric/core/aclmgmt"
"github.com/hyperledger/fabric/core/chaincode"
"github.com/hyperledger/fabric/core/chaincode/accesscontrol"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/common/ccprovider"
"github.com/hyperledger/fabric/core/config"
"github.com/hyperledger/fabric/core/endorser"
authHandler "github.com/hyperledger/fabric/core/handlers/auth"
"github.com/hyperledger/fabric/core/handlers/library"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/peer"
"github.com/hyperledger/fabric/core/scc"
"github.com/hyperledger/fabric/events/producer"
common2 "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/service"
"github.com/hyperledger/fabric/msp"
"github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/peer/common"
peergossip "github.com/hyperledger/fabric/peer/gossip"
"github.com/hyperledger/fabric/peer/version"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/rwset"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
// Configuration keys and defaults used when computing where chaincodes
// connect back to the peer.
const (
// chaincodeAddrKey is the viper key read by computeChaincodeEndpoint for the
// address chaincodes should use to reach the peer.
chaincodeAddrKey = "peer.chaincodeAddress"
// chaincodeListenAddrKey is the viper key for the address the chaincode
// gRPC server listens on; createChaincodeServer sets it when it is empty.
chaincodeListenAddrKey = "peer.chaincodeListenAddress"
// defaultChaincodePort is the port used when no chaincode address or
// listen address is configured.
defaultChaincodePort = 7052
)
// chaincodeDevMode is bound to the "peer-chaincodedev" flag in startCmd.
var chaincodeDevMode bool
// orderingEndpoint is bound to the "orderer" ("-o") flag in startCmd.
var orderingEndpoint string
// XXXDefaultChannelMSPID should not be defined in production code.
// It should only be referenced in tests. However, it is necessary
// to support the 'default chain' setup, so it is temporarily kept here
// until this concept can be confined to testing scenarios only.
const XXXDefaultChannelMSPID = "DEFAULT"
// startCmd registers the command-line flags of the "node start" command
// and returns that command.
//
// The flags are bound to the package-level variables chaincodeDevMode and
// orderingEndpoint.
func startCmd() *cobra.Command {
	startFlags := nodeStartCmd.Flags()

	// --peer-chaincodedev: no shorthand, off by default.
	startFlags.BoolVarP(
		&chaincodeDevMode,
		"peer-chaincodedev",
		"",
		false,
		"Whether peer in chaincode development mode",
	)

	// --orderer / -o: endpoint of the ordering service.
	startFlags.StringVarP(
		&orderingEndpoint,
		"orderer",
		"o",
		"orderer:7050",
		"Ordering service endpoint",
	)

	return nodeStartCmd
}
// nodeStartCmd is the cobra command behind "start". Its flags are attached
// in startCmd; running it delegates to serve with the positional arguments.
var nodeStartCmd = &cobra.Command{
Use: "start",
Short: "Starts the node.",
Long: `Starts a node that interacts with the network.`,
RunE: func(cmd *cobra.Command, args []string) error {
// The returned error (if any) is handed back to cobra.
return serve(args)
},
}
// initSysCCs deploys the system chaincodes with an empty chain ID and logs
// once the deployment call has returned.
func initSysCCs() {
	// The empty string is the chain ID argument passed to DeploySysCCs.
	scc.DeploySysCCs("")

	logger.Infof("Deployed system chaincodes")
}
// serve brings up the peer node and blocks until it is asked to stop.
//
// In order, it: checks that the local MSP is of the FABRIC type, registers
// the ACL provider, initializes ledger management, caches the peer
// configuration, creates the peer gRPC server, the event hub server and the
// chaincode server, registers the deliver, admin and endorser services,
// initializes gossip, deploys system chaincodes, starts the servers in
// goroutines, writes the pid file, optionally starts the profiling HTTP
// endpoint, applies per-module log levels, and finally waits on a channel.
//
// The args parameter is not read by this function.
//
// It returns nil when SIGINT or SIGTERM is received or when the peer gRPC
// server's Start returns nil, and a non-nil error when a setup step that
// reports errors by return value fails or when the gRPC server exits with
// an error. Several setup failures are reported via logger.Fatalf,
// logger.Panic/Panicf or panic instead of being returned.
func serve(args []string) error {
// currently the peer only works with the standard MSP
// because in certain scenarios the MSP has to make sure
// that from a single credential you only have a single 'identity'.
// Idemix does not support this *YET* but it can be easily
// fixed to support it. For now, we just make sure that
// the peer only comes up with the standard MSP
mspType := mgmt.GetLocalMSP().GetType()
if mspType != msp.FABRIC {
panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))
}
logger.Infof("Starting %s", version.GetInfo())
//startup aclmgmt with default ACL providers (resource based and default 1.0 policies based).
//Users can pass in their own ACLProvider to RegisterACLProvider (currently unit tests do this)
aclmgmt.RegisterACLProvider(nil)
// initialize ledger management with the peer's config transaction processors
ledgermgmt.Initialize(peer.ConfigTxProcessors)
// Parameter overrides must be processed before any parameters are
// cached. Failures to cache cause the server to terminate immediately.
if chaincodeDevMode {
logger.Info("Running in chaincode development mode")
logger.Info("Disable loading validity system chaincode")
// override the chaincode mode before peer.CacheConfiguration runs below
viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
}
if err := peer.CacheConfiguration(); err != nil {
return err
}
peerEndpoint, err := peer.GetPeerEndpoint()
if err != nil {
err = fmt.Errorf("Failed to get Peer Endpoint: %s", err)
return err
}
// only the host part of the peer address is needed; it is passed to
// createChaincodeServer further down
var peerHost string
peerHost, _, err = net.SplitHostPort(peerEndpoint.Address)
if err != nil {
return fmt.Errorf("peer address is not in the format of host:port: %v", err)
}
listenAddr := viper.GetString("peer.listenAddress")
serverConfig, err := peer.GetServerConfig()
if err != nil {
logger.Fatalf("Error loading secure config for peer (%s)", err)
}
peerServer, err := peer.CreatePeerServer(listenAddr, serverConfig)
if err != nil {
logger.Fatalf("Failed to create peer server (%s)", err)
}
if serverConfig.SecOpts.UseTLS {
logger.Info("Starting peer with TLS enabled")
// set up credential support
cs := comm.GetCredentialSupport()
cs.ServerRootCAs = serverConfig.SecOpts.ServerRootCAs
// set the cert to use if client auth is requested by remote endpoints
clientCert, err := peer.GetClientCertificate()
if err != nil {
logger.Fatalf("Failed to set TLS client certficate (%s)", err)
}
comm.GetCredentialSupport().SetClientCertificate(clientCert)
}
//TODO - do we need different SSL material for events ?
ehubGrpcServer, err := createEventHubServer(serverConfig)
if err != nil {
grpclog.Fatalf("Failed to create ehub server: %v", err)
}
// mutual TLS is in effect only when TLS is on AND client certs are required
mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
// policyCheckerProvider builds, per resource name, a checker that defers
// to the registered ACL provider
policyCheckerProvider := func(resourceName string) deliver.PolicyChecker {
return func(env *cb.Envelope, channelID string) error {
return aclmgmt.GetACLProvider().CheckACL(resourceName, channelID, env)
}
}
// register the deliver service on the peer gRPC server
abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverSupportManager{})
pb.RegisterDeliverServer(peerServer.Server(), abServer)
// enable the cache of chaincode info
ccprovider.EnableCCInfoCache()
// Create a self-signed CA for chaincode service
ca, err := accesscontrol.NewCA()
if err != nil {
logger.Panic("Failed creating authentication layer:", err)
}
ccSrv, ccEndpoint, err := createChaincodeServer(ca, peerHost)
if err != nil {
logger.Panicf("Failed to create chaincode server: %s", err)
}
registerChaincodeSupport(ccSrv, ccEndpoint, ca)
// the chaincode server runs in its own goroutine; its Start result is not inspected
go ccSrv.Start()
logger.Debugf("Running peer")
// Register the Admin server
pb.RegisterAdminServer(peerServer.Server(), core.NewAdminServer())
// privDataDist hands private data to the gossip service for distribution
privDataDist := func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
return service.GetGossipService().DistributePrivateData(channel, txID, privateData)
}
serverEndorser := endorser.NewEndorserServer(privDataDist, &endorser.SupportImpl{})
// load the "peer.handlers" configuration section and wrap the endorser
// with the auth filters it names
libConf := library.Config{}
if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err != nil {
return errors.WithMessage(err, "could not load YAML config")
}
authFilters := library.InitRegistry(libConf).Lookup(library.Auth).([]authHandler.Filter)
auth := authHandler.ChainFilters(serverEndorser, authFilters...)
// Register the Endorser server
pb.RegisterEndorserServer(peerServer.Server(), auth)
// Initialize gossip component
bootstrap := viper.GetStringSlice("peer.gossip.bootstrap")
serializedIdentity, err := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
if err != nil {
logger.Panicf("Failed serializing self identity: %v", err)
}
messageCryptoService := peergossip.NewMCS(
peer.NewChannelPolicyManagerGetter(),
localmsp.NewSigner(),
mgmt.NewDeserializersManager())
secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
// callback function for secure dial options for gossip service
secureDialOpts := func() []grpc.DialOption {
var dialOpts []grpc.DialOption
// set max send/recv msg sizes
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()),
grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize())))
// set the keepalive options, letting explicitly configured values
// override the defaults
kaOpts := comm.DefaultKeepaliveOptions()
if viper.IsSet("peer.keepalive.client.interval") {
kaOpts.ClientInterval = viper.GetDuration("peer.keepalive.client.interval")
}
if viper.IsSet("peer.keepalive.client.timeout") {
kaOpts.ClientTimeout = viper.GetDuration("peer.keepalive.client.timeout")
}
dialOpts = append(dialOpts, comm.ClientKeepaliveOptions(kaOpts)...)
// use TLS transport credentials when TLS is enabled, otherwise dial insecurely
if comm.TLSEnabled() {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCredentialSupport().GetPeerCredentials()))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
return dialOpts
}
// certs stays nil unless the peer server has TLS enabled
var certs *common2.TLSCertificates
if peerServer.TLSEnabled() {
serverCert := peerServer.ServerCertificate()
clientCert, err := peer.GetClientCertificate()
if err != nil {
return errors.Wrap(err, "failed obtaining client certificates")
}
certs = &common2.TLSCertificates{}
certs.TLSServerCert.Store(&serverCert)
certs.TLSClientCert.Store(&clientCert)
}
err = service.InitGossipService(serializedIdentity, peerEndpoint.Address, peerServer.Server(), certs,
messageCryptoService, secAdv, secureDialOpts, bootstrap...)
if err != nil {
return err
}
// stop gossip when serve returns
defer service.GetGossipService().Stop()
//initialize system chaincodes
initSysCCs()
//this brings up all the chains (including testchainid)
peer.Initialize(func(cid string) {
logger.Debugf("Deploying system CC, for chain <%s>", cid)
scc.DeploySysCCs(cid)
})
logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]",
peerEndpoint.Id, viper.GetString("peer.networkId"), peerEndpoint.Address)
// Start the grpc server. Done in a goroutine so we can deploy the
// genesis block if needed.
// NOTE: this local channel shadows the function name serve for the rest
// of the body. It is unbuffered and received from exactly once (at the
// final return), so whichever of the two sending goroutines below sends
// second stays blocked on its send.
serve := make(chan error)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
// a termination signal unblocks serve with a nil error
go func() {
sig := <-sigs
logger.Debugf("sig: %s", sig)
serve <- nil
}()
// run the peer gRPC server; its exit (with or without error) unblocks serve
go func() {
var grpcErr error
if grpcErr = peerServer.Start(); grpcErr != nil {
grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
} else {
logger.Info("peer server exited")
}
serve <- grpcErr
}()
// record the process id in <peer.fileSystemPath>/peer.pid
if err := writePid(config.GetPath("peer.fileSystemPath")+"/peer.pid", os.Getpid()); err != nil {
return err
}
// Start the event hub server
if ehubGrpcServer != nil {
go ehubGrpcServer.Start()
}
// Start profiling http endpoint if enabled
if viper.GetBool("peer.profile.enabled") {
go func() {
profileListenAddress := viper.GetString("peer.profile.listenAddress")
logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
// a nil handler makes ListenAndServe use http.DefaultServeMux
if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
logger.Errorf("Error starting profiler: %s", profileErr)
}
}()
}
logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]",
peerEndpoint.Id, viper.GetString("peer.networkId"), peerEndpoint.Address)
// set the logging level for specific modules defined via environment
// variables or core.yaml; a failure for one module is only logged as a
// warning and does not stop startup
overrideLogModules := []string{"msp", "gossip", "ledger", "cauthdsl", "policies", "grpc", "peer.gossip"}
for _, module := range overrideLogModules {
err = common.SetLogLevelFromViper(module)
if err != nil {
logger.Warningf("Error setting log level for module '%s': %s", module, err.Error())
}
}
flogging.SetPeerStartupModulesMap()
// Block until the grpc server exits or a termination signal arrives
return <-serve
}
// createChaincodeServer creates (but does not start) the gRPC server that
// chaincodes connect to.
//
// The chaincode endpoint is computed first via computeChaincodeEndpoint. If
// that fails in dev mode, "0.0.0.0:<defaultChaincodePort>" is used instead;
// outside dev mode the error is returned. The server listens on
// peer.chaincodeListenAddress; when that key is empty it is set to
// "<peerHostname>:<defaultChaincodePort>". When the peer's server config has
// TLS enabled, the security options are replaced with a self-signed
// certificate issued by ca for the endpoint host, and client certificates
// signed by ca are required.
//
// Parameters:
//   - ca: certificate authority used to issue the server certificate and to
//     validate chaincode client certificates.
//   - peerHostname: host part of the peer address.
//
// Returns the server, the computed chaincode endpoint, and an error if the
// endpoint cannot be computed (non-dev mode), the server config cannot be
// loaded, or the gRPC server cannot be created. It panics (via the logger)
// if the endpoint is not host:port or the TLS certificate cannot be generated.
func createChaincodeServer(ca accesscontrol.CA, peerHostname string) (srv comm.GRPCServer, ccEndpoint string, err error) {
	// before potentially setting chaincodeListenAddress, compute chaincode endpoint at first
	ccEndpoint, err = computeChaincodeEndpoint(peerHostname)
	if err != nil {
		if !chaincode.IsDevMode() {
			// for non-dev mode, we have to return error
			logger.Errorf("Error computing chaincode endpoint: %s", err)
			return nil, "", err
		}
		// if any error for dev mode, we use 0.0.0.0:7052
		ccEndpoint = fmt.Sprintf("%s:%d", "0.0.0.0", defaultChaincodePort)
		logger.Warningf("use %s as chaincode endpoint because of error in computeChaincodeEndpoint: %s", ccEndpoint, err)
	}

	host, _, err := net.SplitHostPort(ccEndpoint)
	if err != nil {
		// Use an explicit format: passing adjacent strings to Panic would
		// concatenate them without separating spaces.
		logger.Panicf("Chaincode service host %s isn't a valid hostname: %s", ccEndpoint, err)
	}

	cclistenAddress := viper.GetString(chaincodeListenAddrKey)
	if cclistenAddress == "" {
		cclistenAddress = fmt.Sprintf("%s:%d", peerHostname, defaultChaincodePort)
		logger.Warningf("%s is not set, using %s", chaincodeListenAddrKey, cclistenAddress)
		// publish the fallback so later readers of the key see the same value
		viper.Set(chaincodeListenAddrKey, cclistenAddress)
	}

	// Named serverConfig (not config) so the imported config package is not shadowed.
	serverConfig, err := peer.GetServerConfig()
	if err != nil {
		logger.Errorf("Error getting server config: %s", err)
		return nil, "", err
	}

	// Override TLS configuration if TLS is applicable
	if serverConfig.SecOpts.UseTLS {
		// Create a self-signed TLS certificate with a SAN that matches the computed chaincode endpoint
		certKeyPair, err := ca.NewServerCertKeyPair(host)
		if err != nil {
			logger.Panicf("Failed generating TLS certificate for chaincode service: %+v", err)
		}
		serverConfig.SecOpts = &comm.SecureOptions{
			UseTLS: true,
			// Require chaincode shim to authenticate itself
			RequireClientCert: true,
			// Trust only client certificates signed by ourselves
			ClientRootCAs: [][]byte{ca.CertBytes()},
			// Use our own self-signed TLS certificate and key
			Certificate: certKeyPair.Cert,
			Key:         certKeyPair.Key,
			// No point in specifying server root CAs since this TLS config is only used for
			// a gRPC server and not a client
			ServerRootCAs: nil,
		}
	}

	// Chaincode keepalive options - static for now
	serverConfig.KaOpts = &comm.KeepaliveOptions{
		ServerInterval:    time.Duration(2) * time.Hour,    // 2 hours - gRPC default
		ServerTimeout:     time.Duration(20) * time.Second, // 20 sec - gRPC default
		ServerMinInterval: time.Duration(1) * time.Minute,  // match ClientInterval
	}

	srv, err = comm.NewGRPCServer(cclistenAddress, serverConfig)
	if err != nil {
		logger.Errorf("Error creating GRPC server: %s", err)
		return nil, "", err
	}

	return srv, ccEndpoint, nil
}
// computeChaincodeEndpoint derives the address chaincodes should connect to
// from the chaincode address and chaincode listen address (both read from
// viper) and the peer hostname.
//
// Resolution order:
//
//	Case A: chaincodeAddrKey is set - use it as is; it must be host:port and
//	        its host must not be an unspecified IP ("0.0.0.0" or "::").
//	Case B: otherwise, chaincodeListenAddrKey is set - use it as is, unless
//	        its host is an unspecified IP, in which case use the peer
//	        hostname with the listen port.
//	Case C: otherwise - use the peer hostname with defaultChaincodePort.
//	Case D: whenever the peer hostname is needed (B or C) but is itself an
//	        unspecified IP - return an error.
//
// On failure the returned endpoint is the empty string.
func computeChaincodeEndpoint(peerHostname string) (ccEndpoint string, err error) {
	logger.Infof("Entering computeChaincodeEndpoint with peerHostname: %s", peerHostname)

	// unspecifiedIP parses host as an IP and reports whether it is the
	// unspecified address. Hostnames that are not IPs yield (nil, false).
	unspecifiedIP := func(host string) (net.IP, bool) {
		parsed := net.ParseIP(host)
		return parsed, parsed != nil && parsed.IsUnspecified()
	}

	// set this to the host/ip the chaincode will resolve to. It could be
	// the same address as the peer (such as in the sample docker env using
	// the container name as the host name across the board)
	if configured := viper.GetString(chaincodeAddrKey); configured != "" {
		// Case A: the chaincode address is configured explicitly.
		addrHost, _, splitErr := net.SplitHostPort(configured)
		if splitErr != nil {
			logger.Errorf("Fail to split chaincodeAddress: %s", splitErr)
			return "", splitErr
		}
		if addrIP, unusable := unspecifiedIP(addrHost); unusable {
			logger.Errorf("ChaincodeAddress' IP cannot be %s in non-dev mode", addrIP)
			return "", errors.New("invalid endpoint for chaincode to connect")
		}
		ccEndpoint = configured
	} else if listenAddr := viper.GetString(chaincodeListenAddrKey); listenAddr != "" {
		// Case B: fall back to the chaincode listen address.
		listenHost, listenPort, splitErr := net.SplitHostPort(listenAddr)
		if splitErr != nil {
			logger.Errorf("ChaincodeAddress is nil and fail to split chaincodeListenAddress: %s", splitErr)
			return "", splitErr
		}
		ccEndpoint = listenAddr
		// ignoring other values such as Multicast address etc ...as the server
		// wouldn't start up with this address anyway
		if _, wildcard := unspecifiedIP(listenHost); wildcard {
			// listening on "0.0.0.0" or "::": chaincode must dial the peer
			// host instead, keeping the listen port
			if _, unusable := unspecifiedIP(peerHostname); unusable {
				// Case D: all we have is "0.0.0.0" or "::" which chaincode cannot connect to
				logger.Error("ChaincodeAddress is nil while both chaincodeListenAddressIP and peerIP are 0.0.0.0")
				return "", errors.New("invalid endpoint for chaincode to connect")
			}
			ccEndpoint = fmt.Sprintf("%s:%s", peerHostname, listenPort)
		}
	} else {
		// Case C: neither key is set, use the peer address with the default port.
		if peerIP, unusable := unspecifiedIP(peerHostname); unusable {
			// Case D: all we have is "0.0.0.0" or "::" which chaincode cannot connect to
			logger.Errorf("ChaincodeAddress and chaincodeListenAddress are nil and peerIP is %s", peerIP)
			return "", errors.New("invalid endpoint for chaincode to connect")
		}
		ccEndpoint = fmt.Sprintf("%s:%d", peerHostname, defaultChaincodePort)
	}

	logger.Infof("Exit with ccEndpoint: %s", ccEndpoint)
	return ccEndpoint, nil
}
// registerChaincodeSupport creates the chaincode support service, registers
// the system chaincodes, and attaches the service to grpcServer.
//
// The startup timeout is read from "chaincode.startuptimeout"; values below
// five seconds are replaced by five seconds (with a warning).
//
// NOTE - when we implement JOIN we will no longer pass the chainID as param.
// The chaincode support will come up without registering system chaincodes,
// which will be registered only during join phase.
func registerChaincodeSupport(grpcServer comm.GRPCServer, ccEndpoint string, ca accesscontrol.CA) {
	// smallest startup timeout that is accepted as configured
	minStartupTimeout := time.Duration(5) * time.Second

	// whether the user runs the chaincode (dev mode)
	devMode := chaincode.IsDevMode()

	startupTimeout := viper.GetDuration("chaincode.startuptimeout")
	if startupTimeout >= minStartupTimeout {
		logger.Debugf("Chaincode startup timeout value set to %s", startupTimeout)
	} else {
		logger.Warningf("Invalid chaincode startup timeout value %s (should be at least 5s); defaulting to 5s", startupTimeout)
		startupTimeout = minStartupTimeout
	}

	support := chaincode.NewChaincodeSupport(ccEndpoint, devMode, startupTimeout, ca)

	// Now that chaincode is initialized, register all system chaincodes.
	scc.RegisterSysCCs()

	pb.RegisterChaincodeSupportServer(grpcServer.Server(), support)
}
// createEventHubServer creates (but does not start) the gRPC server for the
// event hub, listening on the TCP address configured in
// "peer.events.address", and registers the events service on it.
//
// serverConfig is received by value; its KaOpts field is replaced with the
// default keepalive options, with ServerMinInterval overridden when
// "peer.events.keepalive.minInterval" is set.
//
// It returns an error if the address cannot be listened on or the gRPC
// server cannot be created. In the latter case the listener is closed before
// returning so the socket is not leaked.
func createEventHubServer(serverConfig comm.ServerConfig) (comm.GRPCServer, error) {
	lis, err := net.Listen("tcp", viper.GetString("peer.events.address"))
	if err != nil {
		return nil, fmt.Errorf("failed to listen: %v", err)
	}

	// set the keepalive options
	serverConfig.KaOpts = comm.DefaultKeepaliveOptions()
	if viper.IsSet("peer.events.keepalive.minInterval") {
		serverConfig.KaOpts.ServerMinInterval = viper.GetDuration("peer.events.keepalive.minInterval")
	}

	grpcServer, err := comm.NewGRPCServerFromListener(lis, serverConfig)
	if err != nil {
		logger.Errorf("Failed to return new GRPC server: %s", err)
		// No server took ownership of the listener, so release it here.
		if closeErr := lis.Close(); closeErr != nil {
			logger.Warningf("Failed to close event hub listener: %s", closeErr)
		}
		return nil, err
	}

	// client TLS cert binding is only inspected when mutual TLS is in effect
	mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
	ehConfig := initializeEventsServerConfig(mutualTLS)
	ehServer := producer.NewEventsServer(ehConfig)
	pb.RegisterEventsServer(grpcServer.Server(), ehServer)

	return grpcServer, nil
}
// initializeEventsServerConfig builds the events server configuration from
// the "peer.events.buffersize", "peer.events.timeout" and
// "peer.events.timewindow" settings, plus a binding inspector created for
// the given mutualTLS flag.
func initializeEventsServerConfig(mutualTLS bool) *producer.EventsServerConfig {
	// tlsCertHashOf returns the TLS certificate hash carried by an event
	// message, or nil when msg is not a non-nil *pb.Event.
	tlsCertHashOf := func(msg proto.Message) []byte {
		if event, ok := msg.(*pb.Event); ok && event != nil {
			return event.TlsCertHash
		}
		return nil
	}

	return &producer.EventsServerConfig{
		BufferSize:       uint(viper.GetInt("peer.events.buffersize")),
		Timeout:          viper.GetDuration("peer.events.timeout"),
		TimeWindow:       viper.GetDuration("peer.events.timewindow"),
		BindingInspector: comm.NewBindingInspector(mutualTLS, tlsCertHashOf),
	}
}
// writePid writes pid, in decimal, to fileName, creating the parent
// directory (mode 0755) if needed. The file is written with mode 0644.
//
// It returns the error from creating the directory or from writing the
// file; only the write failure is logged.
func writePid(fileName string, pid int) error {
	if mkdirErr := os.MkdirAll(filepath.Dir(fileName), 0755); mkdirErr != nil {
		return mkdirErr
	}

	contents := []byte(strconv.Itoa(pid))
	writeErr := ioutil.WriteFile(fileName, contents, 0644)
	if writeErr != nil {
		logger.Errorf("Cannot write pid to %s (err:%s)", fileName, writeErr)
	}
	return writeErr
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/peter_code_git/fabric.git
git@gitee.com:peter_code_git/fabric.git
peter_code_git
fabric
fabric
v1.1.0-alpha

搜索帮助