// Copyright 2016 DeepFabric, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package raftstore
import (
"bytes"
"fmt"
"sync"
"sync/atomic"
"gitee.com/wanttobeamaster/etcd/raft"
"gitee.com/wanttobeamaster/etcd/raft/raftpb"
"gitee.com/wanttobeamaster/gridbase/pkg/pb/metapb"
"gitee.com/wanttobeamaster/gridbase/pkg/pb/mraft"
"gitee.com/wanttobeamaster/gridbase/pkg/storage"
"github.com/fagongzi/log"
"github.com/fagongzi/util/protoc"
"github.com/fagongzi/util/task"
"github.com/pkg/errors"
)
const (
	// When we create a region peer, we should initialize its log term/index > 0,
	// so that we can force the follower peer to sync the snapshot first.
	raftInitLogTerm  = 5
	raftInitLogIndex = 5
	// maxSnapTryCnt caps snapshot-generation retries before giving up.
	maxSnapTryCnt = 5
)
// snapshotState describes the lifecycle stage of a snapshot on this peer.
type snapshotState int

var (
	relax        = snapshotState(1) // no snapshot activity in progress
	generating   = snapshotState(2) // a snapshot is being generated
	applying     = snapshotState(3) // a received snapshot is being applied
	applyAborted = snapshotState(4) // snapshot apply was aborted
)
// Job status values (pending through failed), in lifecycle order.
const (
	pending = iota
	running
	cancelling
	cancelled
	finished
	failed
)
// peerStorage holds the raft storage state for a single cell (region):
// the persisted raft/apply state, last-term bookkeeping, and the async
// jobs used to generate or apply snapshots.
type peerStorage struct {
	store *Store      // owning store; provides per-cell engines/drivers
	cell  metapb.Cell // metadata of the cell this storage belongs to

	lastTerm         uint64 // term of the last raft log entry
	appliedIndexTerm uint64 // term at the applied index; accessed atomically
	lastReadyIndex   uint64 // applied index recorded at the last raft ready
	lastCompactIndex uint64 // last index handed to log compaction

	raftState  mraft.RaftLocalState // persisted raft local state
	applyState mraft.RaftApplyState // persisted apply state

	snapTriedCnt int // snapshot generation attempts so far

	genSnapJob       *task.Job    // in-flight snapshot-generation job, if any
	applySnapJob     *task.Job    // in-flight snapshot-apply job, if any
	applySnapJobLock sync.RWMutex // guards applySnapJob

	pendingReads *readIndexQueue // queued read-index requests
}
// newPeerStorage builds the storage for the given cell, restoring the
// persisted raft state, apply state and last term from the cell's engine.
func newPeerStorage(store *Store, cell metapb.Cell) (*peerStorage, error) {
	ps := &peerStorage{
		store:            store,
		cell:             cell,
		appliedIndexTerm: raftInitLogTerm,
	}

	if err := ps.initRaftState(); err != nil {
		return nil, err
	}
	log.Infof("raftstore[cell-%d]: init raft state, state=<%+v>",
		cell.ID,
		ps.raftState)

	if err := ps.initApplyState(); err != nil {
		return nil, err
	}
	log.Infof("raftstore[cell-%d]: init apply state, state=<%+v>",
		cell.ID,
		ps.applyState)

	if err := ps.initLastTerm(); err != nil {
		return nil, err
	}
	log.Infof("raftstore[cell-%d]: init last term, last term=<%d>",
		cell.ID,
		ps.lastTerm)

	ps.lastReadyIndex = ps.getAppliedIndex()
	ps.pendingReads = new(readIndexQueue)
	return ps, nil
}
// initRaftState loads the persisted RaftLocalState for this cell. When
// nothing is stored yet, a fresh state is used, pre-seeded with
// raftInitLogIndex if the cell already has peers.
func (ps *peerStorage) initRaftState() error {
	data, err := ps.store.getEngine(ps.cell.ID).Get(getRaftStateKey(ps.cell.ID))
	if err != nil {
		return errors.Wrap(err, "")
	}

	if len(data) > 0 {
		state := &mraft.RaftLocalState{}
		if err := state.Unmarshal(data); err != nil {
			return errors.Wrap(err, "")
		}
		ps.raftState = *state
		return nil
	}

	state := &mraft.RaftLocalState{}
	if len(ps.getCell().Peers) > 0 {
		state.LastIndex = raftInitLogIndex
	}
	ps.raftState = *state
	return nil
}
// initApplyState loads the persisted RaftApplyState for this cell. When no
// usable state is stored and the cell already has peers, the state is
// seeded with the initial log index/term.
func (ps *peerStorage) initApplyState() error {
	data, err := ps.store.getEngine(ps.cell.ID).Get(getApplyStateKey(ps.cell.ID))
	if err != nil {
		return errors.Wrap(err, "")
	}

	hasPeers := len(ps.getCell().Peers) > 0
	if len(data) > 0 && hasPeers {
		state := &mraft.RaftApplyState{}
		if err := state.Unmarshal(data); err != nil {
			return errors.Wrap(err, "")
		}
		ps.applyState = *state
		return nil
	}

	if hasPeers {
		ps.applyState.AppliedIndex = raftInitLogIndex
		ps.applyState.TruncatedState.Index = raftInitLogIndex
		ps.applyState.TruncatedState.Term = raftInitLogTerm
	}
	return nil
}
// initLastTerm derives the term of the last raft log entry. Well-known
// indexes (0, the init index, the truncated index) are resolved without
// touching the engine; otherwise the entry itself is loaded and decoded.
func (ps *peerStorage) initLastTerm() error {
	lastIndex := ps.raftState.LastIndex

	switch {
	case lastIndex == 0:
		ps.lastTerm = lastIndex
		return nil
	case lastIndex == raftInitLogIndex:
		ps.lastTerm = raftInitLogTerm
		return nil
	case lastIndex == ps.applyState.TruncatedState.Index:
		ps.lastTerm = ps.applyState.TruncatedState.Term
		return nil
	case lastIndex < raftInitLogIndex:
		log.Fatalf("raftstore[cell-%d]: error raft last index, index=<%d>",
			ps.getCell().ID,
			lastIndex)
		return nil
	}

	data, err := ps.store.getEngine(ps.cell.ID).Get(getRaftLogKey(ps.cell.ID, lastIndex))
	if err != nil {
		return errors.Wrap(err, "")
	}
	if data == nil {
		return fmt.Errorf("raftstore[cell-%d]: entry at index<%d> doesn't exist, may lose data",
			ps.getCell().ID,
			lastIndex)
	}

	entry := &raftpb.Entry{}
	if err := entry.Unmarshal(data); err != nil {
		return errors.Wrap(err, "")
	}
	ps.lastTerm = entry.Term
	return nil
}
// isApplyComplete reports whether every committed entry has been applied.
func (ps *peerStorage) isApplyComplete() bool {
	return ps.getCommittedIndex() == ps.getAppliedIndex()
}

