代码拉取完成,页面将自动刷新
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package blocksprovider
import (
"context"
"crypto/x509"
"math"
"time"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/gossip"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/common/flogging"
gossipcommon "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/internal/pkg/identity"
"github.com/hyperledger/fabric/internal/pkg/peer/orderers"
"github.com/hyperledger/fabric/protoutil"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
type sleeper struct {
sleep func(time.Duration)
}
func (s sleeper) Sleep(d time.Duration, doneC chan struct{}) {
if s.sleep == nil {
timer := time.NewTimer(d)
select {
case <-timer.C:
case <-doneC:
timer.Stop()
}
return
}
s.sleep(d)
}
// LedgerInfo an adapter to provide the interface to query
// the ledger committer for current ledger height
//go:generate counterfeiter -o fake/ledger_info.go --fake-name LedgerInfo . LedgerInfo
type LedgerInfo interface {
// LedgerHeight returns current local ledger height
LedgerHeight() (uint64, error)
}
// GossipServiceAdapter serves to provide basic functionality
// required from gossip service by delivery service
//go:generate counterfeiter -o fake/gossip_service_adapter.go --fake-name GossipServiceAdapter . GossipServiceAdapter
type GossipServiceAdapter interface {
// AddPayload adds payload to the local state sync buffer
AddPayload(chainID string, payload *gossip.Payload) error
// Gossip the message across the peers
Gossip(msg *gossip.GossipMessage)
}
//go:generate counterfeiter -o fake/block_verifier.go --fake-name BlockVerifier . BlockVerifier
type BlockVerifier interface {
VerifyBlock(channelID gossipcommon.ChannelID, blockNum uint64, block *common.Block) error
}
//go:generate counterfeiter -o fake/orderer_connection_source.go --fake-name OrdererConnectionSource . OrdererConnectionSource
type OrdererConnectionSource interface {
RandomEndpoint() (*orderers.Endpoint, error)
}
//go:generate counterfeiter -o fake/dialer.go --fake-name Dialer . Dialer
type Dialer interface {
Dial(address string, certPool *x509.CertPool) (*grpc.ClientConn, error)
}
//go:generate counterfeiter -o fake/deliver_streamer.go --fake-name DeliverStreamer . DeliverStreamer
type DeliverStreamer interface {
Deliver(context.Context, *grpc.ClientConn) (orderer.AtomicBroadcast_DeliverClient, error)
}
// Deliverer the actual implementation for BlocksProvider interface
type Deliverer struct {
ChannelID string
Gossip GossipServiceAdapter
Ledger LedgerInfo
BlockVerifier BlockVerifier
Dialer Dialer
Orderers OrdererConnectionSource
DoneC chan struct{}
Signer identity.SignerSerializer
DeliverStreamer DeliverStreamer
Logger *flogging.FabricLogger
MaxRetryDelay time.Duration
InitialRetryDelay time.Duration
MaxRetryDuration time.Duration
// TLSCertHash should be nil when TLS is not enabled
TLSCertHash []byte // util.ComputeSHA256(b.credSupport.GetClientCertificate().Certificate[0])
sleeper sleeper
}
const backoffExponentBase = 1.2
// DeliverBlocks used to pull out blocks from the ordering service to
// distributed them across peers
func (d *Deliverer) DeliverBlocks() {
failureCounter := 0
totalDuration := time.Duration(0)
// InitialRetryDelay * backoffExponentBase^n > MaxRetryDelay
// backoffExponentBase^n > MaxRetryDelay / InitialRetryDelay
// n * log(backoffExponentBase) > log(MaxRetryDelay / InitialRetryDelay)
// n > log(MaxRetryDelay / InitialRetryDelay) / log(backoffExponentBase)
maxFailures := int(math.Log(float64(d.MaxRetryDelay)/float64(d.InitialRetryDelay)) / math.Log(backoffExponentBase))
for {
select {
case <-d.DoneC:
return
default:
}
if failureCounter > 0 {
var sleepDuration time.Duration
if failureCounter-1 > maxFailures {
sleepDuration = d.MaxRetryDelay
} else {
sleepDuration = time.Duration(math.Pow(1.2, float64(failureCounter-1))*100) * time.Millisecond
}
totalDuration += sleepDuration
if totalDuration > d.MaxRetryDuration {
d.Logger.Warningf("attempted to retry block delivery for more than %v, giving up", d.MaxRetryDuration)
return
}
d.sleeper.Sleep(sleepDuration, d.DoneC)
}
ledgerHeight, err := d.Ledger.LedgerHeight()
if err != nil {
d.Logger.Error("Did not return ledger height, something is critically wrong", err)
return
}
seekInfoEnv, err := d.createSeekInfo(ledgerHeight)
if err != nil {
d.Logger.Error("Could not create a signed Deliver SeekInfo message, something is critically wrong", err)
return
}
deliverClient, endpoint, cancel, err := d.connect(seekInfoEnv)
if err != nil {
d.Logger.Warningf("Could not connect to ordering service: %s", err)
failureCounter++
continue
}
connLogger := d.Logger.With("orderer-address", endpoint.Address)
recv := make(chan *orderer.DeliverResponse)
go func() {
for {
resp, err := deliverClient.Recv()
if err != nil {
connLogger.Warningf("Encountered an error reading from deliver stream: %s", err)
close(recv)
return
}
select {
case recv <- resp:
case <-d.DoneC:
close(recv)
return
}
}
}()
RecvLoop: // Loop until the endpoint is refreshed, or there is an error on the connection
for {
select {
case <-endpoint.Refreshed:
connLogger.Infof("Ordering endpoints have been refreshed, disconnecting from deliver to reconnect using updated endpoints")
break RecvLoop
case response, ok := <-recv:
if !ok {
connLogger.Warningf("Orderer hung up without sending status")
failureCounter++
break RecvLoop
}
err = d.processMsg(response)
if err != nil {
connLogger.Warningf("Got error while attempting to receive blocks: %v", err)
failureCounter++
break RecvLoop
}
failureCounter = 0
case <-d.DoneC:
break RecvLoop
}
}
// cancel and wait for our spawned go routine to exit
cancel()
<-recv
}
}
func (d *Deliverer) processMsg(msg *orderer.DeliverResponse) error {
switch t := msg.Type.(type) {
case *orderer.DeliverResponse_Status:
if t.Status == common.Status_SUCCESS {
return errors.Errorf("received success for a seek that should never complete")
}
return errors.Errorf("received bad status %v from orderer", t.Status)
case *orderer.DeliverResponse_Block:
blockNum := t.Block.Header.Number
if err := d.BlockVerifier.VerifyBlock(gossipcommon.ChannelID(d.ChannelID), blockNum, t.Block); err != nil {
return errors.WithMessage(err, "block from orderer could not be verified")
}
marshaledBlock, err := proto.Marshal(t.Block)
if err != nil {
return errors.WithMessage(err, "block from orderer could not be re-marshaled")
}
// Create payload with a block received
payload := &gossip.Payload{
Data: marshaledBlock,
SeqNum: blockNum,
}
// Use payload to create gossip message
gossipMsg := &gossip.GossipMessage{
Nonce: 0,
Tag: gossip.GossipMessage_CHAN_AND_ORG,
Channel: []byte(d.ChannelID),
Content: &gossip.GossipMessage_DataMsg{
DataMsg: &gossip.DataMessage{
Payload: payload,
},
},
}
d.Logger.Debugf("Adding payload to local buffer, blockNum = [%d]", blockNum)
// Add payload to local state payloads buffer
if err := d.Gossip.AddPayload(d.ChannelID, payload); err != nil {
d.Logger.Warningf("Block [%d] received from ordering service wasn't added to payload buffer: %v", blockNum, err)
return errors.WithMessage(err, "could not add block as payload")
}
// Gossip messages with other nodes
d.Logger.Debugf("Gossiping block [%d]", blockNum)
d.Gossip.Gossip(gossipMsg)
return nil
default:
d.Logger.Warningf("Received unknown: %v", t)
return errors.Errorf("unknown message type '%T'", msg.Type)
}
}
// Stop stops blocks delivery provider
func (d *Deliverer) Stop() {
// this select is not race-safe, but it prevents a panic
// for careless callers multiply invoking stop
select {
case <-d.DoneC:
default:
close(d.DoneC)
}
}
func (d *Deliverer) connect(seekInfoEnv *common.Envelope) (orderer.AtomicBroadcast_DeliverClient, *orderers.Endpoint, func(), error) {
endpoint, err := d.Orderers.RandomEndpoint()
if err != nil {
return nil, nil, nil, errors.WithMessage(err, "could not get orderer endpoints")
}
conn, err := d.Dialer.Dial(endpoint.Address, endpoint.CertPool)
if err != nil {
return nil, nil, nil, errors.WithMessagef(err, "could not dial endpoint '%s'", endpoint.Address)
}
ctx, ctxCancel := context.WithCancel(context.Background())
deliverClient, err := d.DeliverStreamer.Deliver(ctx, conn)
if err != nil {
conn.Close()
ctxCancel()
return nil, nil, nil, errors.WithMessagef(err, "could not create deliver client to endpoints '%s'", endpoint.Address)
}
err = deliverClient.Send(seekInfoEnv)
if err != nil {
deliverClient.CloseSend()
conn.Close()
ctxCancel()
return nil, nil, nil, errors.WithMessagef(err, "could not send deliver seek info handshake to '%s'", endpoint.Address)
}
return deliverClient, endpoint, func() {
deliverClient.CloseSend()
ctxCancel()
conn.Close()
}, nil
}
func (d *Deliverer) createSeekInfo(ledgerHeight uint64) (*common.Envelope, error) {
return protoutil.CreateSignedEnvelopeWithTLSBinding(
common.HeaderType_DELIVER_SEEK_INFO,
d.ChannelID,
d.Signer,
&orderer.SeekInfo{
Start: &orderer.SeekPosition{
Type: &orderer.SeekPosition_Specified{
Specified: &orderer.SeekSpecified{
Number: ledgerHeight,
},
},
},
Stop: &orderer.SeekPosition{
Type: &orderer.SeekPosition_Specified{
Specified: &orderer.SeekSpecified{
Number: math.MaxUint64,
},
},
},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
},
int32(0),
uint64(0),
d.TLSCertHash,
)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。