63 Star 182 Fork 3

Gitee 极速下载/hyperledger-fabric

Create your Gitee Account
Explore and code with more than 12 million developers. Free private repositories! :)
Sign up
文件
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/hyperledger/fabric
Clone or Download
chain.go 39.29 KB
Copy Edit Raw Blame History
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package etcdraft
import (
"context"
"encoding/pem"
"fmt"
"sync"
"sync/atomic"
"time"
"code.cloudfoundry.org/clock"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/orderer/etcdraft"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/wal"
)
// Byte-size units; each unit is 1024 times the previous one.
const (
	BYTE     = 1
	KILOBYTE = BYTE << 10
	MEGABYTE = KILOBYTE << 10
	GIGABYTE = MEGABYTE << 10
	TERABYTE = GIGABYTE << 10
)

const (
	// DefaultSnapshotCatchUpEntries is the default number of entries kept in
	// memory after a snapshot is taken, so that slow followers can still
	// catch up. Used when Options.SnapshotCatchUpEntries is zero.
	DefaultSnapshotCatchUpEntries = uint64(20)

	// DefaultSnapshotIntervalSize is the snapshot interval, in bytes, used
	// when SnapshotIntervalSize is not provided in the channel config options.
	// It guarantees that snapshotting is always enabled.
	DefaultSnapshotIntervalSize = 20 * MEGABYTE // 20 MB

	// DefaultEvictionSuspicion is how long a node may stay leaderless before
	// it starts suspecting that it has been evicted from the channel.
	DefaultEvictionSuspicion = 10 * time.Minute

	// DefaultLeaderlessCheckInterval is how often a chain checks its own
	// leadership status.
	DefaultLeaderlessCheckInterval = 10 * time.Second
)
//go:generate counterfeiter -o mocks/configurator.go . Configurator

// Configurator is used to configure the communication layer
// when the chain starts.
type Configurator interface {
	// Configure is given the channel name and the set of remote nodes for
	// that channel; presumably it (re)establishes connections to them.
	Configure(channel string, newNodes []cluster.RemoteNode)
}

//go:generate counterfeiter -o mocks/mock_rpc.go . RPC

// RPC is the transport the chain uses to send messages to other nodes. It is
// an interface so that tests can mock the transport layer.
type RPC interface {
	// SendConsensus sends a consensus message to node dest (presumably a
	// marshaled raft message, mirroring what Chain.Consensus unmarshals).
	SendConsensus(dest uint64, msg *orderer.ConsensusRequest) error
	// SendSubmit forwards a submit request to node dest; Chain.Submit uses it
	// to forward requests to the current leader.
	SendSubmit(dest uint64, request *orderer.SubmitRequest) error
}

//go:generate counterfeiter -o mocks/mock_blockpuller.go . BlockPuller

// BlockPuller is used to pull blocks from other OSNs.
type BlockPuller interface {
	// PullBlock returns the block with sequence number seq; a nil result is
	// treated as a fetch failure by catchUp.
	PullBlock(seq uint64) *common.Block
	// HeightsByEndpoints presumably reports the ledger height known for each
	// remote endpoint — not used in this part of the file; confirm.
	HeightsByEndpoints() (map[string]uint64, error)
	// Close is deferred by catchUp once it is done with the puller.
	Close()
}

// CreateBlockPuller is a function to create BlockPuller on demand.
// It is passed into chain initializer so that tests could mock this.
// catchUp creates a fresh puller for every catch-up and closes it afterwards.
type CreateBlockPuller func() (BlockPuller, error)
// Options contains all the configurations relevant to the chain.
type Options struct {
	// RaftID is this node's raft ID (used as the raft.Config ID).
	RaftID uint64

	// Clock is the clock used for timers; tests can inject a fake one.
	Clock clock.Clock

	// WALDir is the raft write-ahead-log directory. Whether a WAL already
	// exists there decides whether the node is "fresh".
	WALDir string
	// SnapDir is the raft snapshot directory.
	SnapDir string
	// SnapshotIntervalSize is the snapshot interval in bytes; zero means
	// DefaultSnapshotIntervalSize.
	SnapshotIntervalSize uint32

	// This is configurable mainly for testing purpose. Users are not
	// expected to alter this. Instead, DefaultSnapshotCatchUpEntries is used
	// whenever this is zero.
	SnapshotCatchUpEntries uint64

	// MemoryStorage is the in-memory raft storage; it is handed both to
	// CreateStorage and to raft as raft.Config.Storage.
	MemoryStorage MemoryStorage
	Logger        *flogging.FabricLogger

	// TickInterval is the duration of one raft tick.
	TickInterval time.Duration
	// ElectionTick and HeartbeatTick are passed straight to raft.Config.
	ElectionTick  int
	HeartbeatTick int
	// MaxSizePerMsg is passed straight to raft.Config.
	MaxSizePerMsg uint64
	// MaxInflightBlocks bounds the number of blocks the leader keeps in flight;
	// it is also used as raft's MaxInflightMsgs.
	MaxInflightBlocks int

	// BlockMetadata and Consenters should only be modified while under lock
	// of raftMetadataLock
	BlockMetadata *etcdraft.BlockMetadata
	Consenters    map[uint64]*etcdraft.Consenter

	// MigrationInit is set when the node starts right after consensus-type migration
	MigrationInit bool

	Metrics *Metrics
	// Cert is presumably this node's (TLS) certificate — not used in this part
	// of the file; confirm.
	Cert []byte

	// EvictionSuspicion is presumably the leaderless period after which the
	// node suspects its own eviction (cf. DefaultEvictionSuspicion); confirm.
	EvictionSuspicion time.Duration
	// LeaderCheckInterval is how often leadership is checked; zero means
	// DefaultLeaderlessCheckInterval.
	LeaderCheckInterval time.Duration
}
// submit is a request handed from Submit (or WaitReady, as a nil *submit) to
// the serveRequest goroutine via submitC.
// Note: it is constructed positionally (submit{req, leadC}), so field order matters.
type submit struct {
	req *orderer.SubmitRequest
	// leader receives the leader ID known to serveRequest (raft.None when
	// there is no leader); Submit allocates it with a buffer of 1.
	leader chan uint64
}