// setApplyState replaces the in-memory apply state with a copy of applyState.
func (ps *peerStorage) setApplyState(applyState *mraft.RaftApplyState) {
	ps.applyState = *applyState
}

// getApplyState returns a pointer to the in-memory apply state.
func (ps *peerStorage) getApplyState() *mraft.RaftApplyState {
	return &ps.applyState
}

// getAppliedIndex returns the last applied raft log index.
func (ps *peerStorage) getAppliedIndex() uint64 {
	return ps.getApplyState().AppliedIndex
}

// getCommittedIndex returns the committed index from the raft hard state.
func (ps *peerStorage) getCommittedIndex() uint64 {
	return ps.raftState.HardState.Commit
}

// getTruncatedIndex returns the index up to which the log was truncated.
func (ps *peerStorage) getTruncatedIndex() uint64 {
	return ps.getApplyState().TruncatedState.Index
}

// getTruncatedTerm returns the term at the truncated index.
func (ps *peerStorage) getTruncatedTerm() uint64 {
	return ps.getApplyState().TruncatedState.Term
}

// getAppliedIndexTerm atomically reads the term at the applied index.
func (ps *peerStorage) getAppliedIndexTerm() uint64 {
	return atomic.LoadUint64(&ps.appliedIndexTerm)
}

