1 Star 0 Fork 0

陈文甲/fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
deliveryclient.go 7.32 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deliverclient
import (
"errors"
"fmt"
"math"
"sync"
"time"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/op/go-logging"
"google.golang.org/grpc"
)
var logger *logging.Logger // package-level logger
func init() {
logger = flogging.MustGetLogger("deliveryClient")
}
var (
reConnectTotalTimeThreshold = time.Second * 60 * 5
connTimeout = time.Second * 3
)
// DeliverService used to communicate with orderers to obtain
// new blocks and send them to the committer service
type DeliverService interface {
// StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
// to channel peers.
StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error
// StopDeliverForChannel dynamically stops delivery of new blocks from ordering service
// to channel peers.
StopDeliverForChannel(chainID string) error
// Stop terminates delivery service and closes the connection
Stop()
}
// deliverServiceImpl the implementation of the delivery service
// maintains connection to the ordering service and maps of
// blocks providers
type deliverServiceImpl struct {
conf *Config
blockProviders map[string]blocksprovider.BlocksProvider
lock sync.RWMutex
stopping bool
}
// Config dictates the DeliveryService's properties,
// namely how it connects to an ordering service endpoint,
// how it verifies messages received from it,
// and how it disseminates the messages to other peers
type Config struct {
// ConnFactory creates a connection to an endpoint
ConnFactory func(endpoint string) (*grpc.ClientConn, error)
// ABCFactory creates an AtomicBroadcastClient out of a connection
ABCFactory func(*grpc.ClientConn) orderer.AtomicBroadcastClient
// CryptoSvc performs cryptographic actions like message verification and signing
// and identity validation
CryptoSvc api.MessageCryptoService
// Gossip enables to enumerate peers in the channel, send a message to peers,
// and add a block to the gossip state transfer layer
Gossip blocksprovider.GossipServiceAdapter
// Endpoints specifies the endpoints of the ordering service
Endpoints []string
}
// NewDeliverService construction function to create and initialize
// delivery service instance. It tries to establish connection to
// the specified in the configuration ordering service, in case it
// fails to dial to it, return nil
func NewDeliverService(conf *Config) (DeliverService, error) {
ds := &deliverServiceImpl{
conf: conf,
blockProviders: make(map[string]blocksprovider.BlocksProvider),
}
if err := ds.validateConfiguration(); err != nil {
return nil, err
}
return ds, nil
}
func (d *deliverServiceImpl) validateConfiguration() error {
conf := d.conf
if len(conf.Endpoints) == 0 {
return errors.New("No endpoints specified")
}
if conf.Gossip == nil {
return errors.New("No gossip provider specified")
}
if conf.ABCFactory == nil {
return errors.New("No AtomicBroadcast factory specified")
}
if conf.ConnFactory == nil {
return errors.New("No connection factory specified")
}
if conf.CryptoSvc == nil {
return errors.New("No crypto service specified")
}
return nil
}
// StartDeliverForChannel starts blocks delivery for channel
// initializes the grpc stream for given chainID, creates blocks provider instance
// that spawns in go routine to read new blocks starting from the position provided by ledger
// info instance.
func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
d.lock.Lock()
defer d.lock.Unlock()
if d.stopping {
errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
}
if _, exist := d.blockProviders[chainID]; exist {
errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
} else {
client := d.newClient(chainID, ledgerInfo)
logger.Debug("This peer will pass blocks from orderer service to other peers")
d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
go d.blockProviders[chainID].DeliverBlocks()
}
return nil
}
// StopDeliverForChannel stops blocks delivery for channel by stopping channel block provider
func (d *deliverServiceImpl) StopDeliverForChannel(chainID string) error {
d.lock.Lock()
defer d.lock.Unlock()
if d.stopping {
errMsg := fmt.Sprintf("Delivery service is stopping, cannot stop delivery for channel %s", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
}
if client, exist := d.blockProviders[chainID]; exist {
client.Stop()
delete(d.blockProviders, chainID)
logger.Debug("This peer will stop pass blocks from orderer service to other peers")
} else {
errMsg := fmt.Sprintf("Delivery service - no block provider for %s found, can't stop delivery", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
}
return nil
}
// Stop all service and release resources
func (d *deliverServiceImpl) Stop() {
d.lock.Lock()
defer d.lock.Unlock()
// Marking flag to indicate the shutdown of the delivery service
d.stopping = true
for _, client := range d.blockProviders {
client.Stop()
}
}
func (d *deliverServiceImpl) newClient(chainID string, ledgerInfoProvider blocksprovider.LedgerInfo) *broadcastClient {
requester := &blocksRequester{
chainID: chainID,
}
broadcastSetup := func(bd blocksprovider.BlocksDeliverer) error {
return requester.RequestBlocks(ledgerInfoProvider)
}
backoffPolicy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) {
if elapsedTime.Nanoseconds() > reConnectTotalTimeThreshold.Nanoseconds() {
return 0, false
}
return time.Duration(math.Pow(2, float64(attemptNum))) * time.Millisecond * 500, true
}
connProd := comm.NewConnectionProducer(d.conf.ConnFactory, d.conf.Endpoints)
bClient := NewBroadcastClient(connProd, d.conf.ABCFactory, broadcastSetup, backoffPolicy)
requester.client = bClient
return bClient
}
func DefaultConnectionFactory(endpoint string) (*grpc.ClientConn, error) {
dialOpts := []grpc.DialOption{grpc.WithTimeout(connTimeout), grpc.WithBlock()}
if comm.TLSEnabled() {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCASupport().GetDeliverServiceCredentials()))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
grpc.EnableTracing = true
return grpc.Dial(endpoint, dialOpts...)
}
func DefaultABCFactory(conn *grpc.ClientConn) orderer.AtomicBroadcastClient {
return orderer.NewAtomicBroadcastClient(conn)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/venjia/fabric.git
git@gitee.com:venjia/fabric.git
venjia
fabric
fabric
v1.0.0-alpha2

搜索帮助