// gc is a request sent on gcC to take a snapshot. Presumably index is the raft
// index to snapshot at, state the ConfState to persist in it and data the
// snapshot payload — the consumer (c.gc) is not shown here; confirm.
type gc struct {
	index uint64
	state raftpb.ConfState
	data  []byte
}
// Chain implements consensus.Chain interface.
type Chain struct {
	// configurator sets up communication with the other consenters.
	configurator Configurator

	// rpc is the transport used, e.g., to forward submissions to the leader.
	rpc RPC

	raftID    uint64
	channelID string

	// lastKnownLeader is accessed atomically; it is raft.None while leaderless.
	lastKnownLeader uint64

	submitC  chan *submit
	applyC   chan apply
	observeC chan<- raft.SoftState // Notifies external observer on leader change (passed in optionally as an argument for tests)
	haltC    chan struct{}         // Signals to goroutines that the chain is halting
	doneC    chan struct{}         // Closes when the chain halts
	startC   chan struct{}         // Closes when the node is started
	snapC    chan *raftpb.Snapshot // Signal to catch up with snapshot
	gcC      chan *gc              // Signal to take snapshot

	// errorCLock guards errorC, which serveRequest replaces with a fresh
	// channel whenever a leader is found again.
	errorCLock sync.RWMutex
	errorC     chan struct{} // returned by Errored()

	// raftMetadataLock guards opts.BlockMetadata and opts.Consenters.
	raftMetadataLock sync.RWMutex
	// confChangeInProgress is the ConfChange proposed but not yet applied; nil when none.
	confChangeInProgress *raftpb.ConfChange
	justElected          bool // this is true when node has just been elected
	configInflight       bool // this is true when there is config block or ConfChange in flight
	blockInflight        int  // number of in flight blocks

	clock clock.Clock // Tests can inject a fake clock

	support consensus.ConsenterSupport

	// lastBlock is the last block written to the ledger by this chain.
	lastBlock *common.Block
	// appliedIndex is the raft index of the last applied entry; entries at or
	// below it are not re-applied.
	appliedIndex uint64

	// needed by snapshotting
	sizeLimit        uint32 // SnapshotIntervalSize in bytes
	accDataSize      uint32 // accumulative data size since last snapshot
	lastSnapBlockNum uint64
	confState        raftpb.ConfState // Etcdraft requires ConfState to be persisted within snapshot

	createPuller CreateBlockPuller // func used to create BlockPuller on demand

	fresh bool // indicate if this is a fresh raft node

	// this is exported so that test can use `Node.Status()` to get raft node status.
	Node *node
	opts Options

	Metrics *Metrics
	logger  *flogging.FabricLogger

	// periodicChecker runs the periodic leaderless/eviction check started by Start.
	periodicChecker *PeriodicCheck
}
// NewChain constructs a chain object.
//
// It restores persisted raft data (WAL and snapshots) from opts.WALDir and
// opts.SnapDir, applies defaults for unset snapshot options, reads the last
// block from the ledger and prepares (but does not start) the raft node.
// Call Start to run the chain.
//
// observeC is optional (it is used by tests); when non-nil it is notified of
// raft soft-state changes such as leader changes.
func NewChain(
	support consensus.ConsenterSupport,
	opts Options,
	conf Configurator,
	rpc RPC,
	f CreateBlockPuller,
	observeC chan<- raft.SoftState) (*Chain, error) {

	channelID := support.ChainID()
	lg := opts.Logger.With("channel", channelID, "node", opts.RaftID)

	// Check for the WAL before CreateStorage runs (presumably CreateStorage
	// creates it when missing): no WAL means a fresh raft node.
	fresh := !wal.Exist(opts.WALDir)
	storage, err := CreateStorage(lg, opts.WALDir, opts.SnapDir, opts.MemoryStorage)
	if err != nil {
		// Same message as before, but the underlying cause is preserved.
		return nil, errors.Wrap(err, "failed to restore persisted raft data")
	}

	if opts.SnapshotCatchUpEntries == 0 {
		storage.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
	} else {
		storage.SnapshotCatchUpEntries = opts.SnapshotCatchUpEntries
	}

	sizeLimit := opts.SnapshotIntervalSize
	if sizeLimit == 0 {
		sizeLimit = DefaultSnapshotIntervalSize
	}

	// get block number in last snapshot, if exists
	var snapBlkNum uint64
	var cc raftpb.ConfState
	if s := storage.Snapshot(); !raft.IsEmptySnap(s) {
		b := utils.UnmarshalBlockOrPanic(s.Data)
		snapBlkNum = b.Header.Number
		cc = s.Metadata.ConfState
	}

	// Guard against Height()-1 wrapping around to MaxUint64 on an empty ledger.
	height := support.Height()
	if height == 0 {
		return nil, errors.Errorf("failed to get last block: ledger is empty")
	}
	b := support.Block(height - 1)
	if b == nil {
		return nil, errors.Errorf("failed to get last block")
	}

	c := &Chain{
		configurator:     conf,
		rpc:              rpc,
		channelID:        channelID,
		raftID:           opts.RaftID,
		submitC:          make(chan *submit),
		applyC:           make(chan apply),
		haltC:            make(chan struct{}),
		doneC:            make(chan struct{}),
		startC:           make(chan struct{}),
		snapC:            make(chan *raftpb.Snapshot),
		errorC:           make(chan struct{}),
		gcC:              make(chan *gc),
		observeC:         observeC,
		support:          support,
		fresh:            fresh,
		appliedIndex:     opts.BlockMetadata.RaftIndex,
		lastBlock:        b,
		sizeLimit:        sizeLimit,
		lastSnapBlockNum: snapBlkNum,
		confState:        cc,
		createPuller:     f,
		clock:            opts.Clock,
		Metrics: &Metrics{
			ClusterSize:             opts.Metrics.ClusterSize.With("channel", channelID),
			IsLeader:                opts.Metrics.IsLeader.With("channel", channelID),
			CommittedBlockNumber:    opts.Metrics.CommittedBlockNumber.With("channel", channelID),
			SnapshotBlockNumber:     opts.Metrics.SnapshotBlockNumber.With("channel", channelID),
			LeaderChanges:           opts.Metrics.LeaderChanges.With("channel", channelID),
			ProposalFailures:        opts.Metrics.ProposalFailures.With("channel", channelID),
			DataPersistDuration:     opts.Metrics.DataPersistDuration.With("channel", channelID),
			NormalProposalsReceived: opts.Metrics.NormalProposalsReceived.With("channel", channelID),
			ConfigProposalsReceived: opts.Metrics.ConfigProposalsReceived.With("channel", channelID),
		},
		logger: lg,
		opts:   opts,
	}

	// Sets initial values for metrics
	c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
	c.Metrics.IsLeader.Set(float64(0)) // all nodes start out as followers
	c.Metrics.CommittedBlockNumber.Set(float64(c.lastBlock.Header.Number))
	c.Metrics.SnapshotBlockNumber.Set(float64(c.lastSnapBlockNum))

	// DO NOT use Applied option in config, see https://github.com/etcd-io/etcd/issues/10217
	// We guard against replay of written blocks with `appliedIndex` instead.
	config := &raft.Config{
		ID:              c.raftID,
		ElectionTick:    c.opts.ElectionTick,
		HeartbeatTick:   c.opts.HeartbeatTick,
		MaxSizePerMsg:   c.opts.MaxSizePerMsg,
		MaxInflightMsgs: c.opts.MaxInflightBlocks,
		Logger:          c.logger,
		Storage:         c.opts.MemoryStorage,
		// PreVote prevents reconnected node from disturbing network.
		// See etcd/raft doc for more details.
		PreVote:                   true,
		CheckQuorum:               true,
		DisableProposalForwarding: true, // This prevents blocks from being accidentally proposed by followers
	}

	c.Node = &node{
		chainID:      c.channelID,
		chain:        c,
		logger:       c.logger,
		metrics:      c.Metrics,
		storage:      storage,
		rpc:          c.rpc,
		config:       config,
		tickInterval: c.opts.TickInterval,
		clock:        c.clock,
		metadata:     c.opts.BlockMetadata,
	}

	return c, nil
}
// Start instructs the orderer to begin serving the chain and keep it current.
//
// It configures cluster communication, starts the raft node, launches the
// background goroutines (c.gc and serveRequest) and finally runs the periodic
// leaderless/eviction check. If communication cannot be configured, doneC is
// closed and the chain is never marked as started.
func (c *Chain) Start() {
	c.logger.Infof("Starting Raft node")

	if err := c.configureComm(); err != nil {
		c.logger.Errorf("Failed to start chain, aborting: %+v", err)
		close(c.doneC)
		return
	}

	isJoin := c.support.Height() > 1
	if isJoin && c.opts.MigrationInit {
		// The ledger already has blocks, but after a consensus-type migration
		// the raft node is new, so it must not be started as a joining node.
		isJoin = false
		c.logger.Infof("Consensus-type migration detected, starting new raft node on an existing channel; height=%d", c.support.Height())
	}
	c.Node.start(c.fresh, isJoin)

	close(c.startC)
	// errorC starts out closed: Errored() reports an error until serveRequest
	// finds a leader and installs a fresh channel.
	close(c.errorC)

	// Build the periodic checker before starting serveRequest: serveRequest
	// calls c.periodicChecker.Stop() on shutdown, so the field must be set
	// (and visible to that goroutine) before it starts.
	es := c.newEvictionSuspector()

	interval := DefaultLeaderlessCheckInterval
	if c.opts.LeaderCheckInterval != 0 {
		interval = c.opts.LeaderCheckInterval
	}

	c.periodicChecker = &PeriodicCheck{
		Logger:        c.logger,
		Report:        es.confirmSuspicion,
		CheckInterval: interval,
		Condition:     c.suspectEviction,
	}

	go c.gc()
	go c.serveRequest()

	c.periodicChecker.Run()
}
// Order submits a normal (non-config) envelope for ordering. configSeq is the
// config sequence the envelope was validated against.
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
	c.Metrics.NormalProposalsReceived.Add(1)

	req := &orderer.SubmitRequest{
		LastValidationSeq: configSeq,
		Payload:           env,
		Channel:           c.channelID,
	}
	return c.Submit(req, 0)
}