// setAppliedIndexTerm atomically stores the term at the applied index.
func (ps *peerStorage) setAppliedIndexTerm(appliedIndexTerm uint64) {
	atomic.StoreUint64(&ps.appliedIndexTerm, appliedIndexTerm)
}
// validateSnap reports whether snap is still usable for this peer: it must
// not predate the truncated index, it must decode, and its epoch's conf
// version must not be older than the current cell's.
func (ps *peerStorage) validateSnap(snap *raftpb.Snapshot) bool {
	snapIndex := snap.Metadata.Index
	if snapIndex < ps.getTruncatedIndex() {
		// stale snapshot, should generate again.
		log.Infof("raftstore[cell-%d]: snapshot is stale, generate again, snapIndex=<%d> currIndex=<%d>",
			ps.getCell().ID,
			snapIndex,
			ps.getTruncatedIndex())
		return false
	}

	snapData := &mraft.SnapshotMessage{}
	if err := snapData.Unmarshal(snap.Data); err != nil {
		log.Errorf("raftstore[cell-%d]: decode snapshot fail, errors:\n %+v",
			ps.getCell().ID,
			err)
		return false
	}

	snapEpoch := snapData.Header.Cell.Epoch
	currEpoch := ps.getCell().Epoch
	if snapEpoch.ConfVer < currEpoch.ConfVer {
		log.Infof("raftstore[cell-%d]: snapshot epoch stale, generate again. snap=<%s> curr=<%s>",
			ps.getCell().ID,
			snapEpoch.String(),
			currEpoch.String())
		return false
	}

	return true
}
// isInitialized reports whether the cell has any peers yet.
func (ps *peerStorage) isInitialized() bool {
	return len(ps.getCell().Peers) != 0
}

// isApplyingSnapshot reports whether a snapshot-apply job is still running.
func (ps *peerStorage) isApplyingSnapshot() bool {
	return ps.applySnapJob != nil && ps.applySnapJob.IsNotComplete()
}

// getCell returns a copy of the cell metadata.
func (ps *peerStorage) getCell() metapb.Cell {
	return ps.cell
}

