6 Star 47 Fork 28

Hyperledger/fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
deliveryclient.go 7.96 KB
一键复制 编辑 原始数据 按行查看 历史
Gari Singh 提交于 2017-07-30 08:40 . [FAB-5497] Create 1.0.1 fabric release
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package deliverclient
import (
"errors"
"fmt"
"math"
"sync"
"time"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/op/go-logging"
"google.golang.org/grpc"
)
var logger *logging.Logger // package-level logger
func init() {
logger = flogging.MustGetLogger("deliveryClient")
}
var (
reConnectTotalTimeThreshold = time.Second * 60 * 5
connTimeout = time.Second * 3
reConnectBackoffThreshold = float64(time.Hour)
)
// SetReconnectTotalTimeThreshold sets the total time the delivery service
// may spend in reconnection attempts until its retry logic gives up
// and returns an error
func SetReconnectTotalTimeThreshold(duration time.Duration) {
reConnectTotalTimeThreshold = duration
}
// DeliverService used to communicate with orderers to obtain
// new blocks and send them to the committer service
type DeliverService interface {
// StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
// to channel peers.
// When the delivery finishes, the finalizer func is called
StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error
// StopDeliverForChannel dynamically stops delivery of new blocks from ordering service
// to channel peers.
StopDeliverForChannel(chainID string) error
// Stop terminates delivery service and closes the connection
Stop()
}
// deliverServiceImpl the implementation of the delivery service
// maintains connection to the ordering service and maps of
// blocks providers
type deliverServiceImpl struct {
conf *Config
blockProviders map[string]blocksprovider.BlocksProvider
lock sync.RWMutex
stopping bool
}
// Config dictates the DeliveryService's properties,
// namely how it connects to an ordering service endpoint,
// how it verifies messages received from it,
// and how it disseminates the messages to other peers
type Config struct {
// ConnFactory returns a function that creates a connection to an endpoint
ConnFactory func(channelID string) func(endpoint string) (*grpc.ClientConn, error)
// ABCFactory creates an AtomicBroadcastClient out of a connection
ABCFactory func(*grpc.ClientConn) orderer.AtomicBroadcastClient
// CryptoSvc performs cryptographic actions like message verification and signing
// and identity validation
CryptoSvc api.MessageCryptoService
// Gossip enables to enumerate peers in the channel, send a message to peers,
// and add a block to the gossip state transfer layer
Gossip blocksprovider.GossipServiceAdapter
// Endpoints specifies the endpoints of the ordering service
Endpoints []string
}
// NewDeliverService construction function to create and initialize
// delivery service instance. It tries to establish connection to
// the specified in the configuration ordering service, in case it
// fails to dial to it, return nil
func NewDeliverService(conf *Config) (DeliverService, error) {
ds := &deliverServiceImpl{
conf: conf,
blockProviders: make(map[string]blocksprovider.BlocksProvider),
}
if err := ds.validateConfiguration(); err != nil {
return nil, err
}
return ds, nil
}
func (d *deliverServiceImpl) validateConfiguration() error {
conf := d.conf
if len(conf.Endpoints) == 0 {
return errors.New("No endpoints specified")
}
if conf.Gossip == nil {
return errors.New("No gossip provider specified")
}
if conf.ABCFactory == nil {
return errors.New("No AtomicBroadcast factory specified")
}
if conf.ConnFactory == nil {
return errors.New("No connection factory specified")
}
if conf.CryptoSvc == nil {
return errors.New("No crypto service specified")
}
return nil
}
// StartDeliverForChannel starts blocks delivery for channel
// initializes the grpc stream for given chainID, creates blocks provider instance
// that spawns in go routine to read new blocks starting from the position provided by ledger
// info instance.
func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
d.lock.Lock()
defer d.lock.Unlock()
if d.stopping {
errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
}
if _, exist := d.blockProviders[chainID]; exist {
errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
} else {
client := d.newClient(chainID, ledgerInfo)
logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
go func() {
d.blockProviders[chainID].DeliverBlocks()
finalizer()
}()
}
return nil
}
// StopDeliverForChannel stops blocks delivery for channel by stopping channel block provider
func (d *deliverServiceImpl) StopDeliverForChannel(chainID string) error {
d.lock.Lock()
defer d.lock.Unlock()
if d.stopping {
errMsg := fmt.Sprintf("Delivery service is stopping, cannot stop delivery for channel %s", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
}
if client, exist := d.blockProviders[chainID]; exist {
client.Stop()
delete(d.blockProviders, chainID)
logger.Debug("This peer will stop pass blocks from orderer service to other peers")
} else {
errMsg := fmt.Sprintf("Delivery service - no block provider for %s found, can't stop delivery", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
}
return nil
}
// Stop all service and release resources
func (d *deliverServiceImpl) Stop() {
d.lock.Lock()
defer d.lock.Unlock()
// Marking flag to indicate the shutdown of the delivery service
d.stopping = true
for _, client := range d.blockProviders {
client.Stop()
}
}
func (d *deliverServiceImpl) newClient(chainID string, ledgerInfoProvider blocksprovider.LedgerInfo) *broadcastClient {
requester := &blocksRequester{
chainID: chainID,
}
broadcastSetup := func(bd blocksprovider.BlocksDeliverer) error {
return requester.RequestBlocks(ledgerInfoProvider)
}
backoffPolicy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) {
if elapsedTime.Nanoseconds() > reConnectTotalTimeThreshold.Nanoseconds() {
return 0, false
}
sleepIncrement := float64(time.Millisecond * 500)
attempt := float64(attemptNum)
return time.Duration(math.Min(math.Pow(2, attempt)*sleepIncrement, reConnectBackoffThreshold)), true
}
connProd := comm.NewConnectionProducer(d.conf.ConnFactory(chainID), d.conf.Endpoints)
bClient := NewBroadcastClient(connProd, d.conf.ABCFactory, broadcastSetup, backoffPolicy)
requester.client = bClient
return bClient
}
func DefaultConnectionFactory(channelID string) func(endpoint string) (*grpc.ClientConn, error) {
return func(endpoint string) (*grpc.ClientConn, error) {
dialOpts := []grpc.DialOption{grpc.WithTimeout(connTimeout), grpc.WithBlock()}
// set max send/recv msg sizes
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()),
grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize())))
// set the keepalive options
dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...)
if comm.TLSEnabled() {
creds, err := comm.GetCASupport().GetDeliverServiceCredentials(channelID)
if err != nil {
return nil, fmt.Errorf("Failed obtaining credentials for channel %s: %v", channelID, err)
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
grpc.EnableTracing = true
return grpc.Dial(endpoint, dialOpts...)
}
}
func DefaultABCFactory(conn *grpc.ClientConn) orderer.AtomicBroadcastClient {
return orderer.NewAtomicBroadcastClient(conn)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/hyperledger/fabric.git
git@gitee.com:hyperledger/fabric.git
hyperledger
fabric
fabric
v1.0.3

搜索帮助