package raft
import (
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"io/ioutil"
"math"
"os"
"path"
"sort"
"sync"
"time"
)
//------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------
const (
Stopped = "stopped"
Initialized = "initialized"
Follower = "follower"
Candidate = "candidate"
Leader = "leader"
Snapshotting = "snapshotting"
)
const (
MaxLogEntriesPerRequest = 2000
NumberOfLogEntriesAfterSnapshot = 200
)
const (
// DefaultHeartbeatInterval is the interval that the leader will send
// AppendEntriesRequests to followers to maintain leadership.
DefaultHeartbeatInterval = 50 * time.Millisecond
DefaultElectionTimeout = 150 * time.Millisecond
)
// ElectionTimeoutThresholdPercent specifies the threshold at which the server
// will dispatch warning events that the heartbeat RTT is too close to the
// election timeout.
const ElectionTimeoutThresholdPercent = 0.8
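// For example, with the default 150ms election timeout, a follower dispatches
// an ElectionTimeoutThresholdEventType event once more than
// 0.8 * 150ms = 120ms passes without it resetting its election timer
// (see followerLoop below).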
//------------------------------------------------------------------------------
//
// Errors
//
//------------------------------------------------------------------------------
var NotLeaderError = errors.New("raft.Server: Not current leader")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var CommandTimeoutError = errors.New("raft: Command timeout")
var StopError = errors.New("raft: Has been stopped")
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
// A server is involved in the consensus protocol and can act as a follower,
// candidate or a leader.
type Server interface {
Name() string
Context() interface{}
StateMachine() StateMachine
Leader() string
State() string
Path() string
LogPath() string
SnapshotPath(lastIndex uint64, lastTerm uint64) string
Term() uint64
CommitIndex() uint64
VotedFor() string
MemberCount() int
QuorumSize() int
IsLogEmpty() bool
LogEntries() []*LogEntry
LastCommandName() string
GetState() string
ElectionTimeout() time.Duration
SetElectionTimeout(duration time.Duration)
HeartbeatInterval() time.Duration
SetHeartbeatInterval(duration time.Duration)
Transporter() Transporter
SetTransporter(t Transporter)
AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
RequestVote(req *RequestVoteRequest) *RequestVoteResponse
RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
AddPeer(name string, connectionString string) error
RemovePeer(name string) error
Peers() map[string]*Peer
Init() error
Start() error
Stop()
Running() bool
Do(command Command) (interface{}, error)
TakeSnapshot() error
LoadSnapshot() error
AddEventListener(string, EventListener)
FlushCommitIndex()
}
type server struct {
*eventDispatcher
name string
path string
state string
transporter Transporter
context interface{}
currentTerm uint64
votedFor string
log *Log
leader string
peers map[string]*Peer
mutex sync.RWMutex
syncedPeer map[string]bool
stopped chan bool
c chan *ev
electionTimeout time.Duration
heartbeatInterval time.Duration
snapshot *Snapshot
// PendingSnapshot is an unfinished snapshot.
// After the pendingSnapshot is saved to disk,
// it will be set to snapshot and also will be
// set to nil.
pendingSnapshot *Snapshot
stateMachine StateMachine
maxLogEntriesPerRequest uint64
connectionString string
routineGroup sync.WaitGroup
}
// An internal event to be processed by the server's event loop.
type ev struct {
target interface{}
returnValue interface{}
c chan error
}
//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------
// NewServer creates a new server with a log at the given path. transporter must
// not be nil. stateMachine can be nil if snapshotting and log compaction are to
// be disabled. context can be anything (including nil); it is not used by the
// raft package except to be returned by Server.Context(). connectionString can
// be anything.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error) {
if name == "" {
return nil, errors.New("raft.Server: Name cannot be blank")
}
if transporter == nil {
panic("raft: Transporter required")
}
s := &server{
name: name,
path: path,
transporter: transporter,
stateMachine: stateMachine,
context: ctx,
state: Stopped,
peers: make(map[string]*Peer),
log: newLog(),
c: make(chan *ev, 256),
electionTimeout: DefaultElectionTimeout,
heartbeatInterval: DefaultHeartbeatInterval,
maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
connectionString: connectionString,
}
s.eventDispatcher = newEventDispatcher(s)
// Setup apply function.
s.log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
// Dispatch commit event.
s.DispatchEvent(newEvent(CommitEventType, e, nil))
// Apply command to the state machine.
switch c := c.(type) {
case CommandApply:
return c.Apply(&context{
server: s,
currentTerm: s.currentTerm,
currentIndex: s.log.internalCurrentIndex(),
commitIndex: s.log.commitIndex,
})
case deprecatedCommandApply:
return c.Apply(s)
default:
return nil, fmt.Errorf("Command does not implement Apply()")
}
}
return s, nil
}
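// A minimal usage sketch (illustrative only): the transporter construction
// below is hypothetical and depends on the embedding application; only
// NewServer, Start, Do and DefaultJoinCommand are taken from this package.
//
//	trans := newExampleTransporter() // hypothetical Transporter implementation
//	s, err := NewServer("node1", "/tmp/node1", trans, nil, nil, "http://localhost:4001")
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := s.Start(); err != nil {
//		log.Fatal(err)
//	}
//	// A self-join on an empty log promotes this node directly to leader (see followerLoop).
//	if _, err := s.Do(&DefaultJoinCommand{Name: s.Name(), ConnectionString: "http://localhost:4001"}); err != nil {
//		log.Fatal(err)
//	}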
//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------
//--------------------------------------
// General
//--------------------------------------
// Retrieves the name of the server.
func (s *server) Name() string {
return s.name
}
// Retrieves the storage path for the server.
func (s *server) Path() string {
return s.path
}
// The name of the current leader.
func (s *server) Leader() string {
return s.leader
}
// Retrieves a copy of the peer data.
func (s *server) Peers() map[string]*Peer {
s.mutex.Lock()
defer s.mutex.Unlock()
peers := make(map[string]*Peer)
for name, peer := range s.peers {
peers[name] = peer.clone()
}
return peers
}
// Retrieves the object that transports requests.
func (s *server) Transporter() Transporter {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.transporter
}
func (s *server) SetTransporter(t Transporter) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.transporter = t
}
// Retrieves the context passed into the constructor.
func (s *server) Context() interface{} {
return s.context
}
// Retrieves the state machine passed into the constructor.
func (s *server) StateMachine() StateMachine {
return s.stateMachine
}
// Retrieves the log path for the server.
func (s *server) LogPath() string {
return path.Join(s.path, "log")
}
// Retrieves the current state of the server.
func (s *server) State() string {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.state
}
// Sets the state of the server.
func (s *server) setState(state string) {
s.mutex.Lock()
defer s.mutex.Unlock()
// Temporarily store previous values.
prevState := s.state
prevLeader := s.leader
// Update state and leader.
s.state = state
if state == Leader {
s.leader = s.Name()
s.syncedPeer = make(map[string]bool)
}
// Dispatch state and leader change events.
s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
if prevLeader != s.leader {
s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
}
}
// Retrieves the current term of the server.
func (s *server) Term() uint64 {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.currentTerm
}
// Retrieves the current commit index of the server.
func (s *server) CommitIndex() uint64 {
s.log.mutex.RLock()
defer s.log.mutex.RUnlock()
return s.log.commitIndex
}
// Retrieves the name of the candidate this server voted for in this term.
func (s *server) VotedFor() string {
return s.votedFor
}
// Reports whether the server's log has no entries.
func (s *server) IsLogEmpty() bool {
return s.log.isEmpty()
}
// A list of all the log entries. This should only be used for debugging purposes.
func (s *server) LogEntries() []*LogEntry {
s.log.mutex.RLock()
defer s.log.mutex.RUnlock()
return s.log.entries
}
// A reference to the command name of the last entry.
func (s *server) LastCommandName() string {
return s.log.lastCommandName()
}
// Get the state of the server for debugging
func (s *server) GetState() string {
s.mutex.RLock()
defer s.mutex.RUnlock()
return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
}
// Checks whether the server is promotable, i.e. its log has at least one entry.
func (s *server) promotable() bool {
return s.log.currentIndex() > 0
}
//--------------------------------------
// Membership
//--------------------------------------
// Retrieves the number of member servers in the consensus.
func (s *server) MemberCount() int {
s.mutex.RLock()
defer s.mutex.RUnlock()
return len(s.peers) + 1
}
// Retrieves the number of servers required to make a quorum.
func (s *server) QuorumSize() int {
return (s.MemberCount() / 2) + 1
}
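// For example, a 5-member cluster (4 peers plus this server) has a quorum of 3,
// and a 4-member cluster also needs 3 votes; a single-node cluster commits with
// only its own vote.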
//--------------------------------------
// Election timeout
//--------------------------------------
// Retrieves the election timeout.
func (s *server) ElectionTimeout() time.Duration {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.electionTimeout
}
// Sets the election timeout.
func (s *server) SetElectionTimeout(duration time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.electionTimeout = duration
}
//--------------------------------------
// Heartbeat timeout
//--------------------------------------
// Retrieves the heartbeat interval.
func (s *server) HeartbeatInterval() time.Duration {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.heartbeatInterval
}
// Sets the heartbeat interval.
func (s *server) SetHeartbeatInterval(duration time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.heartbeatInterval = duration
for _, peer := range s.peers {
peer.setHeartbeatInterval(duration)
}
}
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------
//--------------------------------------
// Initialization
//--------------------------------------
// Register the built-in commands.
func init() {
RegisterCommand(&NOPCommand{})
RegisterCommand(&DefaultJoinCommand{})
RegisterCommand(&DefaultLeaveCommand{})
}
// Start the raft server
// If log entries exist then allow promotion to candidate if no AEs received.
// If no log entries exist then wait for AEs from another node.
// If no log entries exist and a self-join command is issued then
// immediately become leader and commit entry.
func (s *server) Start() error {
// Exit if the server is already running.
if s.Running() {
return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
}
if err := s.Init(); err != nil {
return err
}
// stopped needs to be allocated each time server starts
// because it is closed at `Stop`.
s.stopped = make(chan bool)
s.setState(Follower)
// If no log entries exist then
// 1. wait for AEs from another node
// 2. wait for self-join command
// to set itself promotable
if !s.promotable() {
s.debugln("start as a new raft server")
// If log entries exist then allow promotion to candidate
// if no AEs received.
} else {
s.debugln("start from previous saved state")
}
debugln(s.GetState())
s.routineGroup.Add(1)
go func() {
defer s.routineGroup.Done()
s.loop()
}()
return nil
}
// Init initializes the raft server.
// If there is no previous log file under the given path, Init() will create an empty log file.
// Otherwise, Init() will load in the log entries from the log file.
func (s *server) Init() error {
if s.Running() {
return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
}
// The server has already been initialized, or it was stopped after being
// initialized. If the log has been initialized, we know the server was
// stopped after running.
if s.state == Initialized || s.log.initialized {
s.state = Initialized
return nil
}
// Create snapshot directory if it does not exist
err := os.Mkdir(path.Join(s.path, "snapshot"), 0700)
if err != nil && !os.IsExist(err) {
s.debugln("raft: Snapshot dir error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
if err := s.readConf(); err != nil {
s.debugln("raft: Conf file error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
// Initialize the log and load it up.
if err := s.log.open(s.LogPath()); err != nil {
s.debugln("raft: Log error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
// Update the term to the last term in the log.
_, s.currentTerm = s.log.lastInfo()
s.state = Initialized
return nil
}
// Shuts down the server.
func (s *server) Stop() {
if s.State() == Stopped {
return
}
close(s.stopped)
// make sure all goroutines have stopped before we close the log
s.routineGroup.Wait()
s.log.close()
s.setState(Stopped)
}
// Checks if the server is currently running.
func (s *server) Running() bool {
s.mutex.RLock()
defer s.mutex.RUnlock()
return (s.state != Stopped && s.state != Initialized)
}
//--------------------------------------
// Term
//--------------------------------------
// updates the current term for the server. This is only used when a larger
// external term is found.
func (s *server) updateCurrentTerm(term uint64, leaderName string) {
_assert(term > s.currentTerm,
"updateCurrentTerm: update is called when term is not larger than currentTerm")
// Store previous values temporarily.
prevTerm := s.currentTerm
prevLeader := s.leader
// set currentTerm = T, convert to follower (§5.1)
// stop heartbeats before step-down
if s.state == Leader {
for _, peer := range s.peers {
peer.stopHeartbeat(false)
}
}
// update the term and clear vote for
if s.state != Follower {
s.setState(Follower)
}
s.mutex.Lock()
s.currentTerm = term
s.leader = leaderName
s.votedFor = ""
s.mutex.Unlock()
// Dispatch change events.
s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
if prevLeader != s.leader {
s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
}
}
//--------------------------------------
// Event Loop
//--------------------------------------
//               ________
//            --|Snapshot|                 timeout
//            |  --------                  ______
// recover    |       ^                   |      |
// snapshot / |       |snapshot           |      |
// higher     |       |                   v      |     recv majority votes
// term       |    --------    timeout    -----------                        -----------
//            |-> |Follower| ----------> | Candidate |--------------------> |  Leader   |
//                 --------               -----------                        -----------
//                     ^          higher term/ |                         higher term |
//                     |            new leader |                                     |
//                     |_______________________|____________________________________ |
// The main event loop for the server
func (s *server) loop() {
defer s.debugln("server.loop.end")
state := s.State()
for state != Stopped {
s.debugln("server.loop.run ", state)
switch state {
case Follower:
s.followerLoop()
case Candidate:
s.candidateLoop()
case Leader:
s.leaderLoop()
case Snapshotting:
s.snapshotLoop()
}
state = s.State()
}
}
// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
if !s.Running() {
return nil, StopError
}
event := &ev{target: value, c: make(chan error, 1)}
select {
case s.c <- event:
case <-s.stopped:
return nil, StopError
}
select {
case <-s.stopped:
return nil, StopError
case err := <-event.c:
return event.returnValue, err
}
}
func (s *server) sendAsync(value interface{}) {
if !s.Running() {
return
}
event := &ev{target: value, c: make(chan error, 1)}
// Try a non-blocking send first.
// In most cases this should not block,
// which avoids creating unnecessary goroutines.
select {
case s.c <- event:
return
default:
}
s.routineGroup.Add(1)
go func() {
defer s.routineGroup.Done()
select {
case s.c <- event:
case <-s.stopped:
}
}()
}
// The event loop that is run when the server is in a Follower state.
// Responds to RPCs from candidates and leaders.
// Converts to candidate if election timeout elapses without either:
// 1. Receiving a valid AppendEntries RPC, or
// 2. Granting a vote to a candidate.
func (s *server) followerLoop() {
since := time.Now()
electionTimeout := s.ElectionTimeout()
timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
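// afterBetween (defined elsewhere in this package) fires after a random
// duration in [ElectionTimeout, 2*ElectionTimeout), so that followers do not
// all time out at the same moment; randomized timeouts reduce split votes
// (Raft §5.2).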
for s.State() == Follower {
var err error
update := false
select {
case <-s.stopped:
s.setState(Stopped)
return
case e := <-s.c:
switch req := e.target.(type) {
case JoinCommand:
// If no log entries exist and a self-join command is issued,
// then immediately become leader and commit the entry.
if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
s.debugln("selfjoin and promote to leader")
s.setState(Leader)
s.processCommand(req, e)
} else {
err = NotLeaderError
}
case *AppendEntriesRequest:
// If heartbeats get too close to the election timeout then send an event.
elapsedTime := time.Now().Sub(since)
if elapsedTime > time.Duration(float64(electionTimeout)*ElectionTimeoutThresholdPercent) {
s.DispatchEvent(newEvent(ElectionTimeoutThresholdEventType, elapsedTime, nil))
}
e.returnValue, update = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, update = s.processRequestVoteRequest(req)
case *SnapshotRequest:
e.returnValue = s.processSnapshotRequest(req)
default:
err = NotLeaderError
}
// Callback to event.
e.c <- err
case <-timeoutChan:
// Only allow a synced follower to promote to candidate.
if s.promotable() {
s.setState(Candidate)
} else {
update = true
}
}
// Converts to candidate if election timeout elapses without either:
// 1. Receiving a valid AppendEntries RPC, or
// 2. Granting a vote to a candidate.
if update {
since = time.Now()
timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
}
}
}
// The event loop that is run when the server is in a Candidate state.
func (s *server) candidateLoop() {
// Clear leader value.
prevLeader := s.leader
s.leader = ""
if prevLeader != s.leader {
s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
}
lastLogIndex, lastLogTerm := s.log.lastInfo()
doVote := true
votesGranted := 0
var timeoutChan <-chan time.Time
var respChan chan *RequestVoteResponse
for s.State() == Candidate {
if doVote {
// Increment current term, vote for self.
s.currentTerm++
s.votedFor = s.name
// Send RequestVote RPCs to all other servers.
respChan = make(chan *RequestVoteResponse, len(s.peers))
for _, peer := range s.peers {
s.routineGroup.Add(1)
go func(peer *Peer) {
defer s.routineGroup.Done()
peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
}(peer)
}
// Wait for either:
// * Votes received from majority of servers: become leader
// * AppendEntries RPC received from new leader: step down.
// * Election timeout elapses without election resolution: increment term, start new election
// * Discover higher term: step down (§5.1)
votesGranted = 1
timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
doVote = false
}
// If we received enough votes then stop waiting for more votes
// and return from the candidate loop.
if votesGranted == s.QuorumSize() {
s.debugln("server.candidate.recv.enough.votes")
s.setState(Leader)
return
}
// Collect votes from peers.
select {
case <-s.stopped:
s.setState(Stopped)
return
case resp := <-respChan:
if success := s.processVoteResponse(resp); success {
s.debugln("server.candidate.vote.granted: ", votesGranted)
votesGranted++
}
case e := <-s.c:
var err error
switch req := e.target.(type) {
case Command:
err = NotLeaderError
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
}
// Callback to event.
e.c <- err
case <-timeoutChan:
doVote = true
}
}
}
// The event loop that is run when the server is in a Leader state.
func (s *server) leaderLoop() {
logIndex, _ := s.log.lastInfo()
// Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
for _, peer := range s.peers {
peer.setPrevLogIndex(logIndex)
peer.startHeartbeat()
}
// Commit a NOP after the server becomes leader. From the Raft paper:
// "Upon election: send initial empty AppendEntries RPCs (heartbeat) to
// each server; repeat during idle periods to prevent election timeouts
// (§5.2)". The heartbeats started above do the "idle" period work.
s.routineGroup.Add(1)
go func() {
defer s.routineGroup.Done()
s.Do(NOPCommand{})
}()
// Begin to collect responses from followers.
for s.State() == Leader {
var err error
select {
case <-s.stopped:
// Stop all peers before stopping.
for _, peer := range s.peers {
peer.stopHeartbeat(false)
}
s.setState(Stopped)
return
case e := <-s.c:
switch req := e.target.(type) {
case Command:
s.processCommand(req, e)
continue
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *AppendEntriesResponse:
s.processAppendEntriesResponse(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
}
// Callback to event.
e.c <- err
}
}
s.syncedPeer = nil
}
func (s *server) snapshotLoop() {
for s.State() == Snapshotting {
var err error
select {
case <-s.stopped:
s.setState(Stopped)
return
case e := <-s.c:
switch req := e.target.(type) {
case Command:
err = NotLeaderError
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
case *SnapshotRecoveryRequest:
e.returnValue = s.processSnapshotRecoveryRequest(req)
}
// Callback to event.
e.c <- err
}
}
}
//--------------------------------------
// Commands
//--------------------------------------
// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *server) Do(command Command) (interface{}, error) {
return s.send(command)
}
// Processes a command.
func (s *server) processCommand(command Command, e *ev) {
s.debugln("server.command.process")
// Create an entry for the command in the log.
entry, err := s.log.createEntry(s.currentTerm, command, e)
if err != nil {
s.debugln("server.command.log.entry.error:", err)
e.c <- err
return
}
if err := s.log.appendEntry(entry); err != nil {
s.debugln("server.command.log.error:", err)
e.c <- err
return
}
s.syncedPeer[s.Name()] = true
if len(s.peers) == 0 {
commitIndex := s.log.currentIndex()
s.log.setCommitIndex(commitIndex)
s.debugln("commit index ", commitIndex)
}
}
//--------------------------------------
// Append Entries
//--------------------------------------
// Appends zero or more log entries from the leader to this server.
func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
ret, _ := s.send(req)
resp, _ := ret.(*AppendEntriesResponse)
return resp
}
// Processes the "append entries" request.
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
s.traceln("server.ae.process")
if req.Term < s.currentTerm {
s.debugln("server.ae.error: stale term")
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
}
if req.Term == s.currentTerm {
_assert(s.State() != Leader, "leader.elected.at.same.term.%d\n", s.currentTerm)
// step-down to follower when it is a candidate
if s.state == Candidate {
// change state to follower
s.setState(Follower)
}
// discover new leader when candidate
// save leader name when follower
s.leader = req.LeaderName
} else {
// Update term and leader.
s.updateCurrentTerm(req.Term, req.LeaderName)
}
// Reject if log doesn't contain a matching previous entry.
if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
s.debugln("server.ae.truncate.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
}
// Append entries to the log.
if err := s.log.appendEntries(req.Entries); err != nil {
s.debugln("server.ae.append.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
}
// Commit up to the commit index.
if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
s.debugln("server.ae.commit.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
}
// Once the server has appended and committed all the log entries from the leader, reply success.
return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}
// Processes the "append entries" response from the peer. This is only
// processed when the server is a leader. Responses received during other
// states are dropped.
func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
// If we find a higher term then change to a follower and exit.
if resp.Term() > s.Term() {
s.updateCurrentTerm(resp.Term(), "")
return
}
// Drop the response if it's not successful.
if !resp.Success() {
return
}
// If the peer successfully appended entries from the leader's term,
// add it to the synced list.
if resp.append {
s.syncedPeer[resp.peer] = true
}
// Make sure a quorum of peers has synced before committing.
if len(s.syncedPeer) < s.QuorumSize() {
return
}
// Determine the committed index that a majority has.
var indices []uint64
indices = append(indices, s.log.currentIndex())
for _, peer := range s.peers {
indices = append(indices, peer.getPrevLogIndex())
}
sort.Sort(sort.Reverse(uint64Slice(indices)))
// We can commit up to the index which the majority of the members have appended.
commitIndex := indices[s.QuorumSize()-1]
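// Example: with 5 members and indices sorted descending as [10, 9, 9, 7, 5],
// QuorumSize() is 3, so commitIndex becomes indices[2] = 9: at least a
// majority of members have appended entries up to index 9.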
committedIndex := s.log.commitIndex
if commitIndex > committedIndex {
// leader needs to do a fsync before committing log entries
s.log.sync()
s.log.setCommitIndex(commitIndex)
s.debugln("commit index ", commitIndex)
}
}
// processVoteResponse processes a vote response:
// 1. if the vote is granted for the current term of the candidate, return true.
// 2. if the vote is denied because this candidate's term is stale, update the
// term of this server, which will also cause the candidate to step down, and
// return false.
// 3. if the vote is from a smaller term, ignore it and return false.
func (s *server) processVoteResponse(resp *RequestVoteResponse) bool {
if resp.VoteGranted && resp.Term == s.currentTerm {
return true
}
if resp.Term == math.MaxUint64 {
s.debugln("got a removal notification, stopping")
s.DispatchEvent(newEvent(RemovedEventType, nil, nil))
}
if resp.Term > s.currentTerm {
s.debugln("server.candidate.vote.failed")
s.updateCurrentTerm(resp.Term, "")
} else {
s.debugln("server.candidate.vote: denied")
}
return false
}
//--------------------------------------
// Request Vote
//--------------------------------------
// Requests a vote from a server. A vote can be obtained if the request's term
// equals the server's current term and the server has not yet voted for a
// different candidate. A vote can also be obtained if the request's term is
// greater than the server's current term.
func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
ret, _ := s.send(req)
resp, _ := ret.(*RequestVoteResponse)
return resp
}
// Processes a "request vote" request.
func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
// Deny the vote request if the candidate is not in the current cluster.
if _, ok := s.peers[req.CandidateName]; !ok {
s.debugln("server.rv.deny.vote: unknown peer ", req.CandidateName)
return newRequestVoteResponse(math.MaxUint64, false), false
}
// If the request is coming from an old term then reject it.
if req.Term < s.Term() {
s.debugln("server.rv.deny.vote: cause stale term")
return newRequestVoteResponse(s.currentTerm, false), false
}
// If the term of the requesting peer is larger than this node's, update the term.
// If the term is equal and we've already voted for a different candidate then
// don't vote for this candidate.
if req.Term > s.Term() {
s.updateCurrentTerm(req.Term, "")
} else if s.votedFor != "" && s.votedFor != req.CandidateName {
s.debugln("server.deny.vote: cause duplicate vote: ", req.CandidateName,
" already vote for ", s.votedFor)
return newRequestVoteResponse(s.currentTerm, false), false
}
// If the candidate's log is not at least as up-to-date as our last log then don't vote.
lastIndex, lastTerm := s.log.lastInfo()
if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
s.debugln("server.deny.vote: cause out of date log: ", req.CandidateName,
"Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
"Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
return newRequestVoteResponse(s.currentTerm, false), false
}
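// Example of the check above: if this server's last entry is (index 8, term 3)
// and the candidate reports (index 7, term 3), the vote is denied because
// lastIndex 8 > req.LastLogIndex 7; a candidate reporting (index 9, term 3)
// passes the check.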
// If we made it this far then cast a vote and reset our election time out.
s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
s.votedFor = req.CandidateName
return newRequestVoteResponse(s.currentTerm, true), true
}
//--------------------------------------
// Membership
//--------------------------------------
// Adds a peer to the server.
func (s *server) AddPeer(name string, connectionString string) error {
s.debugln("server.peer.add: ", name, len(s.peers))
// Do not allow peers to be added twice.
if s.peers[name] != nil {
return nil
}
// Skip the Peer if it has the same name as the Server
if s.name != name {
peer := newPeer(s, name, connectionString, s.heartbeatInterval)
if s.State() == Leader {
peer.startHeartbeat()
}
s.peers[peer.Name] = peer
s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
}
// Write the configuration to file.
s.writeConf()
return nil
}
// Removes a peer from the server.
func (s *server) RemovePeer(name string) error {
s.debugln("server.peer.remove: ", name, len(s.peers))
// Skip the Peer if it has the same name as the Server
if name != s.Name() {
// Return error if peer doesn't exist.
peer := s.peers[name]
if peer == nil {
return fmt.Errorf("raft: Peer not found: %s", name)
}
// Stop peer and remove it.
if s.State() == Leader {
// We create a go routine here to avoid potential deadlock.
// We are holding log write lock when reach this line of code.
// Peer.stopHeartbeat can be blocked without go routine, if the
// target go routine (which we want to stop) is calling
// log.getEntriesAfter and waiting for log read lock.
// So we might be holding log lock and waiting for log lock,
// which lead to a deadlock.
// TODO(xiangli) refactor log lock
s.routineGroup.Add(1)
go func() {
defer s.routineGroup.Done()
peer.stopHeartbeat(true)
}()
}
delete(s.peers, name)
s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
}
// Write the configuration to file.
s.writeConf()
return nil
}
//--------------------------------------
// Log compaction
//--------------------------------------
func (s *server) TakeSnapshot() error {
if s.stateMachine == nil {
return errors.New("Snapshot: Cannot create snapshot. Missing state machine.")
}
// Shortcut without lock
// Exit if the server is currently creating a snapshot.
if s.pendingSnapshot != nil {
return errors.New("Snapshot: Last snapshot is not finished.")
}
// TODO: acquire the lock and no more committed is allowed
// This will be done after finishing refactoring heartbeat
s.debugln("take.snapshot")
lastIndex, lastTerm := s.log.commitInfo()
// Check whether any log entries have been committed since the last snapshot.
if lastIndex == s.log.startIndex {
return nil
}
path := s.SnapshotPath(lastIndex, lastTerm)
// Create the pending snapshot.
s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, path}
state, err := s.stateMachine.Save()
if err != nil {
return err
}
// Clone the list of peers.
peers := make([]*Peer, 0, len(s.peers)+1)
for _, peer := range s.peers {
peers = append(peers, peer.clone())
}
peers = append(peers, &Peer{Name: s.Name(), ConnectionString: s.connectionString})
// Attach snapshot to pending snapshot and save it to disk.
s.pendingSnapshot.Peers = peers
s.pendingSnapshot.State = state
s.saveSnapshot()
// We keep some log entries after the snapshot so we do not have to send the
// whole snapshot to followers that are only slightly behind.
if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
compactTerm := s.log.getEntry(compactIndex).Term()
s.log.compact(compactIndex, compactTerm)
}
return nil
}
// Saves the pending snapshot to disk and swaps it in as the current snapshot.
func (s *server) saveSnapshot() error {
if s.pendingSnapshot == nil {
return errors.New("pendingSnapshot.is.nil")
}
// Write snapshot to disk.
if err := s.pendingSnapshot.save(); err != nil {
return err
}
// Swap the current and last snapshots.
tmp := s.snapshot
s.snapshot = s.pendingSnapshot
// Delete the previous snapshot if there is any change
if tmp != nil && !(tmp.LastIndex == s.snapshot.LastIndex && tmp.LastTerm == s.snapshot.LastTerm) {
tmp.remove()
}
s.pendingSnapshot = nil
return nil
}
// Retrieves the snapshot path for the given last index and term.
func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}
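// For example, a snapshot whose last applied entry is index 4000 at term 2 is
// written to <path>/snapshot/2_4000.ss.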
func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
ret, _ := s.send(req)
resp, _ := ret.(*SnapshotResponse)
return resp
}
func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
// If the follower’s log contains an entry at the snapshot’s last index with a term
// that matches the snapshot’s last term, then the follower already has all the
// information found in the snapshot and can reply false.
entry := s.log.getEntry(req.LastIndex)
if entry != nil && entry.Term() == req.LastTerm {
return newSnapshotResponse(false)
}
// Update state.
s.setState(Snapshotting)
return newSnapshotResponse(true)
}
func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
ret, _ := s.send(req)
resp, _ := ret.(*SnapshotRecoveryResponse)
return resp
}
func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
// Recover state sent from request.
if err := s.stateMachine.Recovery(req.State); err != nil {
panic("cannot recover from previous state")
}
// Recover the cluster configuration.
s.peers = make(map[string]*Peer)
for _, peer := range req.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
}
// Update log state.
s.currentTerm = req.LastTerm
s.log.updateCommitIndex(req.LastIndex)
// Create local snapshot.
s.pendingSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, s.SnapshotPath(req.LastIndex, req.LastTerm)}
s.saveSnapshot()
// Clear the previous log entries.
s.log.compact(req.LastIndex, req.LastTerm)
return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
}
// Load a snapshot at restart
func (s *server) LoadSnapshot() error {
// Open snapshot/ directory.
dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
if err != nil {
s.debugln("cannot.open.snapshot: ", err)
return err
}
// Retrieve a list of all snapshots.
filenames, err := dir.Readdirnames(-1)
if err != nil {
dir.Close()
panic(err)
}
dir.Close()
if len(filenames) == 0 {
s.debugln("no.snapshot.to.load")
return nil
}
// Grab the latest snapshot.
sort.Strings(filenames)
snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])
// Read snapshot data.
file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
if err != nil {
return err
}
defer file.Close()
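// The snapshot file layout implied by the reader below: a header line holding
// the CRC32 checksum of the body as eight hex digits, followed by the
// JSON-encoded snapshot.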
// Check checksum.
var checksum uint32
n, err := fmt.Fscanf(file, "%08x\n", &checksum)
if err != nil {
return err
} else if n != 1 {
return errors.New("checksum.err: bad.snapshot.file")
}
// Load remaining snapshot contents.
b, err := ioutil.ReadAll(file)
if err != nil {
return err
}
// Generate checksum.
byteChecksum := crc32.ChecksumIEEE(b)
if checksum != byteChecksum {
s.debugln(checksum, " ", byteChecksum)
return errors.New("bad snapshot file")
}
// Decode snapshot.
if err = json.Unmarshal(b, &s.snapshot); err != nil {
s.debugln("unmarshal.snapshot.error: ", err)
return err
}
// Recover snapshot into state machine.
if err = s.stateMachine.Recovery(s.snapshot.State); err != nil {
s.debugln("recovery.snapshot.error: ", err)
return err
}
// Recover cluster configuration.
for _, peer := range s.snapshot.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
}
// Update log state.
s.log.startTerm = s.snapshot.LastTerm
s.log.startIndex = s.snapshot.LastIndex
s.log.updateCommitIndex(s.snapshot.LastIndex)
return err
}
//--------------------------------------
// Config File
//--------------------------------------
// Flushes the commit index to disk so that, when the raft server restarts,
// it will commit up to the flushed commitIndex.
func (s *server) FlushCommitIndex() {
s.debugln("server.conf.update")
// Write the configuration to file.
s.writeConf()
}
func (s *server) writeConf() {
peers := make([]*Peer, len(s.peers))
i := 0
for _, peer := range s.peers {
peers[i] = peer.clone()
i++
}
r := &Config{
CommitIndex: s.log.commitIndex,
Peers: peers,
}
b, _ := json.Marshal(r)
confPath := path.Join(s.path, "conf")
tmpConfPath := path.Join(s.path, "conf.tmp")
err := writeFileSynced(tmpConfPath, b, 0600)
if err != nil {
panic(err)
}
os.Rename(tmpConfPath, confPath)
}
// Read the configuration for the server.
func (s *server) readConf() error {
confPath := path.Join(s.path, "conf")
s.debugln("readConf.open ", confPath)
// open conf file
b, err := ioutil.ReadFile(confPath)
if err != nil {
return nil
}
conf := &Config{}
if err = json.Unmarshal(b, conf); err != nil {
return err
}
s.log.updateCommitIndex(conf.CommitIndex)
return nil
}
//--------------------------------------
// Debugging
//--------------------------------------
func (s *server) debugln(v ...interface{}) {
if logLevel > Debug {
debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
}
}
func (s *server) traceln(v ...interface{}) {
if logLevel > Trace {
tracef("[%s] %s", s.name, fmt.Sprintln(v...))
}
}