// Configure submits a config envelope for ordering, after checking that the
// config update it carries is acceptable to this chain. Rejected updates are
// logged and counted as proposal failures.
func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error {
	c.Metrics.ConfigProposalsReceived.Add(1)

	err := c.checkConfigUpdateValidity(env)
	if err == nil {
		req := &orderer.SubmitRequest{
			LastValidationSeq: configSeq,
			Payload:           env,
			Channel:           c.channelID,
		}
		return c.Submit(req, 0)
	}

	c.logger.Warnf("Rejected config: %s", err)
	c.Metrics.ProposalFailures.Add(1)
	return err
}
// checkConfigUpdateValidity validates the config update carried by the
// envelope ctx (despite its name, ctx is an envelope, not a context) for being
// of Type A or Type B as described in the design doc:
//   - ORDERER_TRANSACTION (channel creation): every consenter of the new
//     channel must already be a consenter of the system channel;
//   - CONFIG: the consenter-set change must be accepted by
//     ComputeMembershipChanges for this channel.
//
// Updates that do not touch the consensus metadata are accepted. Any other
// header type, or a malformed envelope, is rejected with an error.
func (c *Chain) checkConfigUpdateValidity(ctx *common.Envelope) error {
	payload, err := utils.UnmarshalPayload(ctx.Payload)
	if err != nil {
		return err
	}
	// Without this check a header-less payload would crash the orderer with a
	// nil-pointer dereference below.
	if payload.Header == nil {
		return errors.Errorf("config transaction has no header")
	}

	chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
	if err != nil {
		return err
	}

	isOrdererTx := chdr.Type == int32(common.HeaderType_ORDERER_TRANSACTION)
	if !isOrdererTx && chdr.Type != int32(common.HeaderType_CONFIG) {
		return errors.Errorf("config transaction has unknown header type: %s", common.HeaderType(chdr.Type))
	}

	if isOrdererTx {
		// An orderer transaction wraps the new channel's config envelope;
		// the config update to inspect lives in that inner payload.
		newChannelConfig, err := utils.UnmarshalEnvelope(payload.Data)
		if err != nil {
			return err
		}
		if payload, err = utils.UnmarshalPayload(newChannelConfig.Payload); err != nil {
			return err
		}
	}

	configUpdate, err := configtx.UnmarshalConfigUpdateFromPayload(payload)
	if err != nil {
		return err
	}

	metadata, err := MetadataFromConfigUpdate(configUpdate)
	if err != nil {
		return err
	}
	if metadata == nil {
		return nil // ConsensusType is not updated
	}

	if err := CheckConfigMetadata(metadata); err != nil {
		return err
	}

	if isOrdererTx {
		c.raftMetadataLock.RLock()
		set := MembershipByCert(c.opts.Consenters)
		c.raftMetadataLock.RUnlock()

		for _, consenter := range metadata.Consenters {
			if _, exists := set[string(consenter.ClientTlsCert)]; !exists {
				return errors.Errorf("new channel has consenter that is not part of system consenter set")
			}
		}
		return nil
	}

	// CONFIG transaction: the consenter-set change must be a legal one.
	c.raftMetadataLock.RLock()
	_, err = ComputeMembershipChanges(c.opts.BlockMetadata, c.opts.Consenters, metadata.Consenters)
	c.raftMetadataLock.RUnlock()
	return err
}
// WaitReady blocks when the chain:
// - is catching up with other nodes using snapshot
//
// In any other case, it returns right away. It fails if the chain has not
// been started or has stopped.
func (c *Chain) WaitReady() error {
	if err := c.isRunning(); err != nil {
		return err
	}

	// A nil submission is a no-op poll: serveRequest only receives it while its
	// loop is reading submitC, so this blocks while serveRequest is busy (for
	// example catching up from a snapshot) or has paused intake.
	select {
	case <-c.doneC:
		return errors.Errorf("chain is stopped")
	case c.submitC <- nil:
		return nil
	}
}

// Errored returns a channel that is closed when the chain is errored (for
// example when no leader is present) or has stopped. The channel may be
// replaced once a leader is found again, so callers should call Errored anew
// rather than cache the result.
func (c *Chain) Errored() <-chan struct{} {
	c.errorCLock.RLock()
	errC := c.errorC
	c.errorCLock.RUnlock()
	return errC
}

// Halt stops the chain and blocks until it has fully stopped. Halting a chain
// that was never started only logs a warning.
func (c *Chain) Halt() {
	select {
	case <-c.startC:
	default:
		c.logger.Warnf("Attempted to halt a chain that has not started")
		return
	}

	select {
	case <-c.doneC:
		// Already stopped; nothing to wait for.
	case c.haltC <- struct{}{}:
		<-c.doneC
	}
}

