1 Star 0 Fork 0

peter / fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
deliveryclient.go 9.61 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package deliverclient
import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/spf13/viper"
"google.golang.org/grpc"
)
var logger = flogging.MustGetLogger("deliveryClient")
const (
defaultReConnectTotalTimeThreshold = time.Second * 60 * 60
defaultConnectionTimeout = time.Second * 3
defaultReConnectBackoffThreshold = float64(time.Hour)
)
func getReConnectTotalTimeThreshold() time.Duration {
return util.GetDurationOrDefault("peer.deliveryclient.reconnectTotalTimeThreshold", defaultReConnectTotalTimeThreshold)
}
func getConnectionTimeout() time.Duration {
return util.GetDurationOrDefault("peer.deliveryclient.connTimeout", defaultConnectionTimeout)
}
func getReConnectBackoffThreshold() float64 {
return util.GetFloat64OrDefault("peer.deliveryclient.reConnectBackoffThreshold", defaultReConnectBackoffThreshold)
}
func staticRootsEnabled() bool {
return viper.GetBool("peer.deliveryclient.staticRootsEnabled")
}
// DeliverService used to communicate with orderers to obtain
// new blocks and send them to the committer service
type DeliverService interface {
// StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
// to channel peers.
// When the delivery finishes, the finalizer func is called
StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error
// StopDeliverForChannel dynamically stops delivery of new blocks from ordering service
// to channel peers.
StopDeliverForChannel(chainID string) error
// UpdateEndpoints
UpdateEndpoints(chainID string, endpoints []string) error
// Stop terminates delivery service and closes the connection
Stop()
}
// deliverServiceImpl the implementation of the delivery service
// maintains connection to the ordering service and maps of
// blocks providers
type deliverServiceImpl struct {
conf *Config
blockProviders map[string]blocksprovider.BlocksProvider
lock sync.RWMutex
stopping bool
}
// Config dictates the DeliveryService's properties,
// namely how it connects to an ordering service endpoint,
// how it verifies messages received from it,
// and how it disseminates the messages to other peers
type Config struct {
// ConnFactory returns a function that creates a connection to an endpoint
ConnFactory func(channelID string) func(endpoint string) (*grpc.ClientConn, error)
// ABCFactory creates an AtomicBroadcastClient out of a connection
ABCFactory func(*grpc.ClientConn) orderer.AtomicBroadcastClient
// CryptoSvc performs cryptographic actions like message verification and signing
// and identity validation
CryptoSvc api.MessageCryptoService
// Gossip enables to enumerate peers in the channel, send a message to peers,
// and add a block to the gossip state transfer layer
Gossip blocksprovider.GossipServiceAdapter
// Endpoints specifies the endpoints of the ordering service
Endpoints []string
}
// NewDeliverService construction function to create and initialize
// delivery service instance. It tries to establish connection to
// the specified in the configuration ordering service, in case it
// fails to dial to it, return nil
func NewDeliverService(conf *Config) (DeliverService, error) {
ds := &deliverServiceImpl{
conf: conf,
blockProviders: make(map[string]blocksprovider.BlocksProvider),
}
if err := ds.validateConfiguration(); err != nil {
return nil, err
}
return ds, nil
}
func (d *deliverServiceImpl) UpdateEndpoints(chainID string, endpoints []string) error {
// Use chainID to obtain blocks provider and pass endpoints
// for update
if bp, ok := d.blockProviders[chainID]; ok {
// We have found specified channel so we can safely update it
bp.UpdateOrderingEndpoints(endpoints)
return nil
}
return errors.New(fmt.Sprintf("Channel with %s id was not found", chainID))
}
func (d *deliverServiceImpl) validateConfiguration() error {
conf := d.conf
if len(conf.Endpoints) == 0 {
return errors.New("no endpoints specified")
}
if conf.Gossip == nil {
return errors.New("no gossip provider specified")
}
if conf.ABCFactory == nil {
return errors.New("no AtomicBroadcast factory specified")
}
if conf.ConnFactory == nil {
return errors.New("no connection factory specified")
}
if conf.CryptoSvc == nil {
return errors.New("no crypto service specified")
}
return nil
}
// StartDeliverForChannel starts blocks delivery for channel
// initializes the grpc stream for given chainID, creates blocks provider instance
// that spawns in go routine to read new blocks starting from the position provided by ledger
// info instance.
func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
d.lock.Lock()
defer d.lock.Unlock()
if d.stopping {
errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
}
if _, exist := d.blockProviders[chainID]; exist {
errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
} else {
client := d.newClient(chainID, ledgerInfo)
logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
go d.launchBlockProvider(chainID, finalizer)
}
return nil
}
func (d *deliverServiceImpl) launchBlockProvider(chainID string, finalizer func()) {
d.lock.RLock()
pb := d.blockProviders[chainID]
d.lock.RUnlock()
if pb == nil {
logger.Info("Block delivery for channel", chainID, "was stopped before block provider started")
return
}
pb.DeliverBlocks()
finalizer()
}
// StopDeliverForChannel stops blocks delivery for channel by stopping channel block provider
func (d *deliverServiceImpl) StopDeliverForChannel(chainID string) error {
d.lock.Lock()
defer d.lock.Unlock()
if d.stopping {
errMsg := fmt.Sprintf("Delivery service is stopping, cannot stop delivery for channel %s", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
}
if client, exist := d.blockProviders[chainID]; exist {
client.Stop()
delete(d.blockProviders, chainID)
logger.Debug("This peer will stop pass blocks from orderer service to other peers")
} else {
errMsg := fmt.Sprintf("Delivery service - no block provider for %s found, can't stop delivery", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
}
return nil
}
// Stop all service and release resources
func (d *deliverServiceImpl) Stop() {
d.lock.Lock()
defer d.lock.Unlock()
// Marking flag to indicate the shutdown of the delivery service
d.stopping = true
for _, client := range d.blockProviders {
client.Stop()
}
}
func (d *deliverServiceImpl) newClient(chainID string, ledgerInfoProvider blocksprovider.LedgerInfo) *broadcastClient {
reconnectBackoffThreshold := getReConnectBackoffThreshold()
reconnectTotalTimeThreshold := getReConnectTotalTimeThreshold()
requester := &blocksRequester{
tls: viper.GetBool("peer.tls.enabled"),
chainID: chainID,
}
broadcastSetup := func(bd blocksprovider.BlocksDeliverer) error {
return requester.RequestBlocks(ledgerInfoProvider)
}
backoffPolicy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) {
if elapsedTime >= reconnectTotalTimeThreshold {
return 0, false
}
sleepIncrement := float64(time.Millisecond * 500)
attempt := float64(attemptNum)
return time.Duration(math.Min(math.Pow(2, attempt)*sleepIncrement, reconnectBackoffThreshold)), true
}
connProd := comm.NewConnectionProducer(d.conf.ConnFactory(chainID), d.conf.Endpoints)
bClient := NewBroadcastClient(connProd, d.conf.ABCFactory, broadcastSetup, backoffPolicy)
requester.client = bClient
return bClient
}
func DefaultConnectionFactory(channelID string) func(endpoint string) (*grpc.ClientConn, error) {
return func(endpoint string) (*grpc.ClientConn, error) {
dialOpts := []grpc.DialOption{grpc.WithBlock()}
// set max send/recv msg sizes
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize),
grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize)))
// set the keepalive options
kaOpts := comm.DefaultKeepaliveOptions
if viper.IsSet("peer.keepalive.deliveryClient.interval") {
kaOpts.ClientInterval = viper.GetDuration(
"peer.keepalive.deliveryClient.interval")
}
if viper.IsSet("peer.keepalive.deliveryClient.timeout") {
kaOpts.ClientTimeout = viper.GetDuration(
"peer.keepalive.deliveryClient.timeout")
}
dialOpts = append(dialOpts, comm.ClientKeepaliveOptions(kaOpts)...)
if viper.GetBool("peer.tls.enabled") {
creds, err := comm.GetCredentialSupport().GetDeliverServiceCredentials(channelID, staticRootsEnabled())
if err != nil {
return nil, fmt.Errorf("failed obtaining credentials for channel %s: %v", channelID, err)
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
ctx, cancel := context.WithTimeout(context.Background(), getConnectionTimeout())
defer cancel()
return grpc.DialContext(ctx, endpoint, dialOpts...)
}
}
func DefaultABCFactory(conn *grpc.ClientConn) orderer.AtomicBroadcastClient {
return orderer.NewAtomicBroadcastClient(conn)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/peter_code_git/fabric.git
git@gitee.com:peter_code_git/fabric.git
peter_code_git
fabric
fabric
v1.4.1-rc1

搜索帮助

344bd9b3 5694891 D2dac590 5694891