// setCell replaces the cell metadata.
func (ps *peerStorage) setCell(cell metapb.Cell) {
	ps.cell = cell
}
// checkRange validates that [low, high) is a retrievable range of raft log
// entries: low must not exceed high, must not point at already-compacted
// entries, and high must not run past lastIndex+1.
//
// Fixes: error messages previously read "greater that" and "hight".
func (ps *peerStorage) checkRange(low, high uint64) error {
	if low > high {
		return fmt.Errorf("raftstore[cell-%d]: low is greater than high, low=<%d> high=<%d>",
			ps.getCell().ID,
			low,
			high)
	}

	if low <= ps.getTruncatedIndex() {
		// Entries at or below the truncated index have been compacted away.
		return raft.ErrCompacted
	}

	last, err := ps.LastIndex()
	if err != nil {
		return err
	}
	if high > last+1 {
		return fmt.Errorf("raftstore[cell-%d]: entries' high is out of bound lastindex, high=<%d> lastindex=<%d>",
			ps.getCell().ID,
			high,
			last)
	}

	return nil
}
// loadLogEntry reads the raft log entry at index from the cell's engine and
// decodes it, verifying the decoded index matches the requested one.
func (ps *peerStorage) loadLogEntry(index uint64) (*raftpb.Entry, error) {
	data, err := ps.store.getEngine(ps.cell.ID).Get(getRaftLogKey(ps.cell.ID, index))
	if err != nil {
		log.Errorf("raftstore[cell-%d]: load entry failure, index=<%d> errors:\n %+v",
			ps.getCell().ID,
			index,
			err)
		return nil, err
	}
	if len(data) == 0 {
		log.Errorf("raftstore[cell-%d]: entry not found, index=<%d>",
			ps.getCell().ID,
			index)
		return nil, fmt.Errorf("log entry at <%d> not found", index)
	}
	return ps.unmarshal(data, index)
}
// loadCellLocalState reads this cell's CellLocalState from its driver,
// honoring a cancellation request on the supplied job (which may be nil).
func (ps *peerStorage) loadCellLocalState(job *task.Job) (*mraft.CellLocalState, error) {
	if job != nil && job.IsCancelling() {
		return nil, task.ErrJobCancelled
	}
	return loadCellLocalState(ps.cell.ID, ps.store.getDriver(ps.cell.ID), false)
}
// applySnapshot asks the store's snapshot manager to apply a snapshot
// described by the current cell and its truncated term/index, honoring a
// cancellation request on the supplied job (which may be nil).
func (ps *peerStorage) applySnapshot(job *task.Job) error {
	if job != nil && job.IsCancelling() {
		return task.ErrJobCancelled
	}

	msg := &mraft.SnapshotMessage{
		Header: mraft.SnapshotMessageHeader{
			Cell:  ps.getCell(),
			Term:  ps.applyState.TruncatedState.Term,
			Index: ps.applyState.TruncatedState.Index,
		},
	}
	return ps.store.snapshotManager.Apply(msg)
}
// loadApplyState reads and decodes the persisted RaftApplyState for this
// cell; a missing record is reported as an error.
func (ps *peerStorage) loadApplyState() (*mraft.RaftApplyState, error) {
	data, err := ps.store.getEngine(ps.cell.ID).Get(getApplyStateKey(ps.cell.ID))
	if err != nil {
		log.Errorf("raftstore[cell-%d]: load apply state failed, errors:\n %+v",
			ps.getCell().ID,
			err)
		return nil, err
	}
	if len(data) == 0 {
		return nil, errors.New("cell apply state not found")
	}

	state := &mraft.RaftApplyState{}
	if err := state.Unmarshal(data); err != nil {
		return nil, err
	}
	return state, nil
}
// unmarshal decodes a raft log entry from v into a pooled Entry and checks
// that its index matches expectIndex. On decode failure the pooled entry is
// released; an index mismatch is fatal (log corruption).
func (ps *peerStorage) unmarshal(v []byte, expectIndex uint64) (*raftpb.Entry, error) {
	entry := acquireEntry()
	if err := entry.Unmarshal(v); err != nil {
		log.Errorf("raftstore[cell-%d]: unmarshal entry failure, index=<%d>, v=<%+v> errors:\n %+v",
			ps.getCell().ID,
			expectIndex,
			v,
			err)
		releaseEntry(entry)
		return nil, err
	}

	if entry.Index != expectIndex {
		log.Fatalf("raftstore[cell-%d]: raft log index not match, logIndex=<%d> expect=<%d>",
			ps.getCell().ID,
			entry.Index,
			expectIndex)
	}

	return entry, nil
}
// clearData schedules an async snap job that deletes all data belonging to
// this cell. If an error is returned, data may be partially deleted.
func (ps *peerStorage) clearData() error {
	cell := ps.getCell()
	cellID := cell.ID
	start := encStartKey(&cell)
	end := encEndKey(&cell)

	return ps.store.addSnapJob(func() error {
		log.Infof("raftstore-destroy[cell-%d]: deleting data, start=<%v> end=<%v>",
			cellID,
			start,
			end)
		if err := ps.deleteAllInRange(start, end, nil); err != nil {
			log.Errorf("raftstore-destroy[cell-%d]: failed to delete data, start=<%v> end=<%v> errors:\n %+v",
				cellID,
				start,
				end,
				err)
			return err
		}
		return nil
	}, nil)
}
// clearExtraData removes data owned by the old cell range that the new
// cell range no longer covers, one destroy job per uncovered side.
func (ps *peerStorage) clearExtraData(newCell metapb.Cell) error {
	oldCell := ps.getCell()
	oldStart, oldEnd := encStartKey(&oldCell), encEndKey(&oldCell)
	newStart, newEnd := encStartKey(&newCell), encEndKey(&newCell)

	// The old range extends to the left of the new one.
	if bytes.Compare(oldStart, newStart) < 0 {
		if err := ps.startDestroyDataJob(newCell.ID, oldStart, newStart); err != nil {
			return err
		}
	}

	// The old range extends to the right of the new one.
	if bytes.Compare(newEnd, oldEnd) < 0 {
		if err := ps.startDestroyDataJob(newCell.ID, newEnd, oldEnd); err != nil {
			return err
		}
	}

	return nil
}
// updatePeerState persists the cell's local state (cell meta + peer state).
// When wb is non-nil the write is staged into that batch; otherwise it is
// written straight to the cell's engine.
//
// Fixes: the marshal error was previously discarded (`data, _ :=`), which
// could silently persist an empty payload.
func (ps *peerStorage) updatePeerState(cell metapb.Cell, state mraft.PeerState, wb storage.WriteBatch) error {
	cellState := &mraft.CellLocalState{}
	cellState.State = state
	cellState.Cell = cell

	data, err := cellState.Marshal()
	if err != nil {
		return err
	}

	if wb != nil {
		return wb.Set(getCellStateKey(cell.ID), data)
	}
	return ps.store.getEngine(cell.ID).Set(getCellStateKey(cell.ID), data)
}
// writeInitialState stages the bootstrap raft state and apply state for a
// new cell into wb, both seeded with raftInitLogIndex/raftInitLogTerm so a
// follower is forced to sync a snapshot first.
func (ps *peerStorage) writeInitialState(cellID uint64, wb storage.WriteBatch) error {
	raftState := &mraft.RaftLocalState{}
	raftState.LastIndex = raftInitLogIndex
	raftState.HardState.Term = raftInitLogTerm
	raftState.HardState.Commit = raftInitLogIndex

	applyState := &mraft.RaftApplyState{}
	applyState.AppliedIndex = raftInitLogIndex
	applyState.TruncatedState.Index = raftInitLogIndex
	applyState.TruncatedState.Term = raftInitLogTerm

	if err := wb.Set(getRaftStateKey(cellID), protoc.MustMarshal(raftState)); err != nil {
		return err
	}
	return wb.Set(getApplyStateKey(cellID), protoc.MustMarshal(applyState))
}
// deleteAllInRange removes every key in [start, end) from the cell's data
// engine, honoring a cancellation request on the supplied job (may be nil).
func (ps *peerStorage) deleteAllInRange(start, end []byte, job *task.Job) error {
	if job != nil && job.IsCancelling() {
		return task.ErrJobCancelled
	}
	return ps.store.getDataEngine(ps.cell.ID).RangeDelete(start, end)
}
// compactRaftLog validates compactIndex against the current apply state and
// records the new truncated index/term in state. The entries themselves are
// deleted later by an async task.
func compactRaftLog(cellID uint64, state *mraft.RaftApplyState, compactIndex, compactTerm uint64) error {
	log.Debugf("raftstore-compact[cell-%d]: compact log entries to index, index=<%d>",
		cellID,
		compactIndex)

	switch {
	case compactIndex <= state.TruncatedState.Index:
		return errors.New("try to truncate compacted entries")
	case compactIndex > state.AppliedIndex:
		return fmt.Errorf("compact index %d > applied index %d", compactIndex, state.AppliedIndex)
	}

	// we don't actually delete the logs now, we add an async task to do it.
	state.TruncatedState.Index = compactIndex
	state.TruncatedState.Term = compactTerm
	return nil
}
// loadCellLocalState reads the persisted CellLocalState for cellID from the
// driver's engine. A missing record yields (nil, nil) when allowNotFound is
// true, otherwise an error.
func loadCellLocalState(cellID uint64, driver storage.Driver, allowNotFound bool) (*mraft.CellLocalState, error) {
	data, err := driver.GetEngine().Get(getCellStateKey(cellID))
	if err != nil {
		log.Errorf("raftstore[cell-%d]: load raft state failed, errors:\n %+v",
			cellID,
			err)
		return nil, err
	}
	if len(data) == 0 {
		if allowNotFound {
			return nil, nil
		}
		return nil, errors.New("cell state not found")
	}

	state := &mraft.CellLocalState{}
	if err := state.Unmarshal(data); err != nil {
		return nil, err
	}
	return state, nil
}