// isRunning returns nil when the chain has been started and has not stopped,
// and an error describing which of the two conditions fails otherwise.
func (c *Chain) isRunning() error {
	started := false
	select {
	case <-c.startC:
		started = true
	default:
	}
	if !started {
		return errors.Errorf("chain is not started")
	}

	select {
	case <-c.doneC:
		return errors.Errorf("chain is stopped")
	default:
		return nil
	}
}
// Consensus passes the given ConsensusRequest message to the raft.Node instance.
// The request payload must be a marshaled raftpb.Message; sender is unused here.
func (c *Chain) Consensus(req *orderer.ConsensusRequest, sender uint64) error {
	if err := c.isRunning(); err != nil {
		return err
	}

	var stepMsg raftpb.Message
	if err := proto.Unmarshal(req.Payload, &stepMsg); err != nil {
		return fmt.Errorf("failed to unmarshal StepRequest payload to Raft Message: %s", err)
	}

	if err := c.Node.Step(context.TODO(), stepMsg); err != nil {
		return fmt.Errorf("failed to process Raft Step message: %s", err)
	}
	return nil
}
// Submit forwards the incoming request to:
// - the local serveRequest goroutine if this node is the leader
// - the actual leader via the transport mechanism otherwise
// The call fails if there's no leader elected yet. Every failure increments
// the ProposalFailures metric. sender is unused here.
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
	if err := c.isRunning(); err != nil {
		c.Metrics.ProposalFailures.Add(1)
		return err
	}

	leadC := make(chan uint64, 1)
	select {
	case <-c.doneC:
		c.Metrics.ProposalFailures.Add(1)
		return errors.Errorf("chain is stopped")
	case c.submitC <- &submit{req, leadC}:
	}

	// serveRequest replies with the leader it knows of before doing anything else.
	lead := <-leadC
	switch lead {
	case raft.None:
		c.Metrics.ProposalFailures.Add(1)
		return errors.Errorf("no Raft leader")
	case c.raftID:
		// This node leads: serveRequest orders the request locally.
		return nil
	}

	if err := c.rpc.SendSubmit(lead, req); err != nil {
		c.Metrics.ProposalFailures.Add(1)
		return err
	}
	return nil
}
// apply carries raft output to serveRequest over applyC.
// Note: keep field order; the type may be built positionally elsewhere.
type apply struct {
	// entries are the committed raft entries to apply, in order.
	entries []raftpb.Entry
	// soft is the raft soft state (leader and role); it may be nil, which
	// serveRequest checks for — presumably nil when it did not change.
	soft *raft.SoftState
}
// isCandidate reports whether state is raft.StatePreCandidate or
// raft.StateCandidate.
func isCandidate(state raft.StateType) bool {
	switch state {
	case raft.StatePreCandidate, raft.StateCandidate:
		return true
	default:
		return false
	}
}
// serveRequest is the chain's main event loop; Start runs it in its own
// goroutine. It multiplexes:
//   - submitC: requests from Submit (nil values are WaitReady polls);
//   - applyC: committed raft entries and soft-state (leader/role) changes;
//   - the batch timer: cutting pending envelopes after BatchTimeout;
//   - snapC: snapshots to catch up with;
//   - doneC: chain shutdown.
//
// While this node is leader it turns batches into blocks and hands them to a
// proposer goroutine. Intake is paused by setting the local submitC to nil
// (a nil channel is never selected) while a config block/ConfChange or the
// maximum number of blocks is in flight, and while a newly elected leader is
// still applying entries committed before its election.
func (c *Chain) serveRequest() {
	ticking := false
	timer := c.clock.NewTimer(time.Second)
	// we need a stopped timer rather than nil,
	// because we will be select waiting on timer.C()
	if !timer.Stop() {
		<-timer.C()
	}

	// if timer is already started, this is a no-op
	startTimer := func() {
		if !ticking {
			ticking = true
			timer.Reset(c.support.SharedConfig().BatchTimeout())
		}
	}

	stopTimer := func() {
		if !timer.Stop() && ticking {
			// we only need to drain the channel if the timer expired (not explicitly stopped)
			<-timer.C()
		}
		ticking = false
	}

	// soft is the last soft state observed from raft.
	var soft raft.SoftState
	// submitC is a local alias of c.submitC; nil means "not accepting requests".
	submitC := c.submitC
	// bc creates blocks; it is non-nil only while serving requests as leader.
	var bc *blockCreator

	var propC chan<- *common.Block
	var cancelProp context.CancelFunc
	cancelProp = func() {} // no-op as initial value

	// becomeLeader resets in-flight accounting, pauses intake until the new
	// leader has caught up (justElected), re-proposes an unfinished ConfChange
	// and starts the proposer goroutine. It returns the channel feeding that
	// goroutine and the function that stops it.
	becomeLeader := func() (chan<- *common.Block, context.CancelFunc) {
		c.Metrics.IsLeader.Set(1)

		c.blockInflight = 0
		c.justElected = true
		submitC = nil
		ch := make(chan *common.Block, c.opts.MaxInflightBlocks)

		// if there is unfinished ConfChange, we should resume the effort to propose it as
		// new leader, and wait for it to be committed before start serving new requests.
		if cc := c.getInFlightConfChange(); cc != nil {
			// The reason `ProposeConfChange` should be called in go routine is documented in `writeConfigBlock` method.
			go func() {
				if err := c.Node.ProposeConfChange(context.TODO(), *cc); err != nil {
					c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
				}
			}()

			c.confChangeInProgress = cc
			c.configInflight = true
		}

		// Leader should call Propose in go routine, because this method may be blocked
		// if node is leaderless (this can happen when leader steps down in a heavily
		// loaded network). We need to make sure applyC can still be consumed properly.
		ctx, cancel := context.WithCancel(context.Background())
		go func(ctx context.Context, ch <-chan *common.Block) {
			for {
				select {
				case b := <-ch:
					data := utils.MarshalOrPanic(b)
					if err := c.Node.Propose(ctx, data); err != nil {
						c.logger.Errorf("Failed to propose block [%d] to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
						return
					}
					c.logger.Debugf("Proposed block [%d] to raft consensus", b.Header.Number)

				case <-ctx.Done():
					c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
					return
				}
			}
		}(ctx, ch)

		return ch, cancel
	}

	// becomeFollower stops the proposer goroutine, drops whatever is pending
	// in the block cutter, stops the batch timer and resumes accepting
	// submissions (Submit then forwards them to the new leader).
	becomeFollower := func() {
		cancelProp()
		c.blockInflight = 0
		_ = c.support.BlockCutter().Cut()
		stopTimer()
		submitC = c.submitC
		bc = nil
		c.Metrics.IsLeader.Set(0)
	}

	for {
		select {
		case s := <-submitC:
			if s == nil {
				// polled by `WaitReady`
				continue
			}

			// A candidate knows no leader; tell the submitter so.
			if soft.RaftState == raft.StatePreCandidate || soft.RaftState == raft.StateCandidate {
				s.leader <- raft.None
				continue
			}

			// Reply with the leader first; only the leader orders the request here.
			s.leader <- soft.Lead
			if soft.Lead != c.raftID {
				continue
			}

			batches, pending, err := c.ordered(s.req)
			if err != nil {
				c.logger.Errorf("Failed to order message: %s", err)
				continue
			}
			if pending {
				startTimer() // no-op if timer is already started
			} else {
				stopTimer()
			}

			c.propose(propC, bc, batches...)

			if c.configInflight {
				c.logger.Info("Received config transaction, pause accepting transaction till it is committed")
				submitC = nil
			} else if c.blockInflight >= c.opts.MaxInflightBlocks {
				c.logger.Debugf("Number of in-flight blocks (%d) reaches limit (%d), pause accepting transaction",
					c.blockInflight, c.opts.MaxInflightBlocks)
				submitC = nil
			}

		case app := <-c.applyC:
			if app.soft != nil {
				newLeader := atomic.LoadUint64(&app.soft.Lead) // etcdraft requires atomic access
				if newLeader != soft.Lead {
					c.logger.Infof("Raft leader changed: %d -> %d", soft.Lead, newLeader)
					c.Metrics.LeaderChanges.Add(1)

					atomic.StoreUint64(&c.lastKnownLeader, newLeader)

					if newLeader == c.raftID {
						propC, cancelProp = becomeLeader()
					}

					if soft.Lead == c.raftID {
						becomeFollower()
					}
				}

				foundLeader := soft.Lead == raft.None && newLeader != raft.None
				quitCandidate := isCandidate(soft.RaftState) && !isCandidate(app.soft.RaftState)

				// Leadership regained: install a fresh (open) errorC so that
				// Errored() stops reporting an error.
				if foundLeader || quitCandidate {
					c.errorCLock.Lock()
					c.errorC = make(chan struct{})
					c.errorCLock.Unlock()
				}

				// No leader: report errored by closing errorC, unless already closed.
				if isCandidate(app.soft.RaftState) || newLeader == raft.None {
					atomic.StoreUint64(&c.lastKnownLeader, raft.None)
					select {
					case <-c.errorC:
					default:
						nodeCount := len(c.opts.BlockMetadata.ConsenterIds)
						// Only close the error channel (signalling a consensus backend error to the
						// broadcast/deliver front-end) if the cluster has 3 or more nodes; otherwise a
						// cluster of size 1 could not be expanded to 2 nodes.
						if nodeCount > 2 {
							close(c.errorC)
						} else {
							c.logger.Warningf("No leader is present, cluster size is %d", nodeCount)
						}
					}
				}

				soft = raft.SoftState{Lead: newLeader, RaftState: app.soft.RaftState}

				// notify external observer
				select {
				case c.observeC <- soft:
				default:
				}
			}

			c.apply(app.entries)

			// A newly elected leader starts serving only after applying every
			// entry up to its last log index with no config change in flight;
			// it then builds blocks on top of lastBlock.
			if c.justElected {
				msgInflight := c.Node.lastIndex() > c.appliedIndex
				if msgInflight {
					c.logger.Debugf("There are in flight blocks, new leader should not serve requests")
					continue
				}

				if c.configInflight {
					c.logger.Debugf("There is config block in flight, new leader should not serve requests")
					continue
				}

				c.logger.Infof("Start accepting requests as Raft leader at block [%d]", c.lastBlock.Header.Number)
				bc = &blockCreator{
					hash:   c.lastBlock.Header.Hash(),
					number: c.lastBlock.Header.Number,
					logger: c.logger,
				}
				submitC = c.submitC
				c.justElected = false
			} else if c.configInflight {
				c.logger.Info("Config block or ConfChange in flight, pause accepting transaction")
				submitC = nil
			} else if c.blockInflight < c.opts.MaxInflightBlocks {
				submitC = c.submitC
			}

		case <-timer.C():
			// Batch timeout: cut whatever is pending into a block.
			ticking = false

			batch := c.support.BlockCutter().Cut()
			if len(batch) == 0 {
				c.logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
				continue
			}

			c.logger.Debugf("Batch timer expired, creating block")
			c.propose(propC, bc, batch) // we are certain this is normal block, no need to block

		case sn := <-c.snapC:
			// Index 0 marks an artificial snapshot used only to trigger catch-up.
			if sn.Metadata.Index != 0 {
				if sn.Metadata.Index <= c.appliedIndex {
					c.logger.Debugf("Skip snapshot taken at index %d, because it is behind current applied index %d", sn.Metadata.Index, c.appliedIndex)
					break // leaves the select, skipping catchUp
				}

				c.confState = sn.Metadata.ConfState
				c.appliedIndex = sn.Metadata.Index
			} else {
				c.logger.Infof("Received artificial snapshot to trigger catchup")
			}

			if err := c.catchUp(sn); err != nil {
				c.logger.Panicf("Failed to recover from snapshot taken at Term %d and Index %d: %s",
					sn.Metadata.Term, sn.Metadata.Index, err)
			}

		case <-c.doneC:
			cancelProp()

			select {
			case <-c.errorC: // avoid closing closed channel
			default:
				close(c.errorC)
			}

			c.logger.Infof("Stop serving requests")
			c.periodicChecker.Stop()
			return
		}
	}
}
// writeBlock writes block, committed at raft index, to the ledger.
//
// Blocks must arrive in sequence. A block older than the next expected one has
// already been written (this node was forced to catch up) and is skipped; a
// gap in numbering is fatal. Config blocks are delegated to writeConfigBlock.
// For other blocks, the raft index is recorded in the block metadata that is
// written alongside the block.
func (c *Chain) writeBlock(block *common.Block, index uint64) {
	expected := c.lastBlock.Header.Number + 1
	switch got := block.Header.Number; {
	case got > expected:
		c.logger.Panicf("Got block [%d], expect block [%d]", got, expected)
	case got < expected:
		c.logger.Infof("Got block [%d], expect block [%d], this node was forced to catch up", got, expected)
		return
	}

	// Only the leader tracks in-flight blocks; on followers the count stays 0.
	if c.blockInflight > 0 {
		c.blockInflight--
	}
	c.lastBlock = block

	c.logger.Infof("Writing block [%d] (Raft index: %d) to ledger", block.Header.Number, index)

	if utils.IsConfigBlock(block) {
		c.writeConfigBlock(block, index)
		return
	}

	c.raftMetadataLock.Lock()
	c.opts.BlockMetadata.RaftIndex = index
	metadata := utils.MarshalOrPanic(c.opts.BlockMetadata)
	c.raftMetadataLock.Unlock()

	c.support.WriteBlock(block, metadata)
}
// ordered orders the envelope carried by msg, revalidating it first when the
// config sequence has advanced past msg.LastValidationSeq.
//
// It returns
//   - batches: the batches cut, each destined for its own block;
//   - pending: whether envelopes remain buffered in the block cutter;
//   - err: the error encountered, if any (ProposalFailures is incremented).
//
// A config message always ends up alone in the last batch, preceded by
// whatever was pending in the block cutter. Revalidating a config message
// replaces msg.Payload with the reprocessed envelope.
func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]*common.Envelope, pending bool, err error) {
	seq := c.support.Sequence()

	if !c.isConfig(msg.Payload) {
		// it is a normal message
		if msg.LastValidationSeq < seq {
			c.logger.Warnf("Normal message was validated against %d, although current config seq has advanced (%d)", msg.LastValidationSeq, seq)
			if _, err := c.support.ProcessNormalMsg(msg.Payload); err != nil {
				c.Metrics.ProposalFailures.Add(1)
				return nil, true, errors.Errorf("bad normal message: %s", err)
			}
		}
		batches, pending = c.support.BlockCutter().Ordered(msg.Payload)
		return batches, pending, nil
	}

	// ConfigMsg
	if msg.LastValidationSeq < seq {
		c.logger.Warnf("Config message was validated against %d, although current config seq has advanced (%d)", msg.LastValidationSeq, seq)
		msg.Payload, _, err = c.support.ProcessConfigMsg(msg.Payload)
		if err != nil {
			c.Metrics.ProposalFailures.Add(1)
			return nil, true, errors.Errorf("bad config message: %s", err)
		}
		if err = c.checkConfigUpdateValidity(msg.Payload); err != nil {
			c.Metrics.ProposalFailures.Add(1)
			return nil, true, errors.Errorf("bad config message: %s", err)
		}
	}

	batches = make([][]*common.Envelope, 0, 2)
	if pendingBatch := c.support.BlockCutter().Cut(); len(pendingBatch) != 0 {
		batches = append(batches, pendingBatch)
	}
	batches = append(batches, []*common.Envelope{msg.Payload})
	return batches, false, nil
}
// propose turns each batch into the next block (via bc) and hands it to the
// leader's proposer goroutine through ch, counting it as in flight.
//
// A config block sets configInflight, so serveRequest stops accepting requests
// until it is committed. ch must always have room: a full (or nil) channel
// means the in-flight limit was not enforced or a follower tried to propose,
// which is a programming error and panics.
func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
	for _, batch := range batches {
		b := bc.createNextBlock(batch)
		c.logger.Infof("Created block [%d], there are %d blocks in flight", b.Header.Number, c.blockInflight)

		select {
		case ch <- b:
		default:
			c.logger.Panic("Programming error: limit of in-flight blocks does not properly take effect or block is proposed by follower")
		}

		// if it is config block, then we should wait for the commit of the block
		if utils.IsConfigBlock(b) {
			c.configInflight = true
		}

		c.blockInflight++
	}
}
// catchUp pulls from the cluster, and writes to the ledger, every block after
// the local last block up to and including the block carried in snap.
//
// Config blocks that change the consenter set update the raft metadata and
// reconfigure communication. Nothing is done if the ledger is already at or
// beyond the snapshot's block. The CommittedBlockNumber metric is kept in step
// with the blocks written, as apply does for regularly committed blocks.
func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
	b, err := utils.UnmarshalBlock(snap.Data)
	if err != nil {
		return errors.Wrap(err, "failed to unmarshal snapshot data to block")
	}

	if c.lastBlock.Header.Number >= b.Header.Number {
		c.logger.Warnf("Snapshot is at block [%d], local block number is %d, no sync needed", b.Header.Number, c.lastBlock.Header.Number)
		return nil
	}

	puller, err := c.createPuller()
	if err != nil {
		return errors.Wrap(err, "failed to create block puller")
	}
	defer puller.Close()

	next := c.lastBlock.Header.Number + 1

	c.logger.Infof("Catching up with snapshot taken at block [%d], starting from block [%d]", b.Header.Number, next)

	for next <= b.Header.Number {
		block := puller.PullBlock(next)
		if block == nil {
			return errors.Errorf("failed to fetch block [%d] from cluster", next)
		}
		if utils.IsConfigBlock(block) {
			c.support.WriteConfigBlock(block, nil)

			configMembership := c.detectConfChange(block)

			if configMembership != nil && configMembership.Changed() {
				c.logger.Infof("Config block [%d] changes consenter set, communication should be reconfigured", block.Header.Number)

				c.raftMetadataLock.Lock()
				c.opts.BlockMetadata = configMembership.NewBlockMetadata
				c.opts.Consenters = configMembership.NewConsenters
				c.raftMetadataLock.Unlock()

				if err := c.configureComm(); err != nil {
					c.logger.Panicf("Failed to configure communication: %s", err)
				}
			}
		} else {
			c.support.WriteBlock(block, nil)
		}

		c.lastBlock = block
		// Without this the metric would lag behind the ledger after a catch-up.
		c.Metrics.CommittedBlockNumber.Set(float64(block.Header.Number))
		next++
	}

	c.logger.Infof("Finished syncing with cluster up to and including block [%d]", b.Header.Number)
	return nil
}
// detectConfChange inspects the consensus metadata of a config block.
//
// It returns nil when newConfigMetadata yields no metadata for the block
// (presumably when the config does not target this channel's consensus
// metadata — confirm). Otherwise it adopts a changed, non-zero snapshot
// interval size and returns the membership changes relative to the current
// consenter set; the caller decides how to act on them. An illegal membership
// change is fatal.
func (c *Chain) detectConfChange(block *common.Block) *MembershipChanges {
	configMetadata := c.newConfigMetadata(block)
	if configMetadata == nil {
		return nil
	}

	if options := configMetadata.Options; options != nil {
		newSize := options.SnapshotIntervalSize
		if newSize != 0 && newSize != c.sizeLimit {
			c.logger.Infof("Update snapshot interval size to %d bytes (was %d)",
				newSize, c.sizeLimit)
			c.sizeLimit = newSize
		}
	}

	changes, err := ComputeMembershipChanges(c.opts.BlockMetadata, c.opts.Consenters, configMetadata.Consenters)
	if err != nil {
		c.logger.Panicf("illegal configuration change detected: %s", err)
	}

	if changes.Rotated() {
		c.logger.Infof("Config block [%d] rotates TLS certificate of node %d", block.Header.Number, changes.RotatedNode)
	}

	return changes
}
// apply processes a batch of committed raft entries in order.
//
// Non-empty normal entries carry marshaled blocks. Each one is written to the
// ledger unless its raft index has already been applied. ConfChange entries are
// applied to the raft node. They may complete a reconfiguration started by a
// previously committed config block, or halt this node if it was removed.
// appliedIndex is advanced as entries are processed.
//
// Once the accumulated size of block data since the last snapshot reaches
// c.sizeLimit, a snapshot request at the last block of the batch is offered to
// the gc goroutine without blocking. The request is dropped, with a warning, if
// gc is still busy.
func (c *Chain) apply(ents []raftpb.Entry) {
	if len(ents) == 0 {
		return
	}

	if ents[0].Index > c.appliedIndex+1 {
		c.logger.Panicf("first index of committed entry[%d] should <= appliedIndex[%d]+1", ents[0].Index, c.appliedIndex)
	}

	// position is the index (within ents) of the last non-empty normal entry,
	// i.e. the last block carried by this batch. It is -1 if the batch carries
	// no block. It must not default to 0: ents[0] may be an empty entry or a
	// ConfChange, whose data cannot be decoded as a block when snapshotting.
	position := -1
	for i := range ents {
		switch ents[i].Type {
		case raftpb.EntryNormal:
			// Empty normal entries carry no block. `break` only leaves the
			// switch, so appliedIndex is still advanced below.
			if len(ents[i].Data) == 0 {
				break
			}

			position = i
			c.accDataSize += uint32(len(ents[i].Data))

			// We need to strictly avoid re-applying normal entries,
			// otherwise we are writing the same block twice.
			if ents[i].Index <= c.appliedIndex {
				c.logger.Debugf("Received block with raft index (%d) <= applied index (%d), skip", ents[i].Index, c.appliedIndex)
				break
			}

			block := utils.UnmarshalBlockOrPanic(ents[i].Data)
			c.writeBlock(block, ents[i].Index)
			c.Metrics.CommittedBlockNumber.Set(float64(block.Header.Number))

		case raftpb.EntryConfChange:
			var cc raftpb.ConfChange
			if err := cc.Unmarshal(ents[i].Data); err != nil {
				c.logger.Warnf("Failed to unmarshal ConfChange data: %s", err)
				// Note: `continue` also skips the appliedIndex update below.
				continue
			}

			c.confState = *c.Node.ApplyConfChange(cc)

			switch cc.Type {
			case raftpb.ConfChangeAddNode:
				c.logger.Infof("Applied config change to add node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
			case raftpb.ConfChangeRemoveNode:
				c.logger.Infof("Applied config change to remove node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
			default:
				c.logger.Panic("Programming error, encountered unsupported raft config change")
			}

			// This ConfChange was introduced by a previously committed config block,
			// we can now unblock submitC to accept envelopes.
			if c.confChangeInProgress != nil &&
				c.confChangeInProgress.NodeID == cc.NodeID &&
				c.confChangeInProgress.Type == cc.Type {

				if err := c.configureComm(); err != nil {
					c.logger.Panicf("Failed to configure communication: %s", err)
				}

				c.confChangeInProgress = nil
				c.configInflight = false
				// report the new cluster size
				c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
			}

			if cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID {
				c.logger.Infof("Current node removed from replica set for channel %s", c.channelID)
				// Halt is invoked from a separate goroutine: calling it inline
				// here would block trying to write into haltC.
				lead := atomic.LoadUint64(&c.lastKnownLeader)
				if lead == c.raftID {
					c.logger.Info("This node is being removed as current leader, halt with delay")
					c.configInflight = true // toggle the flag so this node does not accept further tx
					go func() {
						select {
						case <-c.clock.After(time.Duration(c.opts.ElectionTick) * c.opts.TickInterval):
						case <-c.doneC:
						}

						c.Halt()
					}()
				} else {
					go c.Halt()
				}
			}
		}

		if ents[i].Index > c.appliedIndex {
			c.appliedIndex = ents[i].Index
		}
	}

	// accDataSize can already be at or above the limit when this batch carries
	// no block, because a previously dropped snapshot request (default branch
	// below) does not reset it. Only snapshot when there is a block to
	// snapshot at.
	if position >= 0 && c.accDataSize >= c.sizeLimit {
		b := utils.UnmarshalBlockOrPanic(ents[position].Data)

		select {
		case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
			c.logger.Infof("Accumulated %d bytes since last snapshot, exceeding size limit (%d bytes), "+
				"taking snapshot at block [%d] (index: %d), last snapshotted block number is %d, current nodes: %+v",
				c.accDataSize, c.sizeLimit, b.Header.Number, c.appliedIndex, c.lastSnapBlockNum, c.confState.Nodes)
			c.accDataSize = 0
			c.lastSnapBlockNum = b.Header.Number
			c.Metrics.SnapshotBlockNumber.Set(float64(b.Header.Number))
		default:
			c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapshotIntervalSize is too small")
		}
	}
}
// gc serves snapshot requests sent on gcC (produced by apply) by asking the raft
// node to take a snapshot at the requested index. It runs until doneC is closed.
func (c *Chain) gc() {
	for {
		select {
		case <-c.doneC:
			c.logger.Infof("Stop garbage collecting")
			return
		case req := <-c.gcC:
			c.Node.takeSnapshot(req.index, req.state, req.data)
		}
	}
}
// isConfig reports whether env is a config transaction, i.e. its channel header
// type is CONFIG or ORDERER_TRANSACTION. It panics if the channel header cannot
// be extracted, and includes the underlying error in the panic message.
func (c *Chain) isConfig(env *common.Envelope) bool {
	h, err := utils.ChannelHeader(env)
	if err != nil {
		c.logger.Panicf("failed to extract channel header from envelope: %s", err)
	}

	return h.Type == int32(common.HeaderType_CONFIG) || h.Type == int32(common.HeaderType_ORDERER_TRANSACTION)
}
// configureComm pushes the current set of remote consenters to the cluster
// communication layer. It is called whenever the consenter set or its TLS
// certificates change. The node's record of unreachable peers is cleared first,
// so stale entries from the previous configuration do not carry over. It
// returns the error from building the peer list, if any.
func (c *Chain) configureComm() error {
	c.Node.unreachableLock.Lock()
	c.Node.unreachable = make(map[uint64]struct{})
	c.Node.unreachableLock.Unlock()

	remotes, err := c.remotePeers()
	if err == nil {
		c.configurator.Configure(c.channelID, remotes)
	}
	return err
}
// remotePeers builds the cluster.RemoteNode list for every consenter in
// c.opts.Consenters except this node itself. Each entry carries the consenter's
// host:port endpoint and its server/client TLS certificates converted from PEM
// to DER. It reads the consenter map under raftMetadataLock (read lock). The
// first certificate that fails to decode aborts the whole call with an error.
func (c *Chain) remotePeers() ([]cluster.RemoteNode, error) {
	c.raftMetadataLock.RLock()
	defer c.raftMetadataLock.RUnlock()

	var remotes []cluster.RemoteNode
	for id, cons := range c.opts.Consenters {
		if id == c.raftID {
			// The local node does not need an entry for itself.
			continue
		}

		serverDER, err := pemToDER(cons.ServerTlsCert, id, "server", c.logger)
		if err != nil {
			return nil, errors.WithStack(err)
		}

		clientDER, err := pemToDER(cons.ClientTlsCert, id, "client", c.logger)
		if err != nil {
			return nil, errors.WithStack(err)
		}

		endpoint := fmt.Sprintf("%s:%d", cons.Host, cons.Port)
		remotes = append(remotes, cluster.RemoteNode{
			ID:            id,
			Endpoint:      endpoint,
			ServerTLSCert: serverDER,
			ClientTLSCert: clientDER,
		})
	}

	return remotes, nil
}
// pemToDER decodes the first PEM block in pemBytes and returns its DER payload.
// id and certType are only used to log which certificate was rejected when no
// PEM block can be decoded. In that case an "invalid PEM block" error is
// returned.
func pemToDER(pemBytes []byte, id uint64, certType string, logger *flogging.FabricLogger) ([]byte, error) {
	block, _ := pem.Decode(pemBytes) // any data after the first block is ignored
	if block != nil {
		return block.Bytes, nil
	}

	logger.Errorf("Rejecting PEM block of %s TLS cert for node %d, offending PEM is: %s", certType, id, string(pemBytes))
	return nil, errors.Errorf("invalid PEM block")
}
// writeConfigBlock writes a configuration block into the ledger together with
// the raft block metadata. It records index as the raft index of the block.
//
// For a CONFIG block it also inspects the block for consenter-set changes via
// detectConfChange:
//   - If raft membership changes, the ConfChange is proposed asynchronously.
//     The chain then stops accepting transactions (configInflight) until the
//     ConfChange is applied.
//   - If only a TLS certificate is rotated, cluster communication is
//     reconfigured. When the rotated node is the current leader, leader
//     transfer is attempted first.
//
// ORDERER_TRANSACTION blocks are written without further inspection. Any other
// header type panics.
func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
	hdr, err := ConfigChannelHeader(block)
	if err != nil {
		c.logger.Panicf("Failed to get config header type from config block: %s", err)
	}

	c.configInflight = false

	switch common.HeaderType(hdr.Type) {
	case common.HeaderType_CONFIG:
		configMembership := c.detectConfChange(block)

		c.raftMetadataLock.Lock()
		if configMembership != nil {
			c.opts.BlockMetadata = configMembership.NewBlockMetadata
			c.opts.Consenters = configMembership.NewConsenters
		}
		// Set RaftIndex only after the metadata may have been replaced.
		// NewBlockMetadata was derived by detectConfChange from the metadata
		// before this block's index was recorded, so it cannot be relied on to
		// carry it. Setting RaftIndex first would persist a stale value.
		c.opts.BlockMetadata.RaftIndex = index
		blockMetadataBytes := utils.MarshalOrPanic(c.opts.BlockMetadata)
		c.raftMetadataLock.Unlock()

		// write block with metadata
		c.support.WriteConfigBlock(block, blockMetadataBytes)

		if configMembership == nil {
			return
		}

		// update membership
		if configMembership.ConfChange != nil {
			// We need to propose conf change in a go routine, because it may be blocked if raft node
			// becomes leaderless, and we should not block `serveRequest` so it can keep consuming applyC,
			// otherwise we have a deadlock.
			go func() {
				// ProposeConfChange returns error only if node being stopped.
				// This proposal is dropped by followers because DisableProposalForwarding is enabled.
				if err := c.Node.ProposeConfChange(context.TODO(), *configMembership.ConfChange); err != nil {
					c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
				}
			}()

			c.confChangeInProgress = configMembership.ConfChange

			switch configMembership.ConfChange.Type {
			case raftpb.ConfChangeAddNode:
				c.logger.Infof("Config block just committed adds node %d, pause accepting transactions till config change is applied", configMembership.ConfChange.NodeID)
			case raftpb.ConfChangeRemoveNode:
				c.logger.Infof("Config block just committed removes node %d, pause accepting transactions till config change is applied", configMembership.ConfChange.NodeID)
			default:
				c.logger.Panic("Programming error, encountered unsupported raft config change")
			}

			c.configInflight = true
		} else if configMembership.Rotated() {
			lead := atomic.LoadUint64(&c.lastKnownLeader)
			if configMembership.RotatedNode == lead {
				c.logger.Infof("Certificate of Raft leader is being rotated, attempt leader transfer before reconfiguring communication")
				// Done off the calling goroutine so abdicateLeader does not
				// block it.
				go func() {
					c.Node.abdicateLeader(lead)
					if err := c.configureComm(); err != nil {
						c.logger.Panicf("Failed to configure communication: %s", err)
					}
				}()
			} else {
				if err := c.configureComm(); err != nil {
					c.logger.Panicf("Failed to configure communication: %s", err)
				}
			}
		}

	case common.HeaderType_ORDERER_TRANSACTION:
		// If this config is channel creation, no extra inspection is needed
		c.raftMetadataLock.Lock()
		c.opts.BlockMetadata.RaftIndex = index
		m := utils.MarshalOrPanic(c.opts.BlockMetadata)
		c.raftMetadataLock.Unlock()

		c.support.WriteConfigBlock(block, m)

	default:
		c.logger.Panicf("Programming error: unexpected config type: %s", common.HeaderType(hdr.Type))
	}
}
// getInFlightConfChange returns the raft ConfChange that still has to be applied,
// if any.
//
// A ConfChange already recorded in confChangeInProgress takes precedence.
// Otherwise, if the last committed block is a non-genesis config block and the
// raft conf state size disagrees with the consenter IDs in the block metadata, a
// ConfChange reconciling the two is derived. In all other cases it returns nil.
func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
	if pending := c.confChangeInProgress; pending != nil {
		return pending
	}

	// A chain that has just started (only block 0), or whose last block is not
	// a config block, has nothing to recover.
	if c.lastBlock.Header.Number == 0 || !utils.IsConfigBlock(c.lastBlock) {
		return nil
	}

	// Applying an empty ConfChange is used here to read the current Raft
	// configuration state.
	current := c.Node.ApplyConfChange(raftpb.ConfChange{})

	// Raft can only add or remove one node at a time. If the raft conf state
	// has as many nodes as the membership stored in block metadata, the two
	// are in sync and no config update needs proposing.
	if len(current.Nodes) != len(c.opts.BlockMetadata.ConsenterIds) {
		return ConfChange(c.opts.BlockMetadata, current)
	}
	return nil
}
// newConfigMetadata extracts the consensus config metadata from a configuration
// block. It panics if the metadata cannot be read. The result may be nil;
// callers such as detectConfChange check for that.
func (c *Chain) newConfigMetadata(block *common.Block) *etcdraft.ConfigMetadata {
	md, err := ConsensusMetadataFromConfigBlock(block)
	if err != nil {
		c.logger.Panicf("error reading consensus metadata: %s", err)
	}
	return md
}
// suspectEviction reports true only when the chain is running and no leader is
// currently known (lastKnownLeader is 0). The evictionSuspector presumably uses
// this as a hint that the node may have been removed from the channel.
func (c *Chain) suspectEviction() bool {
	if err := c.isRunning(); err != nil {
		return false
	}
	return atomic.LoadUint64(&c.lastKnownLeader) == 0
}
// newEvictionSuspector wires an evictionSuspector to this chain. Membership is
// checked against this node's certificate, and the configured suspicion
// threshold is used. Ledger height, block appends, puller creation, catch-up
// triggering, logging and halting are all delegated to the chain and its
// support.
func (c *Chain) newEvictionSuspector() *evictionSuspector {
	ownCert := ConsenterCertificate(c.opts.Cert)
	haltChain := func() {
		c.Halt()
	}

	return &evictionSuspector{
		amIInChannel:               ownCert.IsConsenterOfChannel,
		evictionSuspicionThreshold: c.opts.EvictionSuspicion,
		writeBlock:                 c.support.Append,
		createPuller:               c.createPuller,
		height:                     c.support.Height,
		triggerCatchUp:             c.triggerCatchup,
		logger:                     c.logger,
		halt:                       haltChain,
	}
}
// triggerCatchup hands snapshot sn to the chain via snapC. It blocks until the
// snapshot is accepted or the chain is shutting down (doneC closed), whichever
// happens first.
func (c *Chain) triggerCatchup(sn *raftpb.Snapshot) {
	select {
	case <-c.doneC:
	case c.snapC <- sn:
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/mirrors/hyperledger-fabric.git
git@gitee.com:mirrors/hyperledger-fabric.git
mirrors
hyperledger-fabric
hyperledger-fabric
v1.4.3

Search