cockroachdb/cockroach
store.go
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package storage
import (
"bytes"
"fmt"
"io"
"math"
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/google/btree"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"golang.org/x/net/context"
"golang.org/x/time/rate"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/shuffle"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)
const (
// rangeIDAllocCount is the number of Range IDs to allocate per allocation.
rangeIDAllocCount = 10
defaultHeartbeatIntervalTicks = 5
// ttlStoreGossip is time-to-live for store-related info.
ttlStoreGossip = 2 * time.Minute
// preemptiveSnapshotRaftGroupID is a bogus ID for which a Raft group is
// temporarily created during the application of a preemptive snapshot.
preemptiveSnapshotRaftGroupID = math.MaxUint64
// defaultRaftEntryCacheSize is the default size in bytes for a
// store's Raft log entry cache.
defaultRaftEntryCacheSize = 1 << 24 // 16M
// replicaRequestQueueSize specifies the maximum number of requests to queue
// for a replica.
replicaRequestQueueSize = 100
defaultGossipWhenCapacityDeltaExceedsFraction = 0.01
// systemDataGossipInterval is the interval at which range lease
// holders verify that the most recent system data is gossiped.
// This ensures that system data is always eventually gossiped, even
// if a range lease holder experiences a failure causing a missed
// gossip update.
systemDataGossipInterval = 1 * time.Minute
// prohibitRebalancesBehindThreshold is the maximum number of log entries a
// store allows its replicas to be behind before it starts declining incoming
// rebalances. We prohibit rebalances in this situation to avoid adding
// additional work to a store that is either not keeping up or is undergoing
// recovery because it is on a recently restarted node.
prohibitRebalancesBehindThreshold = 1000
// Messages that provide detail about why a preemptive snapshot was rejected.
rebalancesDisabledMsg = "rebalances disabled because node is behind"
snapshotApplySemBusyMsg = "store busy applying snapshots and/or removing replicas"
storeDrainingMsg = "store is draining"
// IntersectingSnapshotMsg is part of the error message returned from
// canApplySnapshotLocked and is exposed here so testing can rely on it.
IntersectingSnapshotMsg = "snapshot intersects existing range"
)
var changeTypeInternalToRaft = map[roachpb.ReplicaChangeType]raftpb.ConfChangeType{
roachpb.ADD_REPLICA: raftpb.ConfChangeAddNode,
roachpb.REMOVE_REPLICA: raftpb.ConfChangeRemoveNode,
}
var storeSchedulerConcurrency = envutil.EnvOrDefaultInt(
"COCKROACH_SCHEDULER_CONCURRENCY", 8*runtime.NumCPU())
var enablePreVote = envutil.EnvOrDefaultBool(
"COCKROACH_ENABLE_PREVOTE", false)
// TestStoreConfig has some fields initialized with values relevant in tests.
func TestStoreConfig(clock *hlc.Clock) StoreConfig {
if clock == nil {
clock = hlc.NewClock(hlc.UnixNano, time.Nanosecond)
}
st := cluster.MakeTestingClusterSettings()
sc := StoreConfig{
Settings: st,
AmbientCtx: log.AmbientContext{Tracer: st.Tracer},
Clock: clock,
CoalescedHeartbeatsInterval: 50 * time.Millisecond,
RaftHeartbeatIntervalTicks: 1,
ScanInterval: 10 * time.Minute,
MetricsSampleInterval: metric.TestSampleInterval,
HistogramWindowInterval: metric.TestSampleInterval,
EnableEpochRangeLeases: true,
}
sc.RaftElectionTimeoutTicks = 3
sc.RaftTickInterval = 100 * time.Millisecond
sc.SetDefaults()
return sc
}
var (
raftMaxSizePerMsg = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16*1024)
raftMaxInflightMsgs = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64)
)
func newRaftConfig(
strg raft.Storage, id uint64, appliedIndex uint64, storeCfg StoreConfig, logger raft.Logger,
) *raft.Config {
return &raft.Config{
ID: id,
Applied: appliedIndex,
ElectionTick: storeCfg.RaftElectionTimeoutTicks,
HeartbeatTick: storeCfg.RaftHeartbeatIntervalTicks,
Storage: strg,
Logger: logger,
// TODO(bdarnell): PreVote and CheckQuorum are two ways of
// achieving the same thing. PreVote is more compatible with
// quiesced ranges, so we want to switch to it once we've worked
// out the bugs.
PreVote: enablePreVote,
CheckQuorum: !enablePreVote,
// MaxSizePerMsg controls how many Raft log entries the leader will send to
// followers in a single MsgApp.
MaxSizePerMsg: uint64(raftMaxSizePerMsg),
// MaxInflightMsgs controls how many "inflight" messages Raft will send to
// a follower without hearing a response. The total number of Raft log
// entries is a combination of this setting and MaxSizePerMsg. The current
// settings provide for up to 1 MB of raft log to be sent without
// acknowledgement. With an average entry size of 1 KB that translates to
// ~1024 commands that might be executed in the handling of a single
// raft.Ready operation.
MaxInflightMsgs: raftMaxInflightMsgs,
}
}
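// Editor's note (illustrative; not part of the original file): the "up to
// 1 MB of raft log ... without acknowledgement" figure quoted above is simply
// the product of the two knobs. With the defaults used in this file:
const (
	exampleRaftMaxSizePerMsg   = 16 * 1024 // 16 KB per MsgApp
	exampleRaftMaxInflightMsgs = 64        // unacknowledged MsgApps per follower
	// exampleRaftMaxSizePerMsg * exampleRaftMaxInflightMsgs == 1 << 20 (1 MB).
	exampleRaftMaxInflightData = exampleRaftMaxSizePerMsg * exampleRaftMaxInflightMsgs
)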
// verifyKeys verifies keys. If checkEndKey is true, then the end key
// is verified to be non-nil and greater than start key. If
// checkEndKey is false, end key is verified to be nil. Additionally,
// verifies that start key is less than KeyMax and end key is less
// than or equal to KeyMax. It also verifies that a key range that
// contains range-local keys is completely range-local.
func verifyKeys(start, end roachpb.Key, checkEndKey bool) error {
if bytes.Compare(start, roachpb.KeyMax) >= 0 {
return errors.Errorf("start key %q must be less than KeyMax", start)
}
if !checkEndKey {
if len(end) != 0 {
return errors.Errorf("end key %q should not be specified for this operation", end)
}
return nil
}
if end == nil {
return errors.Errorf("end key must be specified")
}
if bytes.Compare(roachpb.KeyMax, end) < 0 {
return errors.Errorf("end key %q must be less than or equal to KeyMax", end)
}
{
sAddr, err := keys.Addr(start)
if err != nil {
return err
}
eAddr, err := keys.Addr(end)
if err != nil {
return err
}
if !sAddr.Less(eAddr) {
return errors.Errorf("end key %q must be greater than start %q", end, start)
}
if !bytes.Equal(sAddr, start) {
if bytes.Equal(eAddr, end) {
return errors.Errorf("start key is range-local, but end key is not")
}
} else if bytes.Compare(start, keys.LocalMax) < 0 {
// It's a range op, not local but somehow plows through local data -
// not cool.
return errors.Errorf("start key in [%q,%q) must be greater than LocalMax", start, end)
}
}
return nil
}
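// Editor's note (illustrative examples; not part of the original file): the
// contract above in terms of concrete inputs. With checkEndKey=true the end
// key must be present and sort after the start key; with checkEndKey=false it
// must be empty.
//
//	verifyKeys(roachpb.Key("a"), roachpb.Key("b"), true)  // nil
//	verifyKeys(roachpb.Key("b"), roachpb.Key("a"), true)  // error: end key must be greater than start
//	verifyKeys(roachpb.Key("a"), nil, false)               // nil
//	verifyKeys(roachpb.Key("a"), roachpb.Key("b"), false)  // error: end key should not be specified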
// rangeKeyItem is a common interface for roachpb.Key and Range.
type rangeKeyItem interface {
endKey() roachpb.RKey
}
// rangeBTreeKey is a type alias of roachpb.RKey that implements the
// rangeKeyItem interface and the btree.Item interface.
type rangeBTreeKey roachpb.RKey
var _ rangeKeyItem = rangeBTreeKey{}
func (k rangeBTreeKey) endKey() roachpb.RKey {
return (roachpb.RKey)(k)
}
var _ btree.Item = rangeBTreeKey{}
func (k rangeBTreeKey) Less(i btree.Item) bool {
return k.endKey().Less(i.(rangeKeyItem).endKey())
}
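// Editor's sketch (hypothetical helper; not part of the original file): because
// items in the replicasByKey btree are ordered by their exclusive end keys, the
// item that may cover a point key is the first item whose end key sorts
// strictly after that key. The caller must still check the item's start key;
// the real lookups live elsewhere in this package.
func exampleItemEndingAfter(t *btree.BTree, key roachpb.RKey) btree.Item {
	var result btree.Item
	t.AscendGreaterOrEqual(rangeBTreeKey(key), func(i btree.Item) bool {
		if !key.Less(i.(rangeKeyItem).endKey()) {
			// End keys are exclusive: an item ending exactly at key does not
			// contain it, so keep ascending.
			return true
		}
		result = i
		return false // stop at the first item whose end key is > key
	})
	return result
}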
// A NotBootstrappedError indicates that an engine has not yet been
// bootstrapped due to a store identifier not being present.
type NotBootstrappedError struct{}
// Error formats error.
func (e *NotBootstrappedError) Error() string {
return "store has not been bootstrapped"
}
// A storeReplicaVisitor calls a visitor function for each of a store's
// initialized Replicas (in unspecified order).
type storeReplicaVisitor struct {
store *Store
repls []*Replica // Replicas to be visited.
visited int // Number of visited ranges, -1 before first call to Visit()
}
// Len implements shuffle.Interface.
func (rs storeReplicaVisitor) Len() int { return len(rs.repls) }
// Swap implements shuffle.Interface.
func (rs storeReplicaVisitor) Swap(i, j int) { rs.repls[i], rs.repls[j] = rs.repls[j], rs.repls[i] }
// newStoreReplicaVisitor constructs a storeReplicaVisitor.
func newStoreReplicaVisitor(store *Store) *storeReplicaVisitor {
return &storeReplicaVisitor{
store: store,
visited: -1,
}
}
// Visit calls the visitor with each Replica until false is returned.
func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) {
// Copy the range IDs to a slice so that we iterate over some (possibly
// stale) view of all Replicas without holding the Store lock. In particular,
// no locks are acquired during the copy process.
rs.repls = nil
rs.store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool {
rs.repls = append(rs.repls, (*Replica)(v))
return true
})
// The Replicas are already in "unspecified order" due to map iteration,
// but we want to make sure it's completely random to prevent issues in
// tests where stores are scanning replicas in lock-step and one store is
// winning the race and getting a first crack at processing the replicas on
// its queues.
//
// TODO(peter): Re-evaluate whether this is necessary after we allow
// rebalancing away from the leaseholder. See TestRebalance_3To5Small.
shuffle.Shuffle(rs)
rs.visited = 0
for _, repl := range rs.repls {
// TODO(tschottdorf): let the visitor figure out if something's been
// destroyed once we return errors from mutexes (#9190). After all, it
// can still happen with this code.
rs.visited++
repl.mu.RLock()
destroyed := repl.mu.destroyed
initialized := repl.isInitializedRLocked()
repl.mu.RUnlock()
if initialized && destroyed == nil && !visitor(repl) {
break
}
}
rs.visited = 0
}
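// Editor's sketch (hypothetical helper; not part of the original file): typical
// use of the visitor, mirroring how SetDraining below walks the replicas.
// Returning true from the closure keeps the iteration going; returning false
// stops it early.
func exampleCountVisitedReplicas(s *Store) int {
	var n int
	newStoreReplicaVisitor(s).Visit(func(_ *Replica) bool {
		n++
		return true
	})
	return n
}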
// EstimatedCount returns an estimated count of the underlying store's
// replicas.
//
// TODO(tschottdorf): this method has highly doubtful semantics.
func (rs *storeReplicaVisitor) EstimatedCount() int {
if rs.visited <= 0 {
return rs.store.ReplicaCount()
}
return len(rs.repls) - rs.visited
}
type raftRequestInfo struct {
req *RaftMessageRequest
respStream RaftMessageResponseStream
}
type raftRequestQueue struct {
syncutil.Mutex
infos []raftRequestInfo
}
// A Store maintains a map of ranges by start key. A Store corresponds
// to one physical device.
type Store struct {
Ident roachpb.StoreIdent
cfg StoreConfig
db *client.DB
engine engine.Engine // The underlying key-value store
allocator Allocator // Makes allocation decisions
rangeIDAlloc *idAllocator // Range ID allocator
gcQueue *gcQueue // Garbage collection queue
splitQueue *splitQueue // Range splitting queue
replicateQueue *replicateQueue // Replication queue
replicaGCQueue *replicaGCQueue // Replica GC queue
raftLogQueue *raftLogQueue // Raft log truncation queue
raftSnapshotQueue *raftSnapshotQueue // Raft repair queue
tsMaintenanceQueue *timeSeriesMaintenanceQueue // Time series maintenance queue
scanner *replicaScanner // Replica scanner
consistencyQueue *consistencyQueue // Replica consistency check queue
metrics *StoreMetrics
intentResolver *intentResolver
raftEntryCache *raftEntryCache
// gossipRangeCountdown and leaseRangeCountdown are countdowns of
// changes to range and leaseholder counts, after which the store
// descriptor will be re-gossiped earlier than the normal periodic
// gossip interval. Updated atomically.
gossipRangeCountdown int32
gossipLeaseCountdown int32
// gossipWritesPerSecondVal serves a similar purpose, but simply records
// the most recently gossiped value so that we can tell if a newly measured
// value differs by enough to justify re-gossiping the store.
gossipWritesPerSecondVal syncutil.AtomicFloat64
coalescedMu struct {
syncutil.Mutex
heartbeats map[roachpb.StoreIdent][]RaftHeartbeat
heartbeatResponses map[roachpb.StoreIdent][]RaftHeartbeat
}
// 1 if the store was started, 0 if it wasn't. To be accessed using atomic
// ops.
started int32
stopper *stop.Stopper
// The time when the store was Start()ed, in nanos.
startedAt int64
nodeDesc *roachpb.NodeDescriptor
initComplete sync.WaitGroup // Signaled by async init tasks
idleReplicaElectionTime struct {
syncutil.Mutex
at time.Time
}
// Semaphore to limit concurrent non-empty snapshot application and replica
// data destruction.
snapshotApplySem chan struct{}
// Whether rebalances to this store are allowed or prohibited. Rebalances are
// prohibited while a store is catching up replicas (i.e. recovering) after
// being restarted.
rebalancesDisabled int32
// draining holds a bool which indicates whether this store is draining. See
// SetDraining() for a more detailed explanation of behavior changes.
//
// TODO(bdarnell,tschottdorf): Would look better inside of `mu`, which at
// the time of its creation was riddled with deadlock (but that situation
// has likely improved).
draining atomic.Value
// Locking notes: To avoid deadlocks, the following lock order must be
// obeyed: Replica.raftMu < Replica.readOnlyCmdMu < Store.mu < Replica.mu
// < Replica.unreachablesMu < Store.coalescedMu < Store.scheduler.mu.
// (It is not required to acquire every lock in sequence, but when multiple
// locks are held at the same time, it is incorrect to acquire a lock with
// "lesser" value in this sequence after one with "greater" value.) A short
// illustrative sketch of this ordering follows the Store type below.
//
// Methods of Store with a "Locked" suffix require that
// Store.mu.Mutex be held. Other locking requirements are indicated
// in comments.
//
// The locking structure here is complex because A) Store is a
// container of Replicas, so it must generally be consulted before
// doing anything with any Replica, B) some Replica operations
// (including splits) modify the Store. Therefore we generally lock
// Store.mu to find a Replica, release it, then call a method on the
// Replica. These short-lived locks of Store.mu and Replica.mu are
// often surrounded by a long-lived lock of Replica.raftMu as
// described below.
//
// There are two major entry points to this stack of locks:
// Store.Send (which handles incoming RPCs) and raft-related message
// processing (including handleRaftReady on the processRaft
// goroutine and HandleRaftRequest on GRPC goroutines). Reads are
// processed solely through Store.Send; writes start out on
// Store.Send until they propose their raft command and then they
// finish on the raft goroutines.
//
// TODO(bdarnell): a Replica could be destroyed immediately after
// Store.Send finds the Replica and releases the lock. We need
// another RWMutex to be held by anything using a Replica to ensure
// that everything is finished before releasing it. #7169
//
// Detailed description of the locks:
//
// * Replica.raftMu: Held while any raft messages are being processed
// (including handleRaftReady and HandleRaftRequest) or while the set of
// Replicas in the Store is being changed (which may happen outside of raft
// via the replica GC queue).
//
// * Replica.readOnlyCmdMu (RWMutex): Held in read mode while any
// read-only command is in progress on the replica; held in write
// mode while executing a commit trigger. This is necessary
// because read-only commands mutate the Replica's timestamp cache
// (while holding Replica.mu in addition to readOnlyCmdMu). The
// RWMutex ensures that no reads are being executed during a split
// (which copies the timestamp cache) while still allowing
// multiple reads in parallel (#3148). TODO(bdarnell): this lock
// only needs to be held during splitTrigger, not all triggers.
//
// * Store.mu: Protects the Store's map of its Replicas. Acquired and
// released briefly at the start of each request; metadata operations like
// splits acquire it again to update the map. Even though these lock
// acquisitions do not make up a single critical section, it is safe thanks
// to Replica.raftMu which prevents any concurrent modifications.
//
// * Replica.mu: Protects the Replica's in-memory state. Acquired
// and released briefly as needed (note that while the lock is
// held "briefly" in that it is not held for an entire request, we
// do sometimes do I/O while holding the lock, as in
// Replica.Entries). This lock should be held when calling any
// methods on the raft group. Raft may call back into the Replica
// via the methods of the raft.Storage interface, which assume the
// lock is held even though they do not follow our convention of
// the "Locked" suffix.
//
// * Store.scheduler.mu: Protects the Raft scheduler internal
// state. Callbacks from the scheduler are performed while not holding this
// mutex in order to observe the above ordering constraints.
//
// Splits (and merges, but they're not finished and so will not be discussed
// here) deserve special consideration: they operate on two ranges. Naively,
// this is fine because the right-hand range is brand new, but an
// uninitialized version may have been created by a raft message before we
// process the split (see commentary on Replica.splitTrigger). We make this
// safe by locking the right-hand range for the duration of the Raft command
// containing the split/merge trigger.
//
// Note that because we acquire and release Store.mu and Replica.mu
// repeatedly rather than holding a lock for an entire request, we are
// actually relying on higher-level locks to ensure that things don't change
// out from under us. In particular, handleRaftReady accesses the replicaID
// more than once, and we rely on Replica.raftMu to ensure that this is not
// modified by a concurrent HandleRaftRequest. (#4476)
mu struct {
syncutil.RWMutex
// Map of replicas by Range ID (map[roachpb.RangeID]*Replica). This
// includes `uninitReplicas`. May be read without holding Store.mu.
replicas syncutil.IntMap
// A btree containing objects of type *Replica or
// *ReplicaPlaceholder (both of which have an associated key range; the
// btree is keyed on their EndKey).
replicasByKey *btree.BTree
uninitReplicas map[roachpb.RangeID]*Replica // Map of uninitialized replicas by Range ID
// replicaPlaceholders is a map to access all placeholders, so they can
// be directly accessed and cleared after stepping all raft groups.
replicaPlaceholders map[roachpb.RangeID]*ReplicaPlaceholder
}
// replicaQueues is a map of per-Replica incoming request queues. These
// queues might more naturally belong in Replica, but are kept separate to
// avoid reworking the locking in getOrCreateReplica which requires
// Replica.raftMu to be held while a replica is being inserted into
// Store.mu.replicas.
replicaQueues syncutil.IntMap // map[roachpb.RangeID]*raftRequestQueue
tsCacheMu struct {
// Protects all fields in the tsCacheMu struct.
syncutil.Mutex
// Most recent timestamps for keys / key ranges.
cache *timestampCache
}
scheduler *raftScheduler
counts struct {
// Number of placeholders removed due to error.
removedPlaceholders int32
// Number of placeholders successfully filled by a snapshot.
filledPlaceholders int32
// Number of placeholders removed due to a snapshot that was dropped by
// raft.
droppedPlaceholders int32
}
}
var _ client.Sender = &Store{}
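// Editor's sketch (hypothetical helper; not part of the original file): the
// lock ordering documented on the Store type above, in code form. Store.mu is
// only needed briefly (and the replicas map may even be read without it),
// while Replica.raftMu is the long-lived outer lock and Replica.mu nests
// inside it.
func exampleLockOrdering(s *Store, rangeID roachpb.RangeID) {
	value, ok := s.mu.replicas.Load(int64(rangeID))
	if !ok {
		return
	}
	repl := (*Replica)(value)
	repl.raftMu.Lock() // Replica.raftMu: outermost per-replica lock
	defer repl.raftMu.Unlock()
	repl.mu.Lock() // Replica.mu: acquired after raftMu, released quickly
	// ... read or update in-memory replica state ...
	repl.mu.Unlock()
}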
// A StoreConfig encompasses the auxiliary objects and configuration
// required to create a store.
// All fields holding a pointer or an interface are required to create
// a store; the rest will have sane defaults set if omitted.
type StoreConfig struct {
AmbientCtx log.AmbientContext
base.RaftConfig
Settings *cluster.Settings
Clock *hlc.Clock
DB *client.DB
Gossip *gossip.Gossip
NodeLiveness *NodeLiveness
StorePool *StorePool
Transport *RaftTransport
RPCContext *rpc.Context
// SQLExecutor is used by the store to execute SQL statements in a way that
// is more direct than using a sql.Executor.
SQLExecutor sqlutil.InternalExecutor
// TimeSeriesDataStore is an interface used by the store's time series
// maintenance queue to dispatch individual maintenance tasks.
TimeSeriesDataStore TimeSeriesDataStore
// DontRetryPushTxnFailures will propagate a push txn failure immediately
// instead of utilizing the push txn queue to wait for the transaction to
// finish or be pushed by a higher priority contender.
DontRetryPushTxnFailures bool
// CoalescedHeartbeatsInterval is the interval for which heartbeat messages
// are queued and then sent as a single coalesced heartbeat; it is a
// fraction of the RaftTickInterval so that heartbeats don't get delayed by
// an entire tick. Delaying coalescing heartbeat responses has a bad
// interaction with quiescence because the coalesced (delayed) heartbeat
// response can unquiesce the leader. Consider:
//
// T+0: leader queues MsgHeartbeat
// T+1: leader sends MsgHeartbeat
// follower receives MsgHeartbeat
// follower queues MsgHeartbeatResp
// T+2: leader queues quiesce message
// follower sends MsgHeartbeatResp
// leader receives MsgHeartbeatResp
// T+3: leader sends quiesce message
//
// Thus we want to make sure that heartbeats are responded to faster than
// the quiesce cadence.
CoalescedHeartbeatsInterval time.Duration
// RaftHeartbeatIntervalTicks is the number of ticks that pass between heartbeats.
RaftHeartbeatIntervalTicks int
// ScanInterval is the default value for the scan interval
ScanInterval time.Duration
// ScanMaxIdleTime is the maximum time the scanner will be idle between ranges.
// If enabled (> 0), the scanner may complete in less than ScanInterval for small
// stores.
ScanMaxIdleTime time.Duration
// If LogRangeEvents is true, major changes to ranges will be logged into
// the range event log.
LogRangeEvents bool
// RaftEntryCacheSize is the size in bytes of the Raft log entry cache
// shared by all Raft groups managed by the store.
RaftEntryCacheSize uint64
// IntentResolverTaskLimit is the maximum number of asynchronous tasks that
// may be started by the intent resolver. -1 indicates no asynchronous tasks
// are allowed. 0 uses the default value (defaultIntentResolverTaskLimit)
// which is non-zero.
IntentResolverTaskLimit int
TestingKnobs StoreTestingKnobs
// concurrentSnapshotApplyLimit specifies the maximum number of empty
// snapshots and the maximum number of non-empty snapshots that are permitted
// to be applied concurrently.
concurrentSnapshotApplyLimit int
// MetricsSampleInterval is (server.Context).MetricsSampleInterval
MetricsSampleInterval time.Duration
// HistogramWindowInterval is (server.Context).HistogramWindowInterval
HistogramWindowInterval time.Duration
// EnableEpochRangeLeases controls whether epoch-based range leases are used.
EnableEpochRangeLeases bool
// GossipWhenCapacityDeltaExceedsFraction specifies the fraction by which the
// last gossiped store capacity values must change before the store will
// gossip immediately without waiting for the periodic gossip interval.
GossipWhenCapacityDeltaExceedsFraction float64
}
// StoreTestingKnobs is a part of the context used to control parts of
// the system. The Testing*Filter functions are called at various
// points in the request pipeline if they are non-nil. These can be
// used either for synchronization (e.g. to write to a channel when a
// particular point is reached) or to change the behavior by returning
// an error (which aborts all further processing for the command).
type StoreTestingKnobs struct {
// TestingProposalFilter is called before proposing each command.
TestingProposalFilter storagebase.ReplicaCommandFilter
// TestingEvalFilter is called before evaluating each command. The
// number of times this callback is run depends on the propEvalKV
// setting, and it is therefore deprecated in favor of either
// TestingProposalFilter (which runs only on the lease holder) or
// TestingApplyFilter (which runs on each replica). If your filter is
// not idempotent, consider wrapping it in a
// ReplayProtectionFilterWrapper.
// TODO(bdarnell,tschottdorf): Migrate existing tests which use this
// to one of the other filters. See #10493
// TODO(andrei): Provide guidance on what to use instead for trapping reads.
TestingEvalFilter storagebase.ReplicaCommandFilter
// TestingApplyFilter is called before applying the results of a
// command on each replica. If it returns an error, the command will
// not be applied. If it returns an error on some replicas but not
// others, the behavior is poorly defined unless that error is a
// ReplicaCorruptionError.
TestingApplyFilter storagebase.ReplicaApplyFilter
// TestingPostApplyFilter is called after a command is applied to
// rocksdb but before in-memory side effects have been processed.
TestingPostApplyFilter storagebase.ReplicaApplyFilter
// TestingResponseFilter is called after the replica processes a
// command in order for unittests to modify the batch response,
// error returned to the client, or to simulate network failures.
TestingResponseFilter storagebase.ReplicaResponseFilter
// If non-nil, BadChecksumPanic is called by CheckConsistency() instead of
// panicking on a checksum mismatch.
BadChecksumPanic func(roachpb.StoreIdent)
// If non-nil, BadChecksumReportDiff is called by CheckConsistency() on a
// checksum mismatch to report the diff between snapshots.
BadChecksumReportDiff func(roachpb.StoreIdent, []ReplicaSnapshotDiff)
// Disables the use of one phase commits.
DisableOnePhaseCommits bool
// A hack to manipulate the clock before sending a batch request to a replica.
// TODO(kaneda): Use of this hook is discouraged. Get rid of it once
// we make TestServer take a ManualClock.
ClockBeforeSend func(*hlc.Clock, roachpb.BatchRequest)
// OnCampaign is called if the replica campaigns for Raft leadership
// when initializing the Raft group. Note that this method is invoked
// with both Replica.raftMu and Replica.mu locked.
OnCampaign func(*Replica)
// OnCommandQueueAction is called when the BatchRequest performs an action
// on the CommandQueue.
OnCommandQueueAction func(*roachpb.BatchRequest, storagebase.CommandQueueAction)
// MaxOffset, if set, overrides the server clock's MaxOffset at server
// creation time.
// See also DisableMaxOffsetCheck.
MaxOffset time.Duration
// DisableMaxOffsetCheck disables the rejection (in Store.Send) of requests
// with the timestamp too much in the future. Normally, this rejection is a
// good sanity check, but certain tests unfortunately insert a "message from
// the future" into the system to advance the clock of a TestServer. We
// should get rid of such practices once we make TestServer take a
// ManualClock.
DisableMaxOffsetCheck bool
// DontPreventUseOfOldLeaseOnStart disables the initialization of
// replica.mu.minLeaseProposedTS on replica.Init(). This has the effect of
// allowing the replica to use the lease that it had in a previous life (in
// case the tests persisted the engine used in said previous life).
DontPreventUseOfOldLeaseOnStart bool
// LeaseRequestEvent, if set, is called when replica.requestLeaseLocked() is
// called to acquire a new lease. This can be used to assert that a request
// triggers a lease acquisition.
LeaseRequestEvent func(ts hlc.Timestamp)
// LeaseTransferBlockedOnExtensionEvent, if set, is called when
// replica.TransferLease() encounters an in-progress lease extension.
// nextLeader is the replica that we're trying to transfer the lease to.
LeaseTransferBlockedOnExtensionEvent func(nextLeader roachpb.ReplicaDescriptor)
// DisableReplicaGCQueue disables the replica GC queue.
DisableReplicaGCQueue bool
// DisableReplicateQueue disables the replication queue.
DisableReplicateQueue bool
// DisableReplicaRebalancing disables rebalancing of replicas but otherwise
// leaves the replicate queue operational.
DisableReplicaRebalancing bool
// DisableSplitQueue disables the split queue.
DisableSplitQueue bool
// DisableTimeSeriesMaintenanceQueue disables the time series maintenance
// queue.
DisableTimeSeriesMaintenanceQueue bool
// DisableRaftSnapshotQueue disables the raft snapshot queue.
DisableRaftSnapshotQueue bool
// DisableScanner disables the replica scanner.
DisableScanner bool
// DisablePeriodicGossips disables periodic gossiping.
DisablePeriodicGossips bool
// DisableRefreshReasonNewLeader disables refreshing pending commands when a new
// leader is discovered.
DisableRefreshReasonNewLeader bool
// DisableRefreshReasonSnapshotApplied disables refreshing pending commands when a
// snapshot is applied.
DisableRefreshReasonSnapshotApplied bool
// DisableRefreshReasonTicks disables refreshing pending commands
// periodically.
DisableRefreshReasonTicks bool
// DisableProcessRaft disables the process raft loop.
DisableProcessRaft bool
// DisableLastProcessedCheck disables checking on replica queue last processed times.
DisableLastProcessedCheck bool
// ReplicateQueueAcceptsUnsplit allows the replication queue to
// process ranges that need to be split, for use in tests that use
// the replication queue but disable the split queue.
ReplicateQueueAcceptsUnsplit bool
// NumKeysEvaluatedForRangeIntentResolution is set by the stores to the
// number of keys evaluated for range intent resolution.
NumKeysEvaluatedForRangeIntentResolution *int64
// SkipMinSizeCheck, if set, makes the store creation process skip the check
// for a minimum size.
SkipMinSizeCheck bool
// DisableAsyncIntentResolution disables the async intent resolution
// path (but leaves synchronous resolution). This can avoid some
// edge cases in tests that start and stop servers.
DisableAsyncIntentResolution bool
// DisableLeaseCapacityGossip disables the ability of a changing number of
// leases to trigger the store to gossip its capacity. With this enabled,
// only changes in the number of replicas can cause the store to gossip its
// capacity.
DisableLeaseCapacityGossip bool
// BootstrapVersion overrides the version the stores will be bootstrapped with.
BootstrapVersion *cluster.ClusterVersion
}
var _ base.ModuleTestingKnobs = &StoreTestingKnobs{}
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
func (*StoreTestingKnobs) ModuleTestingKnobs() {}
// Valid returns true if the StoreConfig is populated correctly.
// We don't check for Gossip and DB since some of our tests pass
// that as nil.
func (sc *StoreConfig) Valid() bool {
return sc.Clock != nil && sc.Transport != nil &&
sc.RaftTickInterval != 0 && sc.RaftHeartbeatIntervalTicks > 0 &&
sc.RaftElectionTimeoutTicks > 0 && sc.ScanInterval >= 0 &&
sc.AmbientCtx.Tracer != nil
}
// SetDefaults initializes unset fields in StoreConfig to values
// suitable for use on a local network.
// TODO(tschottdorf): see if this ought to be configurable via flags.
func (sc *StoreConfig) SetDefaults() {
sc.RaftConfig.SetDefaults()
if sc.CoalescedHeartbeatsInterval == 0 {
sc.CoalescedHeartbeatsInterval = sc.RaftTickInterval / 2
}
if sc.RaftHeartbeatIntervalTicks == 0 {
sc.RaftHeartbeatIntervalTicks = defaultHeartbeatIntervalTicks
}
if sc.RaftEntryCacheSize == 0 {
sc.RaftEntryCacheSize = defaultRaftEntryCacheSize
}
if sc.IntentResolverTaskLimit == 0 {
sc.IntentResolverTaskLimit = defaultIntentResolverTaskLimit
} else if sc.IntentResolverTaskLimit == -1 {
sc.IntentResolverTaskLimit = 0
}
if sc.concurrentSnapshotApplyLimit == 0 {
// NB: setting this value higher than 1 is likely to degrade client
// throughput.
sc.concurrentSnapshotApplyLimit =
envutil.EnvOrDefaultInt("COCKROACH_CONCURRENT_SNAPSHOT_APPLY_LIMIT", 1)
}
if sc.GossipWhenCapacityDeltaExceedsFraction == 0 {
sc.GossipWhenCapacityDeltaExceedsFraction = defaultGossipWhenCapacityDeltaExceedsFraction
}
}
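// Editor's note (illustrative; not part of the original file): the
// IntentResolverTaskLimit sentinel values are translated by SetDefaults as
// follows:
//
//	cfg.IntentResolverTaskLimit = 0  // -> defaultIntentResolverTaskLimit
//	cfg.IntentResolverTaskLimit = -1 // -> 0 (no asynchronous intent resolution)
//	cfg.IntentResolverTaskLimit = 42 // -> 42 (any other value is used as given)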
// LeaseExpiration returns an int64 duration by which to increment a manual
// clock to make sure that all active range leases expire.
func (sc *StoreConfig) LeaseExpiration() int64 {
// Due to lease extensions, the remaining interval can be longer than just
// the sum of the offset (=length of stasis period) and the active
// duration, but definitely not by 2x.
maxOffset := sc.Clock.MaxOffset()
if maxOffset == timeutil.ClocklessMaxOffset {
// Don't do shady math on clockless reads.
maxOffset = 0
}
return 2 * (sc.RangeLeaseActiveDuration() + maxOffset).Nanoseconds()
}
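// Editor's note (illustrative arithmetic with assumed example values; not part
// of the original file): with a range lease active duration of 9s and a clock
// MaxOffset of 500ms, LeaseExpiration returns 2*(9s+500ms) = 19s, expressed in
// nanoseconds.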
// NewStore returns a new instance of a store.
func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescriptor) *Store {
// TODO(tschottdorf): find better place to set these defaults.
cfg.SetDefaults()
if !cfg.Valid() {
log.Fatalf(context.Background(), "invalid store configuration: %+v", &cfg)
}
s := &Store{
cfg: cfg,
db: cfg.DB, // TODO(tschottdorf): remove redundancy.
engine: eng,
nodeDesc: nodeDesc,
metrics: newStoreMetrics(cfg.HistogramWindowInterval),
}
if cfg.RPCContext != nil {
s.allocator = MakeAllocator(cfg.StorePool, cfg.RPCContext.RemoteClocks.Latency)
} else {
s.allocator = MakeAllocator(cfg.StorePool, func(string) (time.Duration, bool) {
return 0, false
})
}
s.intentResolver = newIntentResolver(s, cfg.IntentResolverTaskLimit)
s.raftEntryCache = newRaftEntryCache(cfg.RaftEntryCacheSize)
s.draining.Store(false)
s.scheduler = newRaftScheduler(s.cfg.AmbientCtx, s.metrics, s, storeSchedulerConcurrency)
s.coalescedMu.Lock()
s.coalescedMu.heartbeats = map[roachpb.StoreIdent][]RaftHeartbeat{}
s.coalescedMu.heartbeatResponses = map[roachpb.StoreIdent][]RaftHeartbeat{}
s.coalescedMu.Unlock()
s.mu.Lock()
s.mu.replicaPlaceholders = map[roachpb.RangeID]*ReplicaPlaceholder{}
s.mu.replicasByKey = btree.New(64 /* degree */)
s.mu.uninitReplicas = map[roachpb.RangeID]*Replica{}
s.mu.Unlock()
s.tsCacheMu.Lock()
s.tsCacheMu.cache = newTimestampCache(s.cfg.Clock)
s.tsCacheMu.Unlock()
s.snapshotApplySem = make(chan struct{}, cfg.concurrentSnapshotApplyLimit)
if s.cfg.Gossip != nil {
// Add range scanner and configure with queues.
s.scanner = newReplicaScanner(
s.cfg.AmbientCtx, cfg.ScanInterval, cfg.ScanMaxIdleTime, newStoreReplicaVisitor(s),
)
s.gcQueue = newGCQueue(s, s.cfg.Gossip)
s.splitQueue = newSplitQueue(s, s.db, s.cfg.Gossip)
s.replicateQueue = newReplicateQueue(s, s.cfg.Gossip, s.allocator, s.cfg.Clock)
s.replicaGCQueue = newReplicaGCQueue(s, s.db, s.cfg.Gossip)
s.raftLogQueue = newRaftLogQueue(s, s.db, s.cfg.Gossip)
s.raftSnapshotQueue = newRaftSnapshotQueue(s, s.cfg.Gossip, s.cfg.Clock)
s.consistencyQueue = newConsistencyQueue(s, s.cfg.Gossip)
s.scanner.AddQueues(
s.gcQueue, s.splitQueue, s.replicateQueue, s.replicaGCQueue,
s.raftLogQueue, s.raftSnapshotQueue, s.consistencyQueue)
if s.cfg.TimeSeriesDataStore != nil {
s.tsMaintenanceQueue = newTimeSeriesMaintenanceQueue(
s, s.db, s.cfg.Gossip, s.cfg.TimeSeriesDataStore,
)
s.scanner.AddQueues(s.tsMaintenanceQueue)
}
}
if cfg.TestingKnobs.DisableReplicaGCQueue {
s.setReplicaGCQueueActive(false)
}
if cfg.TestingKnobs.DisableReplicateQueue {
s.setReplicateQueueActive(false)
}
if cfg.TestingKnobs.DisableSplitQueue {
s.setSplitQueueActive(false)
}
if cfg.TestingKnobs.DisableTimeSeriesMaintenanceQueue {
s.setTimeSeriesMaintenanceQueueActive(false)
}
if cfg.TestingKnobs.DisableRaftSnapshotQueue {
s.setRaftSnapshotQueueActive(false)
}
if cfg.TestingKnobs.DisableScanner {
s.setScannerActive(false)
}
return s
}
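// Editor's sketch (not part of the original file; the in-memory engine
// constructor's exact signature is an assumption): wiring a Store together in
// a test, using the TestStoreConfig helper defined above.
//
//	cfg := TestStoreConfig(nil /* clock */)
//	eng := engine.NewInMem(roachpb.Attributes{}, 1<<20 /* cacheSize */)
//	defer eng.Close()
//	store := NewStore(cfg, eng, &roachpb.NodeDescriptor{NodeID: 1})
//	// store.Start(ctx, stopper) must still be called before the store is usable.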
// String formats a store for debug output.
func (s *Store) String() string {
return fmt.Sprintf("[n%d,s%d]", s.Ident.NodeID, s.Ident.StoreID)
}
// ClusterSettings returns the node's ClusterSettings.
func (s *Store) ClusterSettings() *cluster.Settings {
return s.cfg.Settings
}
// AnnotateCtx is a convenience wrapper; see AmbientContext.
func (s *Store) AnnotateCtx(ctx context.Context) context.Context {
return s.cfg.AmbientCtx.AnnotateCtx(ctx)
}
// SetDraining (when called with 'true') causes incoming lease transfers to be
// rejected, prevents all of the Store's Replicas from acquiring or extending
// range leases, and attempts to transfer away any leases owned.
// When called with 'false', returns to the normal mode of operation.
func (s *Store) SetDraining(drain bool) {
s.draining.Store(drain)
if !drain {
return
}
var wg sync.WaitGroup
ctx := log.WithLogTag(context.Background(), "drain", nil)
// Limit the number of concurrent lease transfers.
sem := make(chan struct{}, 100)
sysCfg, sysCfgSet := s.cfg.Gossip.GetSystemConfig()
newStoreReplicaVisitor(s).Visit(func(r *Replica) bool {
wg.Add(1)
if err := s.stopper.RunLimitedAsyncTask(
r.AnnotateCtx(ctx), "storage.Store: draining replica", sem, true, /* wait */
func(ctx context.Context) {
defer wg.Done()
var drainingLease roachpb.Lease
for {
var leaseCh <-chan *roachpb.Error
r.mu.Lock()
lease, nextLease := r.getLeaseRLocked()
if nextLease != nil && nextLease.OwnedBy(s.StoreID()) {
leaseCh = r.mu.pendingLeaseRequest.JoinRequest()
}
r.mu.Unlock()
if leaseCh != nil {
<-leaseCh
continue
}
drainingLease = lease
break
}
if drainingLease.OwnedBy(s.StoreID()) && r.IsLeaseValid(drainingLease, s.Clock().Now()) {
desc := r.Desc()
zone := config.DefaultZoneConfig()
if sysCfgSet {
var err error
zone, err = sysCfg.GetZoneConfigForKey(desc.StartKey)
if log.V(1) && err != nil {
log.Errorf(ctx, "could not get zone config for key %s when draining: %s", desc.StartKey, err)
}
}
if _, err := s.replicateQueue.transferLease(
ctx,
r,
desc,
zone,
transferLeaseOptions{},
); log.V(1) && err != nil {
log.Errorf(ctx, "error transferring lease when draining: %s", err)
}
}
}); err != nil {
if log.V(1) {
log.Errorf(ctx, "error running draining task: %s", err)
}
wg.Done()
return false
}
return true
})
wg.Wait()
}
// IsStarted returns true if the Store has been started.
func (s *Store) IsStarted() bool {
return atomic.LoadInt32(&s.started) == 1
}
// IterateRangeDescriptors calls the provided function with each descriptor
// from the provided Engine. The return values of this method and fn have
// semantics similar to engine.MVCCIterate.
func IterateRangeDescriptors(
ctx context.Context, eng engine.Reader, fn func(desc roachpb.RangeDescriptor) (bool, error),
) error {
log.Event(ctx, "beginning range descriptor iteration")
// Iterate over all range-local key-based data.
start := keys.RangeDescriptorKey(roachpb.RKeyMin)
end := keys.RangeDescriptorKey(roachpb.RKeyMax)
allCount := 0
matchCount := 0
bySuffix := make(map[string]int)
kvToDesc := func(kv roachpb.KeyValue) (bool, error) {
allCount++
// Only consider range metadata entries; ignore others.
_, suffix, _, err := keys.DecodeRangeKey(kv.Key)
if err != nil {
return false, err
}
bySuffix[string(suffix)]++
if !bytes.Equal(suffix, keys.LocalRangeDescriptorSuffix) {
return false, nil
}
var desc roachpb.RangeDescriptor
if err := kv.Value.GetProto(&desc); err != nil {
return false, err
}
matchCount++
return fn(desc)
}
_, err := engine.MVCCIterate(ctx, eng, start, end, hlc.MaxTimestamp, false /* !consistent */, nil, /* txn */
false /* !reverse */, kvToDesc)
log.Eventf(ctx, "iterated over %d keys to find %d range descriptors (by suffix: %v)",
allCount, matchCount, bySuffix)
return err
}
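// Editor's sketch (hypothetical helper; not part of the original file):
// collecting every descriptor from an engine with IterateRangeDescriptors.
// Returning (false, nil) from the callback means "keep iterating", mirroring
// engine.MVCCIterate.
func exampleCollectRangeDescriptors(
	ctx context.Context, eng engine.Reader,
) ([]roachpb.RangeDescriptor, error) {
	var descs []roachpb.RangeDescriptor
	err := IterateRangeDescriptors(ctx, eng, func(desc roachpb.RangeDescriptor) (bool, error) {
		descs = append(descs, desc)
		return false, nil
	})
	return descs, err
}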
// ReadStoreIdent reads the StoreIdent from the store.
// It returns *NotBootstrappedError if the ident is missing (meaning that the
// store needs to be bootstrapped).
func ReadStoreIdent(ctx context.Context, eng engine.Engine) (roachpb.StoreIdent, error) {
var ident roachpb.StoreIdent
ok, err := engine.MVCCGetProto(
ctx, eng, keys.StoreIdentKey(), hlc.Timestamp{}, true, nil, &ident)
if err != nil {
return roachpb.StoreIdent{}, err
} else if !ok {
return roachpb.StoreIdent{}, &NotBootstrappedError{}
}
return ident, err
}
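// Editor's sketch (hypothetical helper; not part of the original file): callers
// typically distinguish "not yet bootstrapped" from real errors like this.
func exampleIsBootstrapped(ctx context.Context, eng engine.Engine) (bool, error) {
	if _, err := ReadStoreIdent(ctx, eng); err != nil {
		if _, ok := err.(*NotBootstrappedError); ok {
			return false, nil // ident missing: the store still needs to be bootstrapped
		}
		return false, err
	}
	return true, nil
}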
// Start the engine, set the GC and read the StoreIdent.
func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
s.stopper = stopper
// Read the store ident if not already initialized. "NodeID != 0" implies
// the store has already been initialized.
if s.Ident.NodeID == 0 {
// Read store ident and return a not-bootstrapped error if necessary.
ident, err := ReadStoreIdent(ctx, s.engine)
if err != nil {
return err
}
s.Ident = ident
}
// Set the store ID for logging.
s.cfg.AmbientCtx.AddLogTagInt("s", int(s.StoreID()))
ctx = s.AnnotateCtx(ctx)
log.Event(ctx, "read store identity")
// If the nodeID is 0, it has not been assigned yet.
if s.nodeDesc.NodeID != 0 && s.Ident.NodeID != s.nodeDesc.NodeID {
return errors.Errorf("node id:%d does not equal the one in node descriptor:%d", s.Ident.NodeID, s.nodeDesc.NodeID)
}
// Always set gossip NodeID before gossiping any info.
if s.cfg.Gossip != nil {
s.cfg.Gossip.NodeID.Set(ctx, s.Ident.NodeID)
}
// Create ID allocators.
idAlloc, err := newIDAllocator(
s.cfg.AmbientCtx, keys.RangeIDGenerator, s.db, 2 /* min ID */, rangeIDAllocCount, s.stopper,
)
if err != nil {
return err
}
s.rangeIDAlloc = idAlloc
now := s.cfg.Clock.Now()
s.startedAt = now.WallTime
// Iterate over all range descriptors, ignoring uncommitted versions
// (consistent=false). Uncommitted intents which have been abandoned
// due to a split crashing halfway will simply be resolved on the
// next split attempt. They can otherwise be ignored.
s.mu.Lock()
// TODO(peter): While we have to iterate to find the replica descriptors
// serially, we can perform the migrations and replica creation
// concurrently. Note that while we can perform this initialization
// concurrently, all of the initialization must be performed before we start
// listening for Raft messages and start the Raft processing loop.
err = IterateRangeDescriptors(ctx, s.engine,
func(desc roachpb.RangeDescriptor) (bool, error) {
if !desc.IsInitialized() {
return false, errors.Errorf("found uninitialized RangeDescriptor: %+v", desc)
}
rep, err := NewReplica(&desc, s, 0)
if err != nil {
return false, err
}
if err = s.addReplicaInternalLocked(rep); err != nil {
return false, err
}
// Add this range and its stats to our counter.
s.metrics.ReplicaCount.Inc(1)
s.metrics.addMVCCStats(rep.GetMVCCStats())
if _, ok := desc.GetReplicaDescriptor(s.StoreID()); !ok {
// We are no longer a member of the range, but we didn't GC the replica
// before shutting down. Add the replica to the GC queue.
if added, err := s.replicaGCQueue.Add(rep, replicaGCPriorityRemoved); err != nil {
log.Errorf(ctx, "%s: unable to add replica to GC queue: %s", rep, err)
} else if added {
log.Infof(ctx, "%s: added to replica GC queue", rep)
}
}
// Note that we do not create raft groups at this time; they will be created
// on-demand the first time they are needed. This helps reduce the amount of
// election-related traffic in a cold start.
// Raft initialization occurs when we propose a command on this range or
// receive a raft message addressed to it.
// TODO(bdarnell): Also initialize raft groups when read leases are needed.
// TODO(bdarnell): Scan all ranges at startup for unapplied log entries
// and initialize those groups.
return false, nil
})
s.mu.Unlock()
if err != nil {
return err
}
// Start Raft processing goroutines.
s.cfg.Transport.Listen(s.StoreID(), s)
s.processRaft(ctx)
// Gossip is only ever nil while bootstrapping a cluster and
// in unittests.
if s.cfg.Gossip != nil {
// Register update channel for any changes to the system config.
// This may trigger splits along structured boundaries,
// and update max range bytes.
gossipUpdateC := s.cfg.Gossip.RegisterSystemConfigChannel()
s.stopper.RunWorker(ctx, func(context.Context) {
for {
select {
case <-gossipUpdateC:
cfg, _ := s.cfg.Gossip.GetSystemConfig()
s.systemGossipUpdate(cfg)
case <-s.stopper.ShouldStop():
return
}
}
})
// Start a single goroutine in charge of periodically gossiping the
// sentinel and first range metadata if we have a first range.
// This may wake up ranges and requires everything to be set up and
// running.
s.startGossip()
// Start the scanner. The construction here makes sure that the scanner
// only starts after Gossip has connected, and that it does not block Start
// from returning (as doing so might prevent Gossip from ever connecting).
s.stopper.RunWorker(ctx, func(context.Context) {
select {
case <-s.cfg.Gossip.Connected:
s.scanner.Start(s.cfg.Clock, s.stopper)
case <-s.stopper.ShouldStop():
return
}
})
// Run metrics computation up front to populate initial statistics.
if err = s.ComputeMetrics(ctx, -1); err != nil {
log.Infof(ctx, "%s: failed initial metrics computation: %s", s, err)
}
log.Event(ctx, "computed initial metrics")
}
// Set the started flag (for unittests).
atomic.StoreInt32(&s.started, 1)
return nil
}
// WaitForInit waits for any asynchronous processes begun in Start()
// to complete their initialization. In particular, this includes
// gossiping. In some cases this may block until the range GC queue
// has completed its scan. Only for testing.
func (s *Store) WaitForInit() {
s.initComplete.Wait()
}
var errPeriodicGossipsDisabled = errors.New("periodic gossip is disabled")
// startGossip runs an infinite loop in a goroutine which regularly checks
// whether the store has a first range or config replica and asks those ranges
// to gossip accordingly.
func (s *Store) startGossip() {
wakeReplica := func(ctx context.Context, repl *Replica) error {
// Acquire the range lease, which in turn triggers system data gossip
// functions (e.g. maybeGossipSystemConfig or maybeGossipNodeLiveness).
_, pErr := repl.getLeaseForGossip(ctx)
return pErr.GoError()
}
if s.cfg.TestingKnobs.DisablePeriodicGossips {
wakeReplica = func(context.Context, *Replica) error {
return errPeriodicGossipsDisabled
}
}
gossipFns := []struct {
key roachpb.Key
fn func(context.Context, *Replica) error
description string
interval time.Duration
}{
{
key: roachpb.KeyMin,
fn: func(ctx context.Context, repl *Replica) error {
// The first range is gossiped by all replicas, not just the lease
// holder, so wakeReplica is not used here.
return repl.maybeGossipFirstRange(ctx).GoError()
},
description: "first range descriptor",
interval: sentinelGossipInterval,
},
{
key: keys.SystemConfigSpan.Key,
fn: wakeReplica,
description: "system config",
interval: systemDataGossipInterval,
},
{
key: keys.NodeLivenessSpan.Key,
fn: wakeReplica,
description: "node liveness",
interval: systemDataGossipInterval,
},
}
// Periodic updates run in a goroutine and signal a WaitGroup upon completion
// of their first iteration.
s.initComplete.Add(len(gossipFns))
for _, gossipFn := range gossipFns {
gossipFn := gossipFn // per-iteration copy
s.stopper.RunWorker(context.Background(), func(ctx context.Context) {
ticker := time.NewTicker(gossipFn.interval)
defer ticker.Stop()
for first := true; ; {
// Retry in a backoff loop until gossipFn succeeds. The gossipFn might
// temporarily fail (e.g. because node liveness hasn't initialized yet
// making it impossible to get an epoch-based range lease), in which
// case we want to retry quickly.
retryOptions := base.DefaultRetryOptions()
retryOptions.Closer = s.stopper.ShouldStop()
for r := retry.Start(retryOptions); r.Next(); {
if repl := s.LookupReplica(roachpb.RKey(gossipFn.key), nil); repl != nil {
annotatedCtx := repl.AnnotateCtx(ctx)
if err := gossipFn.fn(annotatedCtx, repl); err != nil {
log.Warningf(annotatedCtx, "could not gossip %s: %s", gossipFn.description, err)
if err != errPeriodicGossipsDisabled {
continue
}
}
}
break
}
if first {
first = false
s.initComplete.Done()
}
select {
case <-ticker.C:
case <-s.stopper.ShouldStop():
return
}
}
})
}
}
// systemGossipUpdate is a callback for gossip updates to
// the system config which affect range split boundaries.
func (s *Store) systemGossipUpdate(cfg config.SystemConfig) {
// For every range, update its MaxBytes and check if it needs to be split.
newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
if zone, err := cfg.GetZoneConfigForKey(repl.Desc().StartKey); err == nil {
repl.SetMaxBytes(zone.RangeMaxBytes)
}
s.splitQueue.MaybeAdd(repl, s.cfg.Clock.Now())
return true // more
})
}
// GossipStore broadcasts the store on the gossip network.
func (s *Store) GossipStore(ctx context.Context) error {
// This should always return immediately and acts as a sanity check that we
// don't try to gossip before we're connected.
select {
case <-s.cfg.Gossip.Connected:
default:
log.Fatalf(ctx, "not connected to gossip")
}
// Temporarily indicate that we're gossiping the store capacity to avoid
// recursively triggering a gossip of the store capacity.
syncutil.StoreFloat64(&s.gossipWritesPerSecondVal, -1)
storeDesc, err := s.Descriptor()
if err != nil {
return errors.Wrapf(err, "problem getting store descriptor for store %+v", s.Ident)
}
// Set countdown target for re-gossiping capacity earlier than
// the usual periodic interval.
rangeCountdown := float64(storeDesc.Capacity.RangeCount) * s.cfg.GossipWhenCapacityDeltaExceedsFraction
atomic.StoreInt32(&s.gossipRangeCountdown, int32(math.Ceil(math.Max(rangeCountdown, 1))))
leaseCountdown := float64(storeDesc.Capacity.LeaseCount) * s.cfg.GossipWhenCapacityDeltaExceedsFraction
atomic.StoreInt32(&s.gossipLeaseCountdown, int32(math.Ceil(math.Max(leaseCountdown, 1))))
syncutil.StoreFloat64(&s.gossipWritesPerSecondVal, storeDesc.Capacity.WritesPerSecond)
// Unique gossip key per store.
gossipStoreKey := gossip.MakeStoreKey(storeDesc.StoreID)
// Gossip store descriptor.
if err := s.cfg.Gossip.AddInfoProto(gossipStoreKey, storeDesc, ttlStoreGossip); err != nil {
return err
}
// Once we have gossiped the store descriptor the first time, other nodes
// will know that this node has restarted and will start sending Raft
// heartbeats for active ranges. We compute the time in the future where a
// replica on this store which receives a command for an idle range can
// campaign the associated Raft group immediately instead of waiting for the
// normal Raft election timeout.
//
// Note that computing this timestamp here is conservative. We really care
// that the node descriptor has been gossiped as that is how remote nodes
// locate this one to send Raft messages. The initialization sequence is:
// 1. gossip node descriptor
// 2. wait for gossip to be connected
// 3. gossip store descriptors (where we're at here)
s.idleReplicaElectionTime.Lock()
if s.idleReplicaElectionTime.at == (time.Time{}) {
// Raft uses a randomized election timeout in the range
// [electionTimeout,2*electionTimeout]. Using the lower bound here means
// that elections are somewhat more likely to be contested (assuming
// traffic is distributed evenly across a cluster that is restarted
// simultaneously). That's OK; it just adds a network round trip or two to
// the process since a contested election just restarts the clock to where
// it would have been anyway if we weren't doing idle replica campaigning.
electionTimeout := s.cfg.RaftTickInterval * time.Duration(s.cfg.RaftElectionTimeoutTicks)
s.idleReplicaElectionTime.at = s.Clock().PhysicalTime().Add(electionTimeout)
}
s.idleReplicaElectionTime.Unlock()
return nil
}
type capacityChangeEvent int
const (
rangeChangeEvent capacityChangeEvent = iota
leaseChangeEvent
)
// maybeGossipOnCapacityChange decrements the countdown on range
// and leaseholder counts. If it reaches 0, then we trigger an
// immediate gossip of this store's descriptor, to include updated
// capacity information.
func (s *Store) maybeGossipOnCapacityChange(ctx context.Context, cce capacityChangeEvent) {
if s.cfg.TestingKnobs.DisableLeaseCapacityGossip && cce == leaseChangeEvent {
return
}
if (cce == rangeChangeEvent && atomic.AddInt32(&s.gossipRangeCountdown, -1) == 0) ||
(cce == leaseChangeEvent && atomic.AddInt32(&s.gossipLeaseCountdown, -1) == 0) {
// Reset countdowns to avoid unnecessary gossiping.
atomic.StoreInt32(&s.gossipRangeCountdown, 0)
atomic.StoreInt32(&s.gossipLeaseCountdown, 0)
// Send using an async task because GossipStore needs the store mutex.
if err := s.stopper.RunAsyncTask(
ctx, "storage.Store: gossip on capacity change",
func(ctx context.Context) {
if err := s.GossipStore(ctx); err != nil {
log.Warningf(ctx, "error gossiping on capacity change: %s", err)
}
}); err != nil {
log.Warningf(ctx, "unable to gossip on capacity change: %s", err)
}
}
}
// recordNewWritesPerSecond takes a recently calculated value for the number
// of key writes the store is handling and decides whether it has changed enough
// to justify re-gossiping the store's capacity.
func (s *Store) recordNewWritesPerSecond(newVal float64) {
oldVal := syncutil.LoadFloat64(&s.gossipWritesPerSecondVal)
if oldVal == -1 {
// Gossiping of store capacity is already ongoing.
return
}
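// Re-gossip the store descriptor only if writes-per-second has changed by
// more than 50% in either direction since it was last gossiped.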
if newVal < oldVal*.5 || newVal > oldVal*1.5 {
ctx := s.AnnotateCtx(context.TODO())
if err := s.stopper.RunAsyncTask(
ctx, "storage.Store: gossip on writes-per-second change",
func(ctx context.Context) {
if err := s.GossipStore(ctx); err != nil {
log.Warningf(ctx, "error gossiping on writes-per-second change: %s", err)
}
}); err != nil {
log.Warningf(ctx, "unable to gossip on writes-per-second change: %s", err)
}
}
}
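// canCampaignIdleReplica returns true once the idle replica election time
// computed in GossipStore has been reached, meaning a replica that receives a
// command for an idle range may campaign its Raft group immediately. It
// returns false if that time has not been set yet.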
func (s *Store) canCampaignIdleReplica() bool {
s.idleReplicaElectionTime.Lock()
defer s.idleReplicaElectionTime.Unlock()
if s.idleReplicaElectionTime.at == (time.Time{}) {
return false
}
return !s.Clock().PhysicalTime().Before(s.idleReplicaElectionTime.at)
}
// GossipDeadReplicas broadcasts the store's dead replicas on the gossip
// network.
func (s *Store) GossipDeadReplicas(ctx context.Context) error {
deadReplicas := s.deadReplicas()
// Don't gossip if there's nothing to gossip.
if len(deadReplicas.Replicas) == 0 {
return nil
}
// Unique gossip key per store.
key := gossip.MakeDeadReplicasKey(s.StoreID())
// Gossip dead replicas.
return s.cfg.Gossip.AddInfoProto(key, &deadReplicas, ttlStoreGossip)
}
// Bootstrap writes a new store ident to the underlying engine. To
// ensure that no crufty data already exists in the engine, it scans
// the engine contents before writing the new store ident. The engine
// should be completely empty. It returns an error if called on a
// non-empty engine.
func (s *Store) Bootstrap(
ctx context.Context, ident roachpb.StoreIdent, cv cluster.ClusterVersion,
) error {
if (s.Ident != roachpb.StoreIdent{}) {
return errors.Errorf("store %s is already bootstrapped", s)
}
ctx = s.AnnotateCtx(ctx)
if err := checkEngineEmpty(ctx, s.engine); err != nil {
return errors.Wrap(err, "cannot verify empty engine for bootstrap")
}
s.Ident = ident
batch := s.engine.NewBatch()
if err := engine.MVCCPutProto(
ctx,
batch,
nil,
keys.StoreIdentKey(),
hlc.Timestamp{},
nil,
&s.Ident,
); err != nil {
batch.Close()
return err
}
if err := WriteClusterVersion(ctx, batch, cv); err != nil {
batch.Close()
return errors.Wrap(err, "cannot write cluster version")
}
if err := batch.Commit(true); err != nil {
return errors.Wrap(err, "persisting bootstrap data")
}
s.NotifyBootstrapped()
return nil
}
// WriteLastUpTimestamp records the supplied timestamp into the "last up" key
// on this store. This value should be refreshed whenever this store's node
// updates its own liveness record; it is used by a restarting store to
// determine the approximate time that it stopped.
func (s *Store) WriteLastUpTimestamp(ctx context.Context, time hlc.Timestamp) error {
ctx = s.AnnotateCtx(ctx)
return engine.MVCCPutProto(
ctx,
s.engine,
nil,
keys.StoreLastUpKey(),
hlc.Timestamp{},
nil,
&time,
)
}
// ReadLastUpTimestamp returns the "last up" timestamp recorded in this store.
// This value can be used to approximate the last time the engine was being
// served as a store by a running node. If the store does not contain a "last
// up" timestamp (for example, on a newly bootstrapped store), the zero
// timestamp is returned instead.
func (s *Store) ReadLastUpTimestamp(ctx context.Context) (hlc.Timestamp, error) {
var timestamp hlc.Timestamp
ok, err := engine.MVCCGetProto(
ctx, s.Engine(), keys.StoreLastUpKey(), hlc.Timestamp{}, true, nil, &timestamp)
if err != nil {
return hlc.Timestamp{}, err
} else if !ok {
return hlc.Timestamp{}, nil
}
return timestamp, nil
}
func checkEngineEmpty(ctx context.Context, eng engine.Engine) error {
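// Scan for at most 10 keys across the entire key space; finding any key
// means the engine already contains data.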
kvs, err := engine.Scan(
eng,
engine.MakeMVCCMetadataKey(roachpb.Key(roachpb.RKeyMin)),
engine.MakeMVCCMetadataKey(roachpb.Key(roachpb.RKeyMax)),
10,
)
if err != nil {
return err
}
if len(kvs) > 0 {
// See if this is an already-bootstrapped store.
ident, err := ReadStoreIdent(ctx, eng)
if err != nil {
return errors.Wrap(err, "unable to read store ident")
}
keyVals := make([]string, len(kvs))
for i, kv := range kvs {
keyVals[i] = fmt.Sprintf("%s: %q", kv.Key, kv.Value)
}
return errors.Errorf("engine belongs to store %s, contains %s", ident, keyVals)
}
return nil
}
// NotifyBootstrapped tells the store that it was bootstrapped and allows idle
// replicas to campaign immediately. This primarily affects tests.
func (s *Store) NotifyBootstrapped() {
s.idleReplicaElectionTime.Lock()
s.idleReplicaElectionTime.at = s.Clock().PhysicalTime()
s.idleReplicaElectionTime.Unlock()
}
// GetReplica fetches a replica by Range ID. Returns an error if no replica is found.
func (s *Store) GetReplica(rangeID roachpb.RangeID) (*Replica, error) {
if value, ok := s.mu.replicas.Load(int64(rangeID)); ok {
return (*Replica)(value), nil
}
return nil, roachpb.NewRangeNotFoundError(rangeID)
}
// LookupReplica looks up a replica via binary search over the
// "replicasByKey" btree. Returns nil if no replica is found for
// specified key range. Note that the specified keys are transformed
// using Key.Address() to ensure we lookup replicas correctly for local
// keys. When end is nil, a replica that contains start is looked up.
func (s *Store) LookupReplica(start, end roachpb.RKey) *Replica {
s.mu.RLock()
defer s.mu.RUnlock()
var repl *Replica
s.visitReplicasLocked(start, roachpb.RKeyMax, func(replIter *Replica) bool {
repl = replIter
return false
})
if repl == nil || !repl.Desc().ContainsKeyRange(start, end) {
return nil
}
return repl
}
// getOverlappingKeyRangeLocked returns a KeyRange from the Store overlapping the given
// descriptor (or nil if no such KeyRange exists).
func (s *Store) getOverlappingKeyRangeLocked(rngDesc *roachpb.RangeDescriptor) KeyRange {
var kr KeyRange
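// The replicasByKey btree is keyed by end key (see visitReplicasLocked), so
// ascending from StartKey.Next() yields the first KeyRange whose end key is
// strictly greater than rngDesc.StartKey. If that range also starts before
// rngDesc.EndKey, the two key spans overlap.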
s.mu.replicasByKey.AscendGreaterOrEqual(rangeBTreeKey(rngDesc.StartKey.Next()),
func(item btree.Item) bool {
kr = item.(KeyRange)
return false
})
if kr != nil && kr.Desc().StartKey.Less(rngDesc.EndKey) {
return kr
}
return nil
}
// visitReplicasLocked will call iterator for every replica on the store which
// contains any keys in the span between startKey and endKey. Iteration will be
// in ascending order. Iteration can be stopped early by returning false from
// iterator.
func (s *Store) visitReplicasLocked(startKey, endKey roachpb.RKey, iterator func(r *Replica) bool) {
// Iterate over replicasByKey to visit all ranges containing keys in the
// specified range. We use startKey.Next() because btree's Ascend methods
// are inclusive of the start bound and exclusive of the end bound, but
// ranges are stored in the BTree by EndKey; in cockroach, end keys have the
// opposite behavior (a range's EndKey is contained by the subsequent
// range). We want visitReplicasLocked to match cockroach's behavior; using
// startKey.Next() ignores a range whose EndKey is exactly equal to
// the supplied startKey. Iteration ends when all ranges are exhausted, or
// the next range contains no keys in the supplied span.
s.mu.replicasByKey.AscendGreaterOrEqual(rangeBTreeKey(startKey.Next()),
func(item btree.Item) bool {
kr := item.(KeyRange)
if !kr.Desc().StartKey.Less(endKey) {
// This properly checks if this range contains any keys in the supplied span.
return false
}
switch rep := item.(type) {
case *Replica:
return iterator(rep)
default:
return true
}
})
}
// RaftStatus returns the current raft status of the local replica of
// the given range.
func (s *Store) RaftStatus(rangeID roachpb.RangeID) *raft.Status {
if value, ok := s.mu.replicas.Load(int64(rangeID)); ok {
return (*Replica)(value).RaftStatus()
}
return nil
}
// BootstrapRange creates the first range in the cluster and manually
// writes it to the store. Default range addressing records are
// created for meta1 and meta2. Default configurations for
// zones are created. All configs are specified
// for the empty key prefix, meaning they apply to the entire
// database. The zone requires three replicas with no other specifications.
// It also adds the range tree and the root node, the first range, to it.
// The 'initialValues' are written as well after each value's checksum
// is initialized.
func (s *Store) BootstrapRange(
initialValues []roachpb.KeyValue, bootstrapVersion roachpb.Version,
) error {
desc := &roachpb.RangeDescriptor{
RangeID: 1,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
NextReplicaID: 2,
Replicas: []roachpb.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
ReplicaID: 1,
},
},
}
if err := desc.Validate(); err != nil {
return err
}
batch := s.engine.NewBatch()
defer batch.Close()
ms := &enginepb.MVCCStats{}
now := s.cfg.Clock.Now()
ctx := context.Background()
// Bootstrap version information. We don't do this if this is v1.0, which is
// never going to be true in versions that have this code in production, but
// can be true in tests.
if bootstrapVersion != cluster.VersionBase {
if err := engine.MVCCPutProto(ctx, batch, ms /* ms */, keys.BootstrapVersionKey, hlc.Timestamp{}, nil, &bootstrapVersion); err != nil {
return err
}
}
// Range descriptor.
if err := engine.MVCCPutProto(ctx, batch, ms, keys.RangeDescriptorKey(desc.StartKey), now, nil, desc); err != nil {
return err
}
// Replica GC timestamp.
if err := engine.MVCCPutProto(ctx, batch, nil /* ms */, keys.RangeLastReplicaGCTimestampKey(desc.RangeID), hlc.Timestamp{}, nil, &now); err != nil {
return err
}
// Range addressing for meta2.
meta2Key := keys.RangeMetaKey(roachpb.RKeyMax)
if err := engine.MVCCPutProto(ctx, batch, ms, meta2Key, now, nil, desc); err != nil {
return err
}
// Range addressing for meta1.
meta2KeyAddr, err := keys.Addr(meta2Key)
if err != nil {
return err
}
meta1Key := keys.RangeMetaKey(meta2KeyAddr)
if err := engine.MVCCPutProto(ctx, batch, ms, meta1Key, now, nil, desc); err != nil {
return err
}
// Now add all passed-in default entries.
for _, kv := range initialValues {
// Initialize the checksums.
kv.Value.InitChecksum(kv.Key)
if err := engine.MVCCPut(ctx, batch, ms, kv.Key, now, kv.Value, nil); err != nil {
return err
}
}
// Set range stats.
if err := engine.AccountForSelf(ms, desc.RangeID); err != nil {
return err
}
updatedMS, err := writeInitialState(ctx, batch, *ms, *desc, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{})
if err != nil {
return err
}
*ms = updatedMS
return batch.Commit(true /* sync */)
}
// ClusterID accessor.
func (s *Store) ClusterID() uuid.UUID { return s.Ident.ClusterID }
// StoreID accessor.
func (s *Store) StoreID() roachpb.StoreID { return s.Ident.StoreID }
// Clock accessor.
func (s *Store) Clock() *hlc.Clock { return s.cfg.Clock }
// Engine accessor.
func (s *Store) Engine() engine.Engine { return s.engine }
// DB accessor.
func (s *Store) DB() *client.DB { return s.cfg.DB }
// Gossip accessor.
func (s *Store) Gossip() *gossip.Gossip { return s.cfg.Gossip }
// Stopper accessor.
func (s *Store) Stopper() *stop.Stopper { return s.stopper }
// Tracer accessor.
func (s *Store) Tracer() opentracing.Tracer { return s.cfg.AmbientCtx.Tracer }
// TestingKnobs accessor.
func (s *Store) TestingKnobs() *StoreTestingKnobs { return &s.cfg.TestingKnobs }
// IsDraining accessor.
func (s *Store) IsDraining() bool {
return s.draining.Load().(bool)
}
// NewRangeDescriptor creates a new descriptor based on start and end
// keys and the supplied roachpb.Replicas slice. It allocates a new
// range ID and returns a RangeDescriptor whose Replicas are a copy
// of the supplied replicas slice, with appropriate ReplicaIDs assigned.
func (s *Store) NewRangeDescriptor(
ctx context.Context, start, end roachpb.RKey, replicas []roachpb.ReplicaDescriptor,
) (*roachpb.RangeDescriptor, error) {
id, err := s.rangeIDAlloc.Allocate(ctx)
if err != nil {
return nil, err
}
desc := &roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(id),
StartKey: start,
EndKey: end,
Replicas: append([]roachpb.ReplicaDescriptor(nil), replicas...),
NextReplicaID: roachpb.ReplicaID(len(replicas) + 1),
}
for i := range desc.Replicas {
desc.Replicas[i].ReplicaID = roachpb.ReplicaID(i + 1)
}
return desc, nil
}
// splitPreApply is called when the raft command is applied. Any
// changes to the given ReadWriter will be written atomically with the
// split commit.
func splitPreApply(
ctx context.Context, st *cluster.Settings, eng engine.ReadWriter, split roachpb.SplitTrigger,
) {
// Update the raft HardState with the new Commit value now that the
// replica is initialized (combining it with existing or default
// Term and Vote).
rsl := makeReplicaStateLoader(split.RightDesc.RangeID)
if err := rsl.synthesizeRaftState(ctx, eng); err != nil {
log.Fatal(ctx, err)
}
}
// splitPostApply is the part of the split trigger which coordinates the actual
// split with the Store. Requires that Replica.raftMu is held.
//
// TODO(tschottdorf): Want to merge this with SplitRange, but some legacy
// testing code calls SplitRange directly.
func splitPostApply(
ctx context.Context, deltaMS enginepb.MVCCStats, split *roachpb.SplitTrigger, r *Replica,
) {
// The right hand side of the split was already created (and its raftMu
// acquired) in Replica.acquireSplitLock. It must be present here.
rightRng, err := r.store.GetReplica(split.RightDesc.RangeID)
if err != nil {
log.Fatalf(ctx, "unable to find RHS replica: %s", err)
}
{
rightRng.mu.Lock()
// Already holding raftMu, see above.
err := rightRng.initRaftMuLockedReplicaMuLocked(&split.RightDesc, r.store.Clock(), 0)
rightRng.mu.Unlock()
if err != nil {
log.Fatal(ctx, err)
}
}
// Finish initialization of the RHS.
r.mu.Lock()
rightRng.mu.Lock()
// Copy the minLeaseProposedTS from the LHS.
rightRng.mu.minLeaseProposedTS = r.mu.minLeaseProposedTS
rightLease := *rightRng.mu.state.Lease
rightRng.mu.Unlock()
r.mu.Unlock()
log.Event(ctx, "copied timestamp cache")
// Invoke the leasePostApply method to ensure we properly initialize
// the replica according to whether it holds the lease. This enables
// the PushTxnQueue.
rightRng.leasePostApply(ctx, rightLease)
// Add the RHS replica to the store. This step atomically updates
// the EndKey of the LHS replica and also adds the RHS replica
// to the store's replica map.
if err := r.store.SplitRange(ctx, r, rightRng); err != nil {
// Our in-memory state has diverged from the on-disk state.
log.Fatalf(ctx, "%s: failed to update Store after split: %s", r, err)
}
// Update store stats with difference in stats before and after split.
r.store.metrics.addMVCCStats(deltaMS)
now := r.store.Clock().Now()
// While performing the split, zone config changes or a newly created table
// might require the range to be split again. Enqueue both the left and right
// ranges to speed up such splits. See #10160.
r.store.splitQueue.MaybeAdd(r, now)
r.store.splitQueue.MaybeAdd(rightRng, now)
// If the range was not properly replicated before the split, the replicate
// queue may not have picked it up (due to the need for a split). Enqueue
// both the left and right ranges to speed up a potentially necessary
// replication. See #7022 and #7800.
r.store.replicateQueue.MaybeAdd(r, now)
r.store.replicateQueue.MaybeAdd(rightRng, now)
if len(split.RightDesc.Replicas) == 1 {
// TODO(peter): In single-node clusters, we enqueue the right-hand side of
// the split (the new range) for Raft processing so that the corresponding
// Raft group is created. This shouldn't be necessary for correctness, but
// some tests rely on this (e.g. server.TestStatusSummaries).
r.store.enqueueRaftUpdateCheck(rightRng.RangeID)
}
}
// SplitRange shortens the original range to accommodate the new range. The new
// range is added to the ranges map and the replicasByKey btree. origRng.raftMu
// and newRng.raftMu must be held.
//
// This is only called from the split trigger in the context of the execution
// of a Raft command.
func (s *Store) SplitRange(ctx context.Context, origRng, newRng *Replica) error {
origDesc := origRng.Desc()
newDesc := newRng.Desc()
if !bytes.Equal(origDesc.EndKey, newDesc.EndKey) ||
bytes.Compare(origDesc.StartKey, newDesc.StartKey) >= 0 {
return errors.Errorf("orig range is not splittable by new range: %+v, %+v", origDesc, newDesc)
}
s.mu.Lock()
defer s.mu.Unlock()
if exRng, ok := s.mu.uninitReplicas[newDesc.RangeID]; ok {
// If we have an uninitialized replica of the new range we require pointer
// equivalence with newRng. See Store.splitTriggerPostApply().
if exRng != newRng {
log.Fatalf(ctx, "found unexpected uninitialized replica: %s vs %s", exRng, newRng)
}
delete(s.mu.uninitReplicas, newDesc.RangeID)
s.mu.replicas.Delete(int64(newDesc.RangeID))
s.replicaQueues.Delete(int64(newDesc.RangeID))
}
// Replace the end key of the original range with the start key of
// the new range. Reinsert the range since the btree is keyed by range end keys.
if kr := s.mu.replicasByKey.Delete(origRng); kr != origRng {
return errors.Errorf("replicasByKey unexpectedly contains %v instead of replica %s", kr, origRng)
}
copyDesc := *origDesc
copyDesc.EndKey = append([]byte(nil), newDesc.StartKey...)
origRng.setDescWithoutProcessUpdate(&copyDesc)
// Clear the LHS push txn queue, to redirect to the RHS if
// appropriate. We do this after setDescWithoutProcessUpdate
// to ensure that no pre-split commands are inserted into the
// pushTxnQueue after we clear it.
origRng.pushTxnQueue.Clear(false /* disable */)
// Clear the original range's request stats, since they include requests for
// spans that are now owned by the new range.
origRng.leaseholderStats.resetRequestCounts()
origRng.writeStats.splitRequestCounts(newRng.writeStats)
if kr := s.mu.replicasByKey.ReplaceOrInsert(origRng); kr != nil {
return errors.Errorf("replicasByKey unexpectedly contains %s when inserting replica %s", kr, origRng)
}
if err := s.addReplicaInternalLocked(newRng); err != nil {
return errors.Errorf("couldn't insert range %v in replicasByKey btree: %s", newRng, err)
}
// Update the max bytes and other information of the new range.
// This may not happen if the system config has not yet been loaded.
// Since this is done under the store lock, system config update will
// properly set these fields.
if err := newRng.updateRangeInfo(newRng.Desc()); err != nil {
return err
}
// Add the range to metrics and maybe gossip on capacity change.
s.metrics.ReplicaCount.Inc(1)
s.maybeGossipOnCapacityChange(ctx, rangeChangeEvent)
return s.processRangeDescriptorUpdateLocked(ctx, origRng)
}
// MergeRange expands the subsuming range to absorb the subsumed range. This
// merge operation will fail if the two ranges are not collocated on the same
// store.
// The subsumed range's raftMu is assumed held.
func (s *Store) MergeRange(
ctx context.Context,
subsumingRng *Replica,
updatedEndKey roachpb.RKey,
subsumedRangeID roachpb.RangeID,
) error {
subsumingDesc := subsumingRng.Desc()
if !subsumingDesc.EndKey.Less(updatedEndKey) {
return errors.Errorf("the new end key is not greater than the current one: %+v <= %+v",
updatedEndKey, subsumingDesc.EndKey)
}
subsumedRng, err := s.GetReplica(subsumedRangeID)
if err != nil {
return errors.Errorf("could not find the subsumed range: %d", subsumedRangeID)
}
subsumedDesc := subsumedRng.Desc()
if !replicaSetsEqual(subsumedDesc.Replicas, subsumingDesc.Replicas) {
return errors.Errorf("ranges are not on the same replicas sets: %+v != %+v",
subsumedDesc.Replicas, subsumingDesc.Replicas)
}
if subsumingRng.leaseholderStats != nil {
subsumingRng.leaseholderStats.resetRequestCounts()
}
if subsumingRng.writeStats != nil {
// Note: this could be drastically improved by adding a replicaStats method
// that merges stats. Resetting stats is typically bad for the rebalancing
// logic that depends on them.
subsumingRng.writeStats.resetRequestCounts()
}
if err := s.maybeMergeTimestampCaches(ctx, subsumingRng, subsumedRng); err != nil {
return err
}
// Remove and destroy the subsumed range. Note that we were called
// (indirectly) from raft processing so we must call removeReplicaImpl
// directly to avoid deadlocking on Replica.raftMu.
if err := s.removeReplicaImpl(ctx, subsumedRng, *subsumedDesc, false); err != nil {
return errors.Errorf("cannot remove range %s", err)
}
// Clear the RHS push txn queue, to redirect to the LHS if
// appropriate.
subsumedRng.pushTxnQueue.Clear(false /* disable */)
// Update the end key of the subsuming range.
copy := *subsumingDesc
copy.EndKey = updatedEndKey
return subsumingRng.setDesc(&copy)
}
// maybeMergeTimestampCaches updates the subsuming replica's timestamp cache
// with the entries from the subsumed replica if the subsuming replica holds
// the range lease. Otherwise, the timestamp cache doesn't matter (in fact it
// should be empty, to save memory).
func (s *Store) maybeMergeTimestampCaches(
ctx context.Context, subsumingRep *Replica, subsumedRep *Replica,
) error {
subsumingRep.mu.Lock()
defer subsumingRep.mu.Unlock()
subsumingLease := subsumingRep.mu.state.Lease
subsumedRep.mu.Lock()
defer subsumedRep.mu.Unlock()
subsumedLease := *subsumedRep.mu.state.Lease
// Merge support is currently incomplete and incorrect. In particular, the
// lease holders must be colocated and the subsumed range appropriately
// quiesced. See also #2433.
now := s.Clock().Now()
if subsumedRep.isLeaseValidRLocked(subsumedLease, now) &&
subsumingLease.Replica.StoreID != subsumedLease.Replica.StoreID {
log.Fatalf(ctx, "cannot merge ranges with non-colocated leases. "+
"Subsuming lease: %s. Subsumed lease: %s.", subsumingLease, subsumedLease)
}
return nil
}
// addReplicaInternalLocked adds the replica to the replicas map and the
// replicasByKey btree. Returns an error if a replica with
// the same Range ID or a KeyRange that overlaps has already been added to
// this store. addReplicaInternalLocked requires that the store lock is held.
func (s *Store) addReplicaInternalLocked(repl *Replica) error {
if !repl.IsInitialized() {
return errors.Errorf("attempted to add uninitialized range %s", repl)
}
// TODO(spencer): will need to determine which range is
// newer, and keep that one.
if err := s.addReplicaToRangeMapLocked(repl); err != nil {
return err
}
if exRange := s.getOverlappingKeyRangeLocked(repl.Desc()); exRange != nil {
return errors.Errorf("%s: cannot addReplicaInternalLocked; range %s has overlapping range %s", s, repl, exRange.Desc())
}
if exRngItem := s.mu.replicasByKey.ReplaceOrInsert(repl); exRngItem != nil {
return errors.Errorf("%s: cannot addReplicaInternalLocked; range for key %v already exists in replicasByKey btree", s,
exRngItem.(KeyRange).endKey())
}
return nil
}
// addPlaceholderLocked adds the specified placeholder. Requires that Store.mu
// and Replica.raftMu are held.
func (s *Store) addPlaceholderLocked(placeholder *ReplicaPlaceholder) error {
rangeID := placeholder.Desc().RangeID
if exRng := s.mu.replicasByKey.ReplaceOrInsert(placeholder); exRng != nil {
return errors.Errorf("%s overlaps with existing KeyRange %s in replicasByKey btree", placeholder, exRng)
}
if exRng, ok := s.mu.replicaPlaceholders[rangeID]; ok {
return errors.Errorf("%s has ID collision with existing KeyRange %s", placeholder, exRng)
}
s.mu.replicaPlaceholders[rangeID] = placeholder
return nil
}
// removePlaceholder removes a placeholder for the specified range if it
// exists, returning true if a placeholder was present and removed and false
// otherwise. Requires that Replica.raftMu is held.
func (s *Store) removePlaceholder(ctx context.Context, rngID roachpb.RangeID) bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.removePlaceholderLocked(ctx, rngID)
}
// removePlaceholderLocked removes the specified placeholder. Requires that
// Store.mu and Replica.raftMu are held.
func (s *Store) removePlaceholderLocked(ctx context.Context, rngID roachpb.RangeID) bool {
placeholder, ok := s.mu.replicaPlaceholders[rngID]
if !ok {
return false
}
switch exRng := s.mu.replicasByKey.Delete(placeholder).(type) {
case *ReplicaPlaceholder:
delete(s.mu.replicaPlaceholders, rngID)
return true
case nil:
log.Fatalf(ctx, "r%d: placeholder not found", rngID)
default:
log.Fatalf(ctx, "r%d: expected placeholder, got %T", rngID, exRng)
}
return false // appease the compiler
}
// addReplicaToRangeMapLocked adds the replica to the replicas map.
// addReplicaToRangeMapLocked requires that the store lock is held.
func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error {
if _, loaded := s.mu.replicas.LoadOrStore(int64(repl.RangeID), unsafe.Pointer(repl)); loaded {
return errors.Errorf("%s: replica already exists", repl)
}
return nil
}
// RemoveReplica removes the replica from the store's replica map and
// from the sorted replicasByKey btree. The version of the replica
// descriptor that was used to make the removal decision is passed in,
// and the removal is aborted if the replica ID has changed since
// then. If `destroy` is true, all data belonging to the replica will be
// deleted. In either case a tombstone record will be written.
func (s *Store) RemoveReplica(
ctx context.Context, rep *Replica, consistentDesc roachpb.RangeDescriptor, destroy bool,
) error {
if destroy {
// Destroying replica state is moderately expensive, so we serialize such
// operations with applying non-empty snapshots.
select {
case s.snapshotApplySem <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
case <-s.stopper.ShouldStop():
return errors.Errorf("stopped")
}
defer func() {
<-s.snapshotApplySem
}()
}
rep.raftMu.Lock()
defer rep.raftMu.Unlock()
return s.removeReplicaImpl(ctx, rep, consistentDesc, destroy)
}
// removeReplicaImpl is the implementation of RemoveReplica, which is sometimes
// called directly when the necessary lock is already held. It requires that
// Replica.raftMu is held and that s.mu is not held.
func (s *Store) removeReplicaImpl(
ctx context.Context, rep *Replica, consistentDesc roachpb.RangeDescriptor, destroyData bool,
) error {
log.Infof(ctx, "removing replica")
// We check both rep.mu.ReplicaID and rep.mu.state.Desc's replica ID because
// they can differ in cases when a replica's ID is increased due to an
// incoming raft message (see #14231 for background).
rep.mu.Lock()
if rep.mu.replicaID >= consistentDesc.NextReplicaID {
rep.mu.Unlock()
return errors.Errorf("cannot remove replica %s; replica ID has changed (%s >= %s)",
rep, rep.mu.replicaID, consistentDesc.NextReplicaID)
}
desc := rep.mu.state.Desc
if repDesc, ok := desc.GetReplicaDescriptor(s.StoreID()); ok && repDesc.ReplicaID >= consistentDesc.NextReplicaID {
rep.mu.Unlock()
return errors.Errorf("cannot remove replica %s; replica descriptor's ID has changed (%s >= %s)",
rep, repDesc.ReplicaID, consistentDesc.NextReplicaID)
}
rep.mu.Unlock()
// TODO(peter): Could use s.mu.RLock here?
s.mu.Lock()
if _, err := s.GetReplica(rep.RangeID); err != nil {
s.mu.Unlock()
return err
}
if placeholder := s.getOverlappingKeyRangeLocked(desc); placeholder != rep {
// This is a fatal error because uninitialized replicas shouldn't make it
// this far. This method will need some changes when we introduce GC of
// uninitialized replicas.
s.mu.Unlock()
log.Fatalf(ctx, "replica %+v unexpectedly overlapped by %+v", rep, placeholder)
}
// Adjust stats before calling Destroy. This can be called before or after
// Destroy, but this configuration helps avoid races in stat verification
// tests.
s.metrics.subtractMVCCStats(rep.GetMVCCStats())
s.metrics.ReplicaCount.Dec(1)
s.mu.Unlock()
// Mark the replica as destroyed and (optionally) destroy the on-disk data
// while not holding Store.mu. This is safe because we're holding
// Replica.raftMu and the replica is present in Store.mu.replicasByKey
// (preventing any concurrent access to the replica's key range).
rep.readOnlyCmdMu.Lock()
rep.mu.Lock()
rep.cancelPendingCommandsLocked()
rep.mu.internalRaftGroup = nil
rep.mu.destroyed = roachpb.NewRangeNotFoundError(rep.RangeID)
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
if destroyData {
if err := rep.destroyDataRaftMuLocked(ctx, consistentDesc); err != nil {
return err
}
}
s.mu.Lock()
defer s.mu.Unlock()
s.mu.replicas.Delete(int64(rep.RangeID))
delete(s.mu.uninitReplicas, rep.RangeID)
s.replicaQueues.Delete(int64(rep.RangeID))
if placeholder := s.mu.replicasByKey.Delete(rep); placeholder != rep {
// We already checked that our replica was present in replicasByKey
// above. Nothing should have been able to change that.
log.Fatalf(ctx, "replica %+v unexpectedly overlapped by %+v", rep, placeholder)
}
delete(s.mu.replicaPlaceholders, rep.RangeID)
// TODO(peter): Could release s.mu.Lock() here.
s.maybeGossipOnCapacityChange(ctx, rangeChangeEvent)
s.scanner.RemoveReplica(rep)
return nil
}
// processRangeDescriptorUpdate should be called whenever a replica's range
// descriptor is updated, to update the store's maps of its ranges to match
// the updated descriptor. Since the latter update requires acquiring the store
// lock (which cannot always safely be done by replicas), this function call
// should be deferred until it is safe to acquire the store lock.
func (s *Store) processRangeDescriptorUpdate(ctx context.Context, repl *Replica) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.processRangeDescriptorUpdateLocked(ctx, repl)
}
// processRangeDescriptorUpdateLocked requires that Store.mu and Replica.raftMu
// are locked.
func (s *Store) processRangeDescriptorUpdateLocked(ctx context.Context, repl *Replica) error {
if !repl.IsInitialized() {
return errors.Errorf("attempted to process uninitialized range %s", repl)
}
rangeID := repl.RangeID
if _, ok := s.mu.uninitReplicas[rangeID]; !ok {
// Do nothing if the range has already been initialized.
return nil
}
delete(s.mu.uninitReplicas, rangeID)
if exRange := s.getOverlappingKeyRangeLocked(repl.Desc()); exRange != nil {
return errors.Errorf("%s: cannot processRangeDescriptorUpdate; range %s has overlapping range %s", s, repl, exRange.Desc())
}
if exRngItem := s.mu.replicasByKey.ReplaceOrInsert(repl); exRngItem != nil {
return errors.Errorf("range for key %v already exists in replicasByKey btree",
(exRngItem.(*Replica)).endKey())
}
// Add the range to metrics and maybe gossip on capacity change.
s.metrics.ReplicaCount.Inc(1)
s.maybeGossipOnCapacityChange(ctx, rangeChangeEvent)
return nil
}
// Attrs returns the attributes of the underlying store.
func (s *Store) Attrs() roachpb.Attributes {
return s.engine.Attrs()
}
// Capacity returns the capacity of the underlying storage engine. Note that
// this does not include reservations.
// Note that Capacity() has the side effect of updating some of the store's
// internal statistics about its replicas.
func (s *Store) Capacity() (roachpb.StoreCapacity, error) {
capacity, err := s.engine.Capacity()
if err != nil {
return capacity, err
}
capacity.RangeCount = int32(s.ReplicaCount())
now := s.cfg.Clock.Now()
var leaseCount int32
var logicalBytes int64
var totalWritesPerSecond float64
bytesPerReplica := make([]float64, 0, capacity.RangeCount)
writesPerReplica := make([]float64, 0, capacity.RangeCount)
newStoreReplicaVisitor(s).Visit(func(r *Replica) bool {
if r.OwnsValidLease(now) {
leaseCount++
}
mvccStats := r.GetMVCCStats()
logicalBytes += mvccStats.Total()
bytesPerReplica = append(bytesPerReplica, float64(mvccStats.Total()))
// TODO(a-robinson): How dangerous is it that this number will be incorrectly
// low the first time or two it gets gossiped when a store starts? We can't
// easily have a countdown as its value changes like for leases/replicas.
if qps, dur := r.writeStats.avgQPS(); dur >= MinStatsDuration {
totalWritesPerSecond += qps
writesPerReplica = append(writesPerReplica, qps)
}
return true
})
capacity.LeaseCount = leaseCount
capacity.LogicalBytes = logicalBytes
capacity.WritesPerSecond = totalWritesPerSecond
capacity.BytesPerReplica = roachpb.PercentilesFromData(bytesPerReplica)
capacity.WritesPerReplica = roachpb.PercentilesFromData(writesPerReplica)
s.recordNewWritesPerSecond(totalWritesPerSecond)
return capacity, nil
}
// ReplicaCount returns the number of replicas contained by this store. This
// method is O(n) in the number of replicas and should not be called from
// performance critical code.
func (s *Store) ReplicaCount() int {
var count int
s.mu.replicas.Range(func(_ int64, _ unsafe.Pointer) bool {
count++
return true
})
return count
}
// Registry returns the store registry.
func (s *Store) Registry() *metric.Registry {
return s.metrics.registry
}
// Metrics returns the store's metric struct.
func (s *Store) Metrics() *StoreMetrics {
return s.metrics
}
// MVCCStats returns the current MVCCStats accumulated for this store.
// TODO(mrtracy): This should be removed as part of #4465; it is only needed
// to support the current StatusSummary structures which will be changing.
func (s *Store) MVCCStats() enginepb.MVCCStats {
s.metrics.mu.Lock()
defer s.metrics.mu.Unlock()
return s.metrics.mu.stats
}
// Descriptor returns a StoreDescriptor including current store
// capacity information.
func (s *Store) Descriptor() (*roachpb.StoreDescriptor, error) {
capacity, err := s.Capacity()
if err != nil {
return nil, err
}
// Initialize the store descriptor.
return &roachpb.StoreDescriptor{
StoreID: s.Ident.StoreID,
Attrs: s.Attrs(),
Node: *s.nodeDesc,
Capacity: capacity,
}, nil
}
// deadReplicas returns a list of all the corrupt replicas on the store.
func (s *Store) deadReplicas() roachpb.StoreDeadReplicas {
// We can't use a storeReplicaVisitor here as it skips destroyed replicas.
//
// TODO(bram): does this need to visit all the replicas? Could we just use the
// store pool to locate any dead replicas on this store directly?
var deadReplicas []roachpb.ReplicaIdent
s.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool {
r := (*Replica)(v)
r.mu.RLock()
corrupted := r.mu.corrupted
desc := r.mu.state.Desc
r.mu.RUnlock()
replicaDesc, ok := desc.GetReplicaDescriptor(s.Ident.StoreID)
if ok && corrupted {
deadReplicas = append(deadReplicas, roachpb.ReplicaIdent{
RangeID: desc.RangeID,
Replica: replicaDesc,
})
}
return true
})
return roachpb.StoreDeadReplicas{
StoreID: s.Ident.StoreID,
Replicas: deadReplicas,
}
}
// Send fetches a range based on the header's replica, assembles method, args &
// reply into a Raft Cmd struct and executes the command using the fetched
// range.
// An incoming request may be transactional or not. If it is not transactional,
// the timestamp at which it executes may be higher than that optionally
// specified through the incoming BatchRequest, and it is not guaranteed that
// all operations are written at the same timestamp. If it is transactional, a
// timestamp must not be set - it is deduced automatically from the
// transaction. In particular, the read (original) timestamp will be used for
// all reads _and writes_ (see the TxnMeta.OrigTimestamp for details).
//
// Should a transactional operation be forced to a higher timestamp (for
// instance due to the timestamp cache or finding a committed value in the path
// of one of its writes), the response will have a transaction set which should
// be used to update the client transaction.
func (s *Store) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
// Attach any log tags from the store to the context (which normally
// comes from gRPC).
ctx = s.AnnotateCtx(ctx)
for _, union := range ba.Requests {
arg := union.GetInner()
if _, ok := arg.(*roachpb.NoopRequest); ok {
continue
}
header := arg.Header()
if err := verifyKeys(header.Key, header.EndKey, roachpb.IsRange(arg)); err != nil {
return nil, roachpb.NewError(err)
}
}
if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
return nil, roachpb.NewError(err)
}
if s.cfg.TestingKnobs.ClockBeforeSend != nil {
s.cfg.TestingKnobs.ClockBeforeSend(s.cfg.Clock, ba)
}
if maxOffset := s.Clock().MaxOffset(); maxOffset > 0 && maxOffset != timeutil.ClocklessMaxOffset {
// Once a command is submitted to raft, all replicas' logical
// clocks will be ratcheted forward to match. If the command
// appears to come from a node with a bad clock, reject it now
// before we reach that point.
offset := time.Duration(ba.Timestamp.WallTime - s.Clock().PhysicalNow())
if offset > maxOffset && !s.cfg.TestingKnobs.DisableMaxOffsetCheck {
return nil, roachpb.NewErrorf("rejecting command with timestamp in the future: %d (%s ahead)",
ba.Timestamp.WallTime, offset)
}
}
// Update our clock with the incoming request timestamp. This advances the
// local node's clock to a high water mark from all nodes with which it has
// interacted. We hold on to the resulting timestamp - we know that any
// write with a higher timestamp we run into later must have started after
// this point in (absolute) time.
now := s.cfg.Clock.Update(ba.Timestamp)
defer func() {
if r := recover(); r != nil {
// On panic, don't run the defer. It's probably just going to panic
// again due to undefined state.
panic(r)
}
if ba.Txn != nil {
// We're in a Txn, so we can reduce uncertainty restarts by attaching
// the above timestamp to the returned response or error. The caller
// can use it to shorten its uncertainty interval when it comes back to
// this node.
if pErr != nil {
pErr.OriginNode = ba.Replica.NodeID
if txn := pErr.GetTxn(); txn != nil {
// Clone the txn, as we'll modify it.
pErr.SetTxn(txn)
} else {
pErr.SetTxn(ba.Txn)
}
pErr.GetTxn().UpdateObservedTimestamp(ba.Replica.NodeID, now)
} else {
if br.Txn == nil {
br.Txn = ba.Txn
}
br.Txn.UpdateObservedTimestamp(ba.Replica.NodeID, now)
// Update our clock with the outgoing response txn timestamp.
s.cfg.Clock.Update(br.Txn.Timestamp)
}
} else {
if pErr == nil {
// Update our clock with the outgoing response timestamp.
s.cfg.Clock.Update(br.Timestamp)
}
}
if pErr != nil {
pErr.Now = now
} else {
br.Now = now
}
}()
if ba.Txn != nil {
// We make our transaction aware that no other operation that causally
// precedes it could have started after `now`. This is important: If we
// wind up pushing a value, it will be in our immediate future, and not
// updating the top end of our uncertainty timestamp would lead to a
// restart (at least in the absence of a prior observed timestamp from
// this node, in which case the following is a no-op).
if now.Less(ba.Txn.MaxTimestamp) {
shallowTxn := *ba.Txn
shallowTxn.MaxTimestamp.Backward(now)
ba.Txn = &shallowTxn
}
}
if log.V(1) {
log.Eventf(ctx, "executing %s", ba)
} else if log.HasSpanOrEvent(ctx) {
log.Eventf(ctx, "executing %d requests", len(ba.Requests))
}
// Add the command to the range for execution; exit retry loop on success.
for {
// Exit loop if context has been canceled or timed out.
if err := ctx.Err(); err != nil {
return nil, roachpb.NewError(err)
}
// Get range and add command to the range for execution.
repl, err := s.GetReplica(ba.RangeID)
if err != nil {
return nil, roachpb.NewError(err)
}
if !repl.IsInitialized() {
repl.mu.RLock()
replicaID := repl.mu.replicaID
repl.mu.RUnlock()
// If we have an uninitialized copy of the range, then we are
// probably a valid member of the range, we're just in the
// process of getting our snapshot. If we returned
// RangeNotFoundError, the client would invalidate its cache,
// but we can be smarter: the replica that caused our
// uninitialized replica to be created is most likely the
// leader.
return nil, roachpb.NewError(&roachpb.NotLeaseHolderError{
RangeID: ba.RangeID,
LeaseHolder: repl.creatingReplica,
// The replica doesn't have a range descriptor yet, so we have to build
// a ReplicaDescriptor manually.
Replica: roachpb.ReplicaDescriptor{
NodeID: repl.store.nodeDesc.NodeID,
StoreID: repl.store.StoreID(),
ReplicaID: replicaID,
},
})
}
// If necessary, the request may need to wait in the push txn queue,
// pending updates to the target transaction for either PushTxn or
// QueryTxn requests.
if br, pErr = s.maybeWaitInPushTxnQueue(ctx, &ba, repl); br != nil || pErr != nil {
return br, pErr
}
br, pErr = repl.Send(ctx, ba)
if pErr == nil {
return br, nil
}
// Handle push txn failures and write intent conflicts locally and
// retry. Other errors are returned to caller.
switch pErr.GetDetail().(type) {
case *roachpb.TransactionPushError:
// On a transaction push error, retry immediately if doing so will
// enqueue into the pushTxnQueue in order to await further updates to the
// unpushed txn's status.
dontRetry := s.cfg.DontRetryPushTxnFailures
if !dontRetry && ba.IsSinglePushTxnRequest() {
pushReq := ba.Requests[0].GetInner().(*roachpb.PushTxnRequest)
dontRetry = shouldPushImmediately(pushReq)
}
if dontRetry {
// If we're not retrying on push txn failures return a txn retry error
// after the first failure to guarantee a retry.
if ba.Txn != nil {
err := roachpb.NewTransactionRetryError(roachpb.RETRY_REASON_UNKNOWN)
return nil, roachpb.NewErrorWithTxn(err, ba.Txn)
}
return nil, pErr
}
pErr = nil // retry command
case *roachpb.WriteIntentError:
// Process and resolve write intent error. We do this here because
// this is the code path with the requesting client waiting.
if pErr.Index != nil {
var pushType roachpb.PushTxnType
if ba.IsWrite() {
pushType = roachpb.PUSH_ABORT
} else {
pushType = roachpb.PUSH_TIMESTAMP
}
index := pErr.Index
args := ba.Requests[index.Index].GetInner()
// Make a copy of the header for the upcoming push; we will update
// the timestamp.
h := ba.Header
// We must push at least to h.Timestamp, but in fact we want to
// go all the way up to a timestamp which was taken off the HLC
// after our operation started. This allows us to not have to
// restart for uncertainty as we come back and read.
h.Timestamp.Forward(now)
// We are going to hand the header (and thus the transaction proto)
// to the RPC framework, after which it must not be changed (since
// that could race). Since the subsequent execution of the original
// request might mutate the transaction, make a copy here.
//
// See #9130.
if h.Txn != nil {
clonedTxn := h.Txn.Clone()
h.Txn = &clonedTxn
}
if pErr = s.intentResolver.processWriteIntentError(ctx, pErr, args, h, pushType); pErr != nil {
// Do not propagate ambiguous results; assume success and retry original op.
if _, ok := pErr.GetDetail().(*roachpb.AmbiguousResultError); !ok {
// Preserve the error index.
pErr.Index = index
return nil, pErr
}
pErr = nil
}
// We've resolved the write intent; retry command.
}
// Increase the sequence counter to avoid getting caught in replay
// protection on retry.
ba.SetNewRequest()
}
if pErr != nil {
return nil, pErr
}
}
}
// maybeWaitInPushTxnQueue potentially diverts the incoming request to
// the push txn queue, where it will wait for updates to the target
// transaction.
func (s *Store) maybeWaitInPushTxnQueue(
ctx context.Context, ba *roachpb.BatchRequest, repl *Replica,
) (*roachpb.BatchResponse, *roachpb.Error) {
// If this is a push txn request, check the push queue first, which
// may cause this request to wait and either return a successful push
// txn response or else allow this request to proceed.
if ba.IsSinglePushTxnRequest() {
pushReq := ba.Requests[0].GetInner().(*roachpb.PushTxnRequest)
pushResp, pErr := repl.pushTxnQueue.MaybeWaitForPush(repl.AnnotateCtx(ctx), repl, pushReq)
// Copy the request in anticipation of setting the force arg and
// updating the Now timestamp (see below).
pushReqCopy := *pushReq
if pErr == errDeadlock {
// We've experienced a deadlock; set Force=true on push request.
pushReqCopy.Force = true
} else if pErr != nil {
return nil, pErr
} else if pushResp != nil {
br := &roachpb.BatchResponse{}
br.Add(pushResp)
return br, nil
}
// Move the push timestamp forward to the current time, as this
// request may have been waiting to push the txn. If we don't
// move the timestamp forward to the current time, we may fail
// to push a txn which has expired.
pushReqCopy.Now.Forward(s.Clock().Now())
ba.Requests = nil
ba.Add(&pushReqCopy)
} else if ba.IsSingleQueryTxnRequest() {
// For query txn requests, wait in the push txn queue either for
// transaction update or for dependent transactions to change.
queryReq := ba.Requests[0].GetInner().(*roachpb.QueryTxnRequest)
pErr := repl.pushTxnQueue.MaybeWaitForQuery(repl.AnnotateCtx(ctx), repl, queryReq)
if pErr != nil {
return nil, pErr
}
}
return nil, nil
}
// reserveSnapshot throttles incoming snapshots. The returned closure is used
// to clean up the reservation and release its resources. A nil cleanup function
// and a non-empty rejectionMessage indicate the reservation was declined.
func (s *Store) reserveSnapshot(
ctx context.Context, header *SnapshotRequest_Header,
) (_cleanup func(), _rejectionMsg string, _err error) {
if header.RangeSize == 0 {
// Empty snapshots are exempt from rate limits because they're so cheap to
// apply. This vastly speeds up rebalancing any empty ranges created by a
// RESTORE or manual SPLIT AT, since it prevents these empty snapshots from
// getting stuck behind large snapshots managed by the replicate queue.
} else if header.CanDecline {
if atomic.LoadInt32(&s.rebalancesDisabled) == 1 {
return nil, rebalancesDisabledMsg, nil
}
select {
case s.snapshotApplySem <- struct{}{}:
case <-ctx.Done():
return nil, "", ctx.Err()
case <-s.stopper.ShouldStop():
return nil, "", errors.Errorf("stopped")
default:
return nil, snapshotApplySemBusyMsg, nil
}
} else {
select {
case s.snapshotApplySem <- struct{}{}:
case <-ctx.Done():
return nil, "", ctx.Err()
case <-s.stopper.ShouldStop():
return nil, "", errors.Errorf("stopped")
}
}
s.metrics.ReservedReplicaCount.Inc(1)
s.metrics.Reserved.Inc(header.RangeSize)
return func() {
s.metrics.ReservedReplicaCount.Dec(1)
s.metrics.Reserved.Dec(header.RangeSize)
if header.RangeSize != 0 {
<-s.snapshotApplySem
}
}, "", nil
}
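// A self-contained sketch of the admission pattern above, assuming a buffered
// channel used as a counting semaphore (the names here are illustrative, not
// part of this file): declinable snapshots try a non-blocking acquire and fall
// through to a rejection, while required snapshots wait for a slot.
//
//	func acquire(ctx context.Context, sem chan struct{}, canDecline bool) (ok bool, err error) {
//		if canDecline {
//			select {
//			case sem <- struct{}{}:
//				return true, nil
//			case <-ctx.Done():
//				return false, ctx.Err()
//			default:
//				return false, nil // busy; caller replies with a rejection message
//			}
//		}
//		select {
//		case sem <- struct{}{}:
//			return true, nil
//		case <-ctx.Done():
//			return false, ctx.Err()
//		}
//	}
//
// Releasing the slot is the receive from the channel performed by the
// returned cleanup closure.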
// HandleSnapshot reads an incoming streaming snapshot and applies it if
// possible.
func (s *Store) HandleSnapshot(
header *SnapshotRequest_Header, stream SnapshotResponseStream,
) error {
s.metrics.raftRcvdMessages[raftpb.MsgSnap].Inc(1)
if s.IsDraining() {
return stream.Send(&SnapshotResponse{
Status: SnapshotResponse_DECLINED,
Message: storeDrainingMsg,
})
}
ctx := s.AnnotateCtx(stream.Context())
cleanup, rejectionMsg, err := s.reserveSnapshot(ctx, header)
if err != nil {
return err
}
if cleanup == nil {
return stream.Send(&SnapshotResponse{
Status: SnapshotResponse_DECLINED,
Message: rejectionMsg,
})
}
defer cleanup()
sendSnapError := func(err error) error {
return stream.Send(&SnapshotResponse{
Status: SnapshotResponse_ERROR,
Message: err.Error(),
})
}
// Check to see if the snapshot can be applied but don't attempt to add
// a placeholder here, because we're not holding the replica's raftMu.
// We'll perform this check again later after receiving the rest of the
// snapshot data - this is purely an optimization to prevent downloading
// a snapshot that we know we won't be able to apply.
if _, err := s.canApplySnapshot(ctx, header.State.Desc); err != nil {
return sendSnapError(
errors.Wrapf(err, "%s,r%d: cannot apply snapshot", s, header.State.Desc.RangeID),
)
}
if err := stream.Send(&SnapshotResponse{Status: SnapshotResponse_ACCEPTED}); err != nil {
return err
}
if log.V(2) {
log.Infof(ctx, "accepted snapshot reservation for r%d", header.State.Desc.RangeID)
}
var batches [][]byte
var logEntries [][]byte
for {
req, err := stream.Recv()
if err != nil {
return err
}
if req.Header != nil {
return sendSnapError(errors.New("client error: provided a header mid-stream"))
}
if req.KVBatch != nil {
batches = append(batches, req.KVBatch)
}
if req.LogEntries != nil {
logEntries = append(logEntries, req.LogEntries...)
}
if req.Final {
snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data)
if err != nil {
return sendSnapError(errors.Wrap(err, "invalid snapshot"))
}
inSnap := IncomingSnapshot{
SnapUUID: snapUUID,
Batches: batches,
LogEntries: logEntries,
State: &header.State,
snapType: snapTypeRaft,
}
if header.RaftMessageRequest.ToReplica.ReplicaID == 0 {
inSnap.snapType = snapTypePreemptive
}
if err := s.processRaftRequest(ctx, &header.RaftMessageRequest, inSnap); err != nil {
return sendSnapError(errors.Wrap(err.GoError(), "failed to apply snapshot"))
}
return stream.Send(&SnapshotResponse{Status: SnapshotResponse_APPLIED})
}
}
}
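// Receiver-side shape of the snapshot streaming protocol implemented above:
//
//	recv Header          -> reserve resources -> send ACCEPTED (or DECLINED/ERROR)
//	recv KVBatch ...     -> accumulate batches
//	recv LogEntries ...  -> accumulate raft log entries
//	recv Final           -> apply via processRaftRequest -> send APPLIED (or ERROR)
//
// The sender side of the same exchange lives in sendSnapshot below.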
func (s *Store) uncoalesceBeats(
ctx context.Context,
beats []RaftHeartbeat,
fromReplica, toReplica roachpb.ReplicaDescriptor,
msgT raftpb.MessageType,
respStream RaftMessageResponseStream,
) {
if len(beats) == 0 {
return
}
if log.V(4) {
log.Infof(ctx, "uncoalescing %d beats of type %v: %+v", len(beats), msgT, beats)
}
beatReqs := make([]RaftMessageRequest, len(beats))
for i, beat := range beats {
msg := raftpb.Message{
Type: msgT,
From: uint64(beat.FromReplicaID),
To: uint64(beat.ToReplicaID),
Term: beat.Term,
Commit: beat.Commit,
}
beatReqs[i] = RaftMessageRequest{
RangeID: beat.RangeID,
FromReplica: roachpb.ReplicaDescriptor{
NodeID: fromReplica.NodeID,
StoreID: fromReplica.StoreID,
ReplicaID: beat.FromReplicaID,
},
ToReplica: roachpb.ReplicaDescriptor{
NodeID: toReplica.NodeID,
StoreID: toReplica.StoreID,
ReplicaID: beat.ToReplicaID,
},
Message: msg,
Quiesce: beat.Quiesce,
}
if log.V(4) {
log.Infof(ctx, "uncoalesced beat: %+v", beatReqs[i])
}
if err := s.HandleRaftUncoalescedRequest(ctx, &beatReqs[i], respStream); err != nil {
log.Errorf(ctx, "could not handle uncoalesced heartbeat %s", err)
}
}
}
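// For example, a coalesced request carrying heartbeats for ranges 5 and 9 is
// expanded above into two separate RaftMessageRequests, each with its RangeID
// filled in and a synthesized raftpb.Message of the appropriate heartbeat (or
// heartbeat response) type, before being handed to
// HandleRaftUncoalescedRequest one at a time.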
// HandleRaftRequest dispatches a raft message to the appropriate Replica. It
// requires that s.mu is not held.
func (s *Store) HandleRaftRequest(
ctx context.Context, req *RaftMessageRequest, respStream RaftMessageResponseStream,
) *roachpb.Error {
if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 {
if req.RangeID != 0 {
log.Fatalf(ctx, "coalesced heartbeats must have rangeID == 0")
}
s.uncoalesceBeats(ctx, req.Heartbeats, req.FromReplica, req.ToReplica, raftpb.MsgHeartbeat, respStream)
s.uncoalesceBeats(ctx, req.HeartbeatResps, req.FromReplica, req.ToReplica, raftpb.MsgHeartbeatResp, respStream)
return nil
}
return s.HandleRaftUncoalescedRequest(ctx, req, respStream)
}
// HandleRaftUncoalescedRequest dispatches a raft message to the appropriate
// Replica. It requires that s.mu is not held.
func (s *Store) HandleRaftUncoalescedRequest(
ctx context.Context, req *RaftMessageRequest, respStream RaftMessageResponseStream,
) *roachpb.Error {
if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 {
log.Fatalf(ctx, "HandleRaftUncoalescedRequest cannot be given coalesced heartbeats or heartbeat responses, received %s", req)
}
// HandleRaftRequest is called on locally uncoalesced heartbeats (which are
// not sent over the network if the environment variable is set) so do not
// count them.
s.metrics.raftRcvdMessages[req.Message.Type].Inc(1)
if respStream == nil {
return s.processRaftRequest(ctx, req, IncomingSnapshot{})
}
value, ok := s.replicaQueues.Load(int64(req.RangeID))
if !ok {
value, _ = s.replicaQueues.LoadOrStore(int64(req.RangeID), unsafe.Pointer(&raftRequestQueue{}))
}
q := (*raftRequestQueue)(value)
q.Lock()
if len(q.infos) >= replicaRequestQueueSize {
q.Unlock()
// TODO(peter): Return an error indicating the request was dropped. Note
// that dropping the request is safe. Raft will retry.
s.metrics.RaftRcvdMsgDropped.Inc(1)
return nil
}
q.infos = append(q.infos, raftRequestInfo{
req: req,
respStream: respStream,
})
q.Unlock()
s.scheduler.EnqueueRaftRequest(req.RangeID)
return nil
}
func (s *Store) processRaftRequest(
ctx context.Context, req *RaftMessageRequest, inSnap IncomingSnapshot,
) (pErr *roachpb.Error) {
// Lazily create the replica.
r, _, err := s.getOrCreateReplica(
ctx,
req.RangeID,
req.ToReplica.ReplicaID,
&req.FromReplica,
)
if err != nil {
return roachpb.NewError(err)
}
ctx = r.AnnotateCtx(ctx)
defer r.raftMu.Unlock()
r.setLastReplicaDescriptors(req)
if req.Quiesce {
if req.Message.Type != raftpb.MsgHeartbeat {
log.Fatalf(ctx, "unexpected quiesce: %+v", req)
}
status := r.RaftStatus()
if status != nil && status.Term == req.Message.Term && status.Commit == req.Message.Commit {
if r.quiesce() {
return
}
}
if log.V(4) {
log.Infof(ctx, "not quiescing: local raft status is %+v, incoming quiesce message is %+v", status, req.Message)
}
}
// Check to see if a snapshot can be applied. Snapshots can always be applied
// to initialized replicas. Note that if we add a placeholder we need to
// already be holding Replica.raftMu in order to prevent concurrent
// raft-ready processing of uninitialized replicas.
var addedPlaceholder bool
var removePlaceholder bool
if req.Message.Type == raftpb.MsgSnap && !r.IsInitialized() {
if earlyReturn := func() bool {
s.mu.Lock()
defer s.mu.Unlock()
placeholder, err := s.canApplySnapshotLocked(ctx, inSnap.State.Desc)
if err != nil {
// If the storage cannot accept the snapshot, drop it before
// passing it to RawNode.Step, since our error handling
// options past that point are limited.
// TODO(arjun): Now that we have better raft transport error
// handling, consider if this error should be returned and
// handled by the sending store.
log.Infof(ctx, "cannot apply snapshot: %s", err)
return true
}
if placeholder != nil {
// NB: The placeholder added here is either removed below after a
// preemptive snapshot is applied or after the next call to
// Replica.handleRaftReady. Note that we can only get here if the
// replica doesn't exist or is uninitialized.
if err := s.addPlaceholderLocked(placeholder); err != nil {
log.Fatalf(ctx, "could not add vetted placeholder %s: %s", placeholder, err)
}
addedPlaceholder = true
}
return false
}(); earlyReturn {
return nil
}
if addedPlaceholder {
// If we added a placeholder remove it before we return unless some other
// part of the code takes ownership of the removal (indicated by setting
// removePlaceholder to false).
removePlaceholder = true
defer func() {
if removePlaceholder {
if s.removePlaceholder(ctx, req.RangeID) {
atomic.AddInt32(&s.counts.removedPlaceholders, 1)
}
}
}()
}
}
// Snapshots addressed to replica ID 0 are permitted; this is the
// mechanism by which preemptive snapshots work. No other requests to
// replica ID 0 are allowed.
//
// Note that just because the ToReplica's ID is 0 it does not necessarily
// mean that the replica's current ID is 0. We allow for preemptive snapshots
// to be applied to initialized replicas as of #8613.
if req.ToReplica.ReplicaID == 0 {
if req.Message.Type != raftpb.MsgSnap {
log.VEventf(ctx, 1, "refusing incoming Raft message %s from %+v to %+v",
req.Message.Type, req.FromReplica, req.ToReplica)
return roachpb.NewErrorf(
"cannot recreate replica that is not a member of its range (StoreID %s not found in r%d)",
r.store.StoreID(), req.RangeID,
)
}
defer func() {
s.mu.Lock()
defer s.mu.Unlock()
// We need to remove the placeholder regardless of whether the snapshot
// applied successfully or not.
if addedPlaceholder {
// Clear the replica placeholder; we are about to swap it with a real replica.
if !s.removePlaceholderLocked(ctx, req.RangeID) {
log.Fatalf(ctx, "could not remove placeholder after preemptive snapshot")
}
if pErr == nil {
atomic.AddInt32(&s.counts.filledPlaceholders, 1)
} else {
atomic.AddInt32(&s.counts.removedPlaceholders, 1)
}
removePlaceholder = false
}
if pErr == nil {
// If the snapshot succeeded, process the range descriptor update.
if err := s.processRangeDescriptorUpdateLocked(ctx, r); err != nil {
pErr = roachpb.NewError(err)
}
}
}()
// Requiring that the Term is set in a message makes sure that we
// get all of Raft's internal safety checks (it confuses messages
// at term zero for internal messages). The sending side uses the
// term from the snapshot itself, but we'll just check nonzero.
if req.Message.Term == 0 {
return roachpb.NewErrorf(
"preemptive snapshot from term %d received with zero term",
req.Message.Snapshot.Metadata.Term,
)
}
// TODO(tschottdorf): A lot of locking of the individual Replica
// going on below as well. I think that's more easily refactored
// away; what really matters is that the Store doesn't do anything
// else with that same Replica (or one that might conflict with us
// while we still run). In effect, we'd want something like:
//
// 1. look up the snapshot's key range
// 2. get an exclusive lock for operations on that key range from
// the store (or discard the snapshot)
// (at the time of writing, we have checked the key range in
// canApplySnapshotLocked above, but there are concerns about two
// conflicting operations passing that check simultaneously,
// see #7830)
// 3. do everything below (apply the snapshot through temp Raft group)
// 4. release the exclusive lock on the snapshot's key range
//
// There are two future outcomes: Either we begin receiving
// legitimate Raft traffic for this Range (hence learning the
// ReplicaID and becoming a real Replica), or the Replica GC queue
// decides that the ChangeReplicas as a part of which the
// preemptive snapshot was sent has likely failed and removes both
// in-memory and on-disk state.
r.mu.Lock()
// We are paranoid about applying preemptive snapshots (which
// were constructed via our code rather than raft) to the "real"
// raft group. Instead, we destroy the "real" raft group if one
// exists (this is rare in production, although it occurs in
// tests), apply the preemptive snapshot to a temporary raft
// group, then discard that one as well to be replaced by a real
// raft group when we get a new replica ID.
//
// It might be OK instead to apply preemptive snapshots just
// like normal ones (essentially switching between regular and
// preemptive mode based on whether or not we have a raft group,
// instead of based on the replica ID of the snapshot message).
// However, this is a risk that we're not yet willing to take.
// Additionally, without some additional plumbing work, doing so
// would limit the effectiveness of RaftTransport.SendSync for
// preemptive snapshots.
r.mu.internalRaftGroup = nil
needTombstone := r.mu.state.Desc.NextReplicaID != 0
r.mu.Unlock()
appliedIndex, _, err := r.raftMu.stateLoader.loadAppliedIndex(ctx, r.store.Engine())
if err != nil {
return roachpb.NewError(err)
}
raftGroup, err := raft.NewRawNode(
newRaftConfig(
raft.Storage((*replicaRaftStorage)(r)),
preemptiveSnapshotRaftGroupID,
// We pass the "real" applied index here due to subtleties
// arising in the case in which Raft discards the snapshot:
// It would instruct us to apply entries, which would have
// crashing potential for any choice of dummy value below.
appliedIndex,
r.store.cfg,
&raftLogger{ctx: ctx},
), nil)
if err != nil {
return roachpb.NewError(err)
}
// We have a Raft group; feed it the message.
if err := raftGroup.Step(req.Message); err != nil {
return roachpb.NewError(errors.Wrap(err, "unable to process preemptive snapshot"))
}
// In the normal case, the group should ask us to apply a snapshot.
// If it doesn't, our snapshot was probably stale. In that case we
// still go ahead and apply a noop because we want that case to be
// counted by stats as a successful application.
var ready raft.Ready
if raftGroup.HasReady() {
ready = raftGroup.Ready()
}
if needTombstone {
// Bump the min replica ID, but don't write the tombstone key. The
// tombstone key is not expected to be present when normal replica data
// is present and applySnapshot would delete the key in most cases. If
// Raft has decided the snapshot shouldn't be applied we would be
// writing the tombstone key incorrectly.
r.mu.Lock()
r.mu.minReplicaID = r.nextReplicaIDLocked(nil)
r.mu.Unlock()
}
// Apply the snapshot, as Raft told us to.
if err := r.applySnapshot(ctx, inSnap, ready.Snapshot, ready.HardState); err != nil {
return roachpb.NewError(err)
}
// At this point, the Replica has data but no ReplicaID. We hope
// that it turns into a "real" Replica by means of receiving Raft
// messages addressed to it with a ReplicaID, but if that doesn't
// happen, at some point the Replica GC queue will have to grab it.
//
// NB: See the defer at the start of this block for the removal of the
// placeholder and processing of the range descriptor update.
return nil
}
if err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
// We're processing a message from another replica which means that the
// other replica is not quiesced, so we don't need to wake the leader.
r.unquiesceLocked()
if req.Message.Type == raftpb.MsgApp {
r.setEstimatedCommitIndexLocked(req.Message.Commit)
}
return false /* !unquiesceAndWakeLeader */, raftGroup.Step(req.Message)
}); err != nil {
return roachpb.NewError(err)
}
if _, expl, err := r.handleRaftReadyRaftMuLocked(inSnap); err != nil {
// Mimic the behavior in processRaft.
log.Fatalf(ctx, "%s: %s", log.Safe(expl), err) // TODO(bdarnell)
}
removePlaceholder = false
return nil
}
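// Recap of the preemptive-snapshot path above (ToReplica.ReplicaID == 0): any
// existing raft group is discarded, the message is stepped through a
// throwaway raft group keyed by preemptiveSnapshotRaftGroupID, the snapshot
// (if raft accepts it) is applied, and the replica is left without a
// ReplicaID until real raft traffic or the replica GC queue picks it up.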
// HandleRaftResponse implements the RaftMessageHandler interface.
// It requires that s.mu is not held.
func (s *Store) HandleRaftResponse(ctx context.Context, resp *RaftMessageResponse) error {
ctx = s.AnnotateCtx(ctx)
switch val := resp.Union.GetValue().(type) {
case *roachpb.Error:
switch tErr := val.GetDetail().(type) {
case *roachpb.ReplicaTooOldError:
repl, err := s.GetReplica(resp.RangeID)
if err != nil {
// RangeNotFoundErrors are expected here; nothing else is.
if _, ok := err.(*roachpb.RangeNotFoundError); !ok {
log.Error(ctx, err)
}
return nil
}
repl.mu.Lock()
// If the replica ID in the error matches (which is the usual
// case; the exception is when a replica has been removed and
// re-added rapidly), we know the replica will be removed and we
// can cancel any pending commands. This is sometimes necessary
// to unblock PushTxn operations that are necessary for the
// replica GC to succeed.
if tErr.ReplicaID == repl.mu.replicaID {
repl.cancelPendingCommandsLocked()
}
repl.mu.Unlock()
replCtx := repl.AnnotateCtx(ctx)
added, err := s.replicaGCQueue.Add(
repl, replicaGCPriorityRemoved,
)
if err != nil {
log.Errorf(replCtx, "unable to add to replica GC queue: %s", err)
} else if added {
log.Infof(replCtx, "added to replica GC queue (peer suggestion)")
}
case *roachpb.StoreNotFoundError:
log.Warningf(ctx, "raft error: node %d claims to not contain store %d for replica %s: %s",
resp.FromReplica.NodeID, resp.FromReplica.StoreID, resp.FromReplica, val)
return val.GetDetail()
default:
log.Warningf(ctx, "got error from r%d, replica %s: %s",
resp.RangeID, resp.FromReplica, val)
}
default:
log.Infof(ctx, "got unknown raft response type %T from replica %s: %s", val, resp.FromReplica, val)
}
return nil
}
// OutgoingSnapshotStream is the minimal interface on a GRPC stream required
// to send a snapshot over the network.
type OutgoingSnapshotStream interface {
Send(*SnapshotRequest) error
Recv() (*SnapshotResponse, error)
}
// SnapshotStorePool narrows StorePool to make sendSnapshot easier to test.
type SnapshotStorePool interface {
throttle(reason throttleReason, toStoreID roachpb.StoreID)
}
var rebalanceSnapshotRate = settings.RegisterByteSizeSetting(
"kv.snapshot_rebalance.max_rate",
"the rate limit (bytes/sec) to use for rebalance snapshots",
envutil.EnvOrDefaultBytes("COCKROACH_PREEMPTIVE_SNAPSHOT_RATE", 2<<20),
)
var recoverySnapshotRate = settings.RegisterByteSizeSetting(
"kv.snapshot_recovery.max_rate",
"the rate limit (bytes/sec) to use for recovery snapshots",
envutil.EnvOrDefaultBytes("COCKROACH_RAFT_SNAPSHOT_RATE", 8<<20),
)
func snapshotRateLimit(
st *cluster.Settings, priority SnapshotRequest_Priority,
) (rate.Limit, error) {
switch priority {
case SnapshotRequest_RECOVERY:
return rate.Limit(recoverySnapshotRate.Get(&st.SV)), nil
case SnapshotRequest_REBALANCE:
return rate.Limit(rebalanceSnapshotRate.Get(&st.SV)), nil
default:
return 0, errors.Errorf("unknown snapshot priority: %s", priority)
}
}
type errMustRetrySnapshotDueToTruncation struct {
index, term uint64
}
func (e *errMustRetrySnapshotDueToTruncation) Error() string {
return fmt.Sprintf(
"log truncation during snapshot removed sideloaded SSTable at index %d, term %d",
e.index, e.term,
)
}
// sendSnapshot sends an outgoing snapshot via a pre-opened GRPC stream.
func sendSnapshot(
ctx context.Context,
st *cluster.Settings,
stream OutgoingSnapshotStream,
storePool SnapshotStorePool,
header SnapshotRequest_Header,
snap *OutgoingSnapshot,
newBatch func() engine.Batch,
sent func(),
) error {
start := timeutil.Now()
to := header.RaftMessageRequest.ToReplica
if err := stream.Send(&SnapshotRequest{Header: &header}); err != nil {
return err
}
// Wait until we get a response from the server.
resp, err := stream.Recv()
if err != nil {
storePool.throttle(throttleFailed, to.StoreID)
return err
}
switch resp.Status {
case SnapshotResponse_DECLINED:
if header.CanDecline {
storePool.throttle(throttleDeclined, to.StoreID)
declinedMsg := "reservation rejected"
if len(resp.Message) > 0 {
declinedMsg = resp.Message
}
return errors.Errorf("%s: remote declined snapshot: %s", to, declinedMsg)
}
storePool.throttle(throttleFailed, to.StoreID)
return errors.Errorf("%s: programming error: remote declined required snapshot: %s",
to, resp.Message)
case SnapshotResponse_ERROR:
storePool.throttle(throttleFailed, to.StoreID)
return errors.Errorf("%s: remote couldn't accept snapshot with error: %s",
to, resp.Message)
case SnapshotResponse_ACCEPTED:
// This is the response we're expecting. Continue with snapshot sending.
default:
storePool.throttle(throttleFailed, to.StoreID)
return errors.Errorf("%s: server sent an invalid status during negotiation: %s",
to, resp.Status)
}
// The size of batches to send. This is the granularity of rate limiting.
const batchSize = 256 << 10 // 256 KB
targetRate, err := snapshotRateLimit(st, header.Priority)
if err != nil {
return errors.Wrapf(err, "%s", to)
}
// Convert the bytes/sec rate limit to batches/sec.
//
// TODO(peter): Using bytes/sec for rate limiting seems more natural but has
// practical difficulties. We either need to use a very large burst size
// which seems to disable the rate limiting, or call WaitN in smaller than
// burst size chunks which caused excessive slowness in testing. Would be
// nice to figure this out, but the batches/sec rate limit works for now.
limiter := rate.NewLimiter(targetRate/batchSize, 1 /* burst size */)
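// For example, with the default rebalance rate of 2 MiB/s and 256 KiB
// batches this works out to roughly 8 batches per second
// ((2<<20) / (256<<10) = 8), with a burst of a single batch.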
// Determine the unreplicated key prefix so we can drop any
// unreplicated keys from the snapshot.
unreplicatedPrefix := keys.MakeRangeIDUnreplicatedPrefix(header.State.Desc.RangeID)
var alloc bufalloc.ByteAllocator
n := 0
var b engine.Batch
for ; ; snap.Iter.Next() {
if ok, err := snap.Iter.Valid(); err != nil {
return err
} else if !ok {
break
}
var key engine.MVCCKey
var value []byte
alloc, key, value = snap.Iter.allocIterKeyValue(alloc)
if bytes.HasPrefix(key.Key, unreplicatedPrefix) {
continue
}
n++
mvccKey := engine.MVCCKey{
Key: key.Key,
Timestamp: key.Timestamp,
}
if b == nil {
b = newBatch()
}
if err := b.Put(mvccKey, value); err != nil {
b.Close()
return err
}
if len(b.Repr()) >= batchSize {
if err := limiter.WaitN(ctx, 1); err != nil {
return err
}
if err := sendBatch(stream, b); err != nil {
return err
}
b = nil
// We no longer need the keys and values in the batch we just sent,
// so reset alloc and allow them to be garbage collected.
alloc = bufalloc.ByteAllocator{}
}
}
if b != nil {
if err := limiter.WaitN(ctx, 1); err != nil {
return err
}
if err := sendBatch(stream, b); err != nil {
return err
}
}
firstIndex := header.State.TruncatedState.Index + 1
endIndex := snap.RaftSnap.Metadata.Index + 1
logEntries := make([][]byte, 0, endIndex-firstIndex)
scanFunc := func(kv roachpb.KeyValue) (bool, error) {
bytes, err := kv.Value.GetBytes()
if err == nil {
logEntries = append(logEntries, bytes)
}
return false, err
}
rangeID := header.State.Desc.RangeID
if err := iterateEntries(ctx, snap.EngineSnap, rangeID, firstIndex, endIndex, scanFunc); err != nil {
return err
}
// Inline the payloads for all sideloaded proposals.
//
// TODO(tschottdorf): could also send slim proposals and attach sideloaded
// SSTables directly to the snapshot. Probably the better long-term
// solution, but let's see if it ever becomes relevant. Snapshots with
// inlined proposals are hopefully the exception.
{
var ent raftpb.Entry
for i := range logEntries {
ent.Reset()
if err := ent.Unmarshal(logEntries[i]); err != nil {
return err
}
if !sniffSideloadedRaftCommand(ent.Data) {
continue
}
if err := snap.WithSideloaded(func(ss sideloadStorage) error {
newEnt, err := maybeInlineSideloadedRaftCommand(
ctx, rangeID, ent, ss, snap.RaftEntryCache,
)
if err != nil {
return err
}
if newEnt != nil {
ent = *newEnt
}
return nil
}); err != nil {
if errors.Cause(err) == errSideloadedFileNotFound {
// We're creating the Raft snapshot based on a snapshot of
// the engine, but the Raft log may since have been
// truncated and corresponding on-disk sideloaded payloads
// unlinked. Luckily, we can just abort this snapshot; the
// caller can retry.
//
// TODO(tschottdorf): check how callers handle this. They
// should simply retry. In some scenarios, perhaps this can
// happen repeatedly and prevent a snapshot; not sending the
// log entries wouldn't help, though, and so we'd really
// need to make sure the entries are always here, for
// instance by pre-loading them into memory. Or we can make
// log truncation less aggressive about removing sideloaded
// files, by delaying trailing file deletion for a bit.
return &errMustRetrySnapshotDueToTruncation{
index: ent.Index,
term: ent.Term,
}
}
return err
}
// TODO(tschottdorf): it should be possible to reuse `logEntries[i]` here.
var err error
if logEntries[i], err = ent.Marshal(); err != nil {
return err
}
}
}
req := &SnapshotRequest{
LogEntries: logEntries,
Final: true,
}
// Notify the sent callback before the final snapshot request is sent so that
// the snapshots generated metric gets incremented before the snapshot is
// applied.
sent()
if err := stream.Send(req); err != nil {
return err
}
log.Infof(ctx, "streamed snapshot to %s: kv pairs: %d, log entries: %d, rate-limit: %s/sec, %0.0fms",
to, n, len(logEntries), humanizeutil.IBytes(int64(targetRate)),
timeutil.Since(start).Seconds()*1000)
resp, err = stream.Recv()
if err != nil {
return errors.Wrapf(err, "%s: remote failed to apply snapshot", to)
}
// NB: wait for EOF which ensures that all processing on the server side has
// completed (such as defers that might be run after the previous message was
// received).
if unexpectedResp, err := stream.Recv(); err != io.EOF {
return errors.Errorf("%s: expected EOF, got resp=%v err=%v", to, unexpectedResp, err)
}
switch resp.Status {
case SnapshotResponse_ERROR:
return errors.Errorf("%s: remote failed to apply snapshot for reason %s", to, resp.Message)
case SnapshotResponse_APPLIED:
return nil
default:
return errors.Errorf("%s: server sent an invalid status during finalization: %s",
to, resp.Status)
}
}
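// Sender-side summary of the exchange above: send the Header and wait for
// ACCEPTED, stream rate-limited KV batches, send the log entries (with any
// sideloaded payloads inlined) in a Final request, then wait for APPLIED
// followed by EOF before declaring the snapshot sent.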
func sendBatch(stream OutgoingSnapshotStream, batch engine.Batch) error {
repr := batch.Repr()
batch.Close()
return stream.Send(&SnapshotRequest{KVBatch: repr})
}
// enqueueRaftUpdateCheck asynchronously registers the given range ID to be
// checked for raft updates when the processRaft goroutine is idle.
func (s *Store) enqueueRaftUpdateCheck(rangeID roachpb.RangeID) {
s.scheduler.EnqueueRaftReady(rangeID)
}
func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID) {
value, ok := s.replicaQueues.Load(int64(rangeID))
if !ok {
return
}
q := (*raftRequestQueue)(value)
q.Lock()
infos := q.infos
q.infos = nil
q.Unlock()
for _, info := range infos {
if pErr := s.processRaftRequest(info.respStream.Context(), info.req, IncomingSnapshot{}); pErr != nil {
// If we're unable to process the request, clear the request queue. This
// only happens if we couldn't create the replica because the request was
// targeted to a removed range. This is also racy and could cause us to
// drop messages to the deleted range occasionally (#18355), but raft
// will just retry.
q.Lock()
if len(q.infos) == 0 {
s.replicaQueues.Delete(int64(rangeID))
}
q.Unlock()
if err := info.respStream.Send(newRaftMessageResponse(info.req, pErr)); err != nil {
// Seems excessive to log this on every occurrence as the other side
// might have closed.
log.VEventf(ctx, 1, "error sending error: %s", err)
}
}
}
}
func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) {
value, ok := s.mu.replicas.Load(int64(rangeID))
if !ok {
return
}
start := timeutil.Now()
r := (*Replica)(value)
stats, expl, err := r.handleRaftReady(IncomingSnapshot{})
if err != nil {
log.Fatalf(ctx, "%s: %s", log.Safe(expl), err) // TODO(bdarnell)
}
elapsed := timeutil.Since(start)
s.metrics.RaftWorkingDurationNanos.Inc(elapsed.Nanoseconds())
// Warn if Raft processing took too long. We use the same duration as we
// use for warning about excessive raft mutex lock hold times. Long
// processing time means we'll have starved local replicas of ticks and
// remote replicas will likely start campaigning.
if elapsed >= defaultReplicaRaftMuWarnThreshold {
log.Warningf(ctx, "handle raft ready: %.1fs [processed=%d]",
elapsed.Seconds(), stats.processed)
}
if !r.IsInitialized() {
// Only an uninitialized replica can have a placeholder since, by
// definition, an initialized replica will be present in the
// replicasByKey map. While the replica will usually consume the
// placeholder itself, that isn't guaranteed and so this invocation
// here is crucial (i.e. don't remove it).
//
// We need to hold raftMu here to prevent removing a placeholder that is
// actively being used by Store.processRaftRequest.
r.raftMu.Lock()
if s.removePlaceholder(ctx, r.RangeID) {
atomic.AddInt32(&s.counts.droppedPlaceholders, 1)
}
r.raftMu.Unlock()
}
}
func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool {
value, ok := s.mu.replicas.Load(int64(rangeID))
if !ok {
return false
}
start := timeutil.Now()
r := (*Replica)(value)
exists, err := r.tick()
if err != nil {
log.Error(ctx, err)
}
s.metrics.RaftTickingDurationNanos.Inc(timeutil.Since(start).Nanoseconds())
return exists // ready
}
func (s *Store) processRaft(ctx context.Context) {
if s.cfg.TestingKnobs.DisableProcessRaft {
return
}
s.scheduler.Start(ctx, s.stopper)
// Wait for the scheduler worker goroutines to finish.
s.stopper.RunWorker(ctx, s.scheduler.Wait)
s.stopper.RunWorker(ctx, s.raftTickLoop)
s.stopper.RunWorker(ctx, s.coalescedHeartbeatsLoop)
s.stopper.AddCloser(stop.CloserFn(func() {
s.cfg.Transport.Stop(s.StoreID())
}))
}
func (s *Store) raftTickLoop(ctx context.Context) {
ticker := time.NewTicker(s.cfg.RaftTickInterval)
defer ticker.Stop()
var rangeIDs []roachpb.RangeID
for {
select {
case <-ticker.C:
rangeIDs = rangeIDs[:0]
s.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool {
// Fast-path handling of quiesced replicas. This avoids the overhead of
// queueing the replica on the Raft scheduler. This overhead is
// significant and there is overhead to filling the Raft scheduler with
// replicas to tick. A node with 3TB of disk might contain 50k+
// replicas. Filling the Raft scheduler with all of those replicas
// every tick interval can starve other Raft processing of cycles.
//
// Why do we bother to ever queue a Replica on the Raft scheduler for
// tick processing? Couldn't we just call Replica.tick() here? Yes, but
// then a single bad/slow Replica can disrupt tick processing for every
// Replica on the store which cascades into Raft elections and more
// disruption. Replica.maybeTickQuiesced only grabs short-duration
// locks and not locks that are held during disk I/O.
if !(*Replica)(v).maybeTickQuiesced() {
rangeIDs = append(rangeIDs, roachpb.RangeID(k))
}
return true
})
s.scheduler.EnqueueRaftTick(rangeIDs...)
s.metrics.RaftTicks.Inc(1)
case <-s.stopper.ShouldStop():
return
}
}
}
// Since coalescing heartbeats adds latency to heartbeat messages, it is
// beneficial to run this loop on a faster cycle than once per tick, so that
// the delay does not impact latency-sensitive features such as quiescence.
func (s *Store) coalescedHeartbeatsLoop(ctx context.Context) {
ticker := time.NewTicker(s.cfg.CoalescedHeartbeatsInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.sendQueuedHeartbeats(ctx)
case <-s.stopper.ShouldStop():
return
}
}
}
// sendQueuedHeartbeatsToNode requires that the s.coalescedMu lock is held. It
// returns the number of heartbeats that were sent.
func (s *Store) sendQueuedHeartbeatsToNode(
ctx context.Context, beats, resps []RaftHeartbeat, to roachpb.StoreIdent,
) int {
var msgType raftpb.MessageType
if len(beats) == 0 && len(resps) == 0 {
return 0
} else if len(resps) == 0 {
msgType = raftpb.MsgHeartbeat
} else if len(beats) == 0 {
msgType = raftpb.MsgHeartbeatResp
} else {
log.Fatal(ctx, "cannot coalesce both heartbeats and responses")
}
chReq := &RaftMessageRequest{
RangeID: 0,
ToReplica: roachpb.ReplicaDescriptor{
NodeID: to.NodeID,
StoreID: to.StoreID,
ReplicaID: 0,
},
FromReplica: roachpb.ReplicaDescriptor{
NodeID: s.Ident.NodeID,
StoreID: s.Ident.StoreID,
},
Message: raftpb.Message{
Type: msgType,
},
Heartbeats: beats,
HeartbeatResps: resps,
}
if log.V(4) {
log.Infof(ctx, "sending raft request (coalesced) %+v", chReq)
}
if !s.cfg.Transport.SendAsync(chReq) {
for _, beat := range beats {
if value, ok := s.mu.replicas.Load(int64(beat.RangeID)); ok {
(*Replica)(value).addUnreachableRemoteReplica(beat.ToReplicaID)
}
}
for _, resp := range resps {
if value, ok := s.mu.replicas.Load(int64(resp.RangeID)); ok {
(*Replica)(value).addUnreachableRemoteReplica(resp.ToReplicaID)
}
}
return 0
}
return len(beats) + len(resps)
}
func (s *Store) sendQueuedHeartbeats(ctx context.Context) {
s.coalescedMu.Lock()
heartbeats := s.coalescedMu.heartbeats
heartbeatResponses := s.coalescedMu.heartbeatResponses
s.coalescedMu.heartbeats = map[roachpb.StoreIdent][]RaftHeartbeat{}
s.coalescedMu.heartbeatResponses = map[roachpb.StoreIdent][]RaftHeartbeat{}
s.coalescedMu.Unlock()
var beatsSent int
for to, beats := range heartbeats {
beatsSent += s.sendQueuedHeartbeatsToNode(ctx, beats, nil, to)
}
for to, resps := range heartbeatResponses {
beatsSent += s.sendQueuedHeartbeatsToNode(ctx, nil, resps, to)
}
s.metrics.RaftCoalescedHeartbeatsPending.Update(int64(beatsSent))
}
var errRetry = errors.New("retry: orphaned replica")
// getOrCreateReplica returns a replica for the given RangeID, creating an
// uninitialized replica if necessary. The caller must not hold the store's
// lock. The returned replica has Replica.raftMu locked and it is the caller's
// responsibility to unlock it.
func (s *Store) getOrCreateReplica(
ctx context.Context,
rangeID roachpb.RangeID,
replicaID roachpb.ReplicaID,
creatingReplica *roachpb.ReplicaDescriptor,
) (_ *Replica, created bool, _ error) {
for {
r, created, err := s.tryGetOrCreateReplica(
ctx,
rangeID,
replicaID,
creatingReplica,
)
if err == errRetry {
continue
}
if err != nil {
return nil, false, err
}
return r, created, err
}
}
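// In other words, getOrCreateReplica simply retries tryGetOrCreateReplica
// until it stops returning errRetry, which happens when a racing goroutine
// has either destroyed the replica we found or created one before we could.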
// tryGetOrCreateReplica performs a single attempt at trying to lookup or
// create a replica. It will fail with errRetry if it finds a Replica that has
// been destroyed (and is no longer in Store.mu.replicas) or if during creation
// another goroutine gets there first. In either case, a subsequent call to
// tryGetOrCreateReplica will likely succeed, hence the loop in
// getOrCreateReplica.
func (s *Store) tryGetOrCreateReplica(
ctx context.Context,
rangeID roachpb.RangeID,
replicaID roachpb.ReplicaID,
creatingReplica *roachpb.ReplicaDescriptor,
) (_ *Replica, created bool, _ error) {
// The common case: look up an existing (initialized) replica.
if value, ok := s.mu.replicas.Load(int64(rangeID)); ok {
repl := (*Replica)(value)
if creatingReplica != nil {
// Drop messages that come from a node that we believe was once a member of
// the group but has been removed.
desc := repl.Desc()
_, found := desc.GetReplicaDescriptorByID(creatingReplica.ReplicaID)
// It's not a current member of the group. Is it from the past?
if !found && creatingReplica.ReplicaID < desc.NextReplicaID {
return nil, false, roachpb.NewReplicaTooOldError(creatingReplica.ReplicaID)
}
}
repl.raftMu.Lock()
repl.mu.RLock()
destroyed, corrupted := repl.mu.destroyed, repl.mu.corrupted
repl.mu.RUnlock()
if destroyed != nil {
repl.raftMu.Unlock()
if corrupted {
return nil, false, destroyed
}
return nil, false, errRetry
}
repl.mu.Lock()
if err := repl.setReplicaIDRaftMuLockedMuLocked(replicaID); err != nil {
repl.mu.Unlock()
repl.raftMu.Unlock()
return nil, false, err
}
repl.mu.Unlock()
return repl, false, nil
}
// No replica currently exists, so we'll try to create one. Before creating
// the replica, see if there is a tombstone which would indicate that this is
// a stale message.
tombstoneKey := keys.RaftTombstoneKey(rangeID)
var tombstone roachpb.RaftTombstone
if ok, err := engine.MVCCGetProto(
ctx, s.Engine(), tombstoneKey, hlc.Timestamp{}, true, nil, &tombstone,
); err != nil {
return nil, false, err
} else if ok {
if replicaID != 0 && replicaID < tombstone.NextReplicaID {
return nil, false, &roachpb.RaftGroupDeletedError{}
}
}
// Create a new replica and lock it for raft processing.
repl := newReplica(rangeID, s)
repl.creatingReplica = creatingReplica
repl.raftMu.Lock()
// Install the replica in the store's replica map. The replica is in an
// inconsistent state, but nobody will be accessing it while we hold its
// locks.
s.mu.Lock()
// Grab the internal Replica state lock to ensure nobody mucks with our
// replica even outside of raft processing. Have to do this after grabbing
// Store.mu to maintain lock ordering invariant.
repl.mu.Lock()
repl.mu.minReplicaID = tombstone.NextReplicaID
// Add the range to range map, but not replicasByKey since the range's start
// key is unknown. The range will be added to replicasByKey later when a
// snapshot is applied. After unlocking Store.mu above, another goroutine
// might have snuck in and created the replica, so we retry on error.
if err := s.addReplicaToRangeMapLocked(repl); err != nil {
repl.mu.Unlock()
s.mu.Unlock()
repl.raftMu.Unlock()
return nil, false, errRetry
}
s.mu.uninitReplicas[repl.RangeID] = repl
s.mu.Unlock()
desc := &roachpb.RangeDescriptor{
RangeID: rangeID,
// TODO(bdarnell): other fields are unknown; need to populate them from
// snapshot.
}
if err := repl.initRaftMuLockedReplicaMuLocked(desc, s.Clock(), replicaID); err != nil {
// Mark the replica as destroyed and remove it from the replicas maps to
// ensure that nobody tries to use it.
repl.mu.destroyed = errors.Wrapf(err, "%s: failed to initialize", repl)
repl.mu.Unlock()
s.mu.Lock()
s.mu.replicas.Delete(int64(rangeID))
delete(s.mu.uninitReplicas, rangeID)
s.replicaQueues.Delete(int64(rangeID))
s.mu.Unlock()
repl.raftMu.Unlock()
return nil, false, err
}
repl.mu.Unlock()
return repl, true, nil
}
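// Note the lock ordering followed on the creation path above: Replica.raftMu
// first, then Store.mu, then Replica.mu. The inline comments above call this
// invariant out explicitly; acquiring the locks in a different order risks
// deadlocking against concurrent raft processing.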
// canApplySnapshot returns (_, nil) if the snapshot can be applied to
// this store's replica (i.e. the snapshot is not from an older incarnation of
// the replica) and a placeholder can be added to the replicasByKey map (if
// necessary). If a placeholder is required, it is returned as the first value.
func (s *Store) canApplySnapshot(
ctx context.Context, rangeDescriptor *roachpb.RangeDescriptor,
) (*ReplicaPlaceholder, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.canApplySnapshotLocked(ctx, rangeDescriptor)
}
func (s *Store) canApplySnapshotLocked(
ctx context.Context, rangeDescriptor *roachpb.RangeDescriptor,
) (*ReplicaPlaceholder, error) {
if v, ok := s.mu.replicas.Load(int64(rangeDescriptor.RangeID)); ok &&
(*Replica)(v).IsInitialized() {
// We have the range and it's initialized, so let the snapshot through.
return nil, nil
}
// We don't have the range (or we have an uninitialized
// placeholder). Will we be able to create/initialize it?
if exRng, ok := s.mu.replicaPlaceholders[rangeDescriptor.RangeID]; ok {
return nil, errors.Errorf("%s: canApplySnapshotLocked: cannot add placeholder, have an existing placeholder %s", s, exRng)
}
if exRange := s.getOverlappingKeyRangeLocked(rangeDescriptor); exRange != nil {
// We have a conflicting range, so we must block the snapshot.
// When such a conflict exists, it will be resolved by one range
// either being split or garbage collected.
exReplica, err := s.GetReplica(exRange.Desc().RangeID)
msg := IntersectingSnapshotMsg
if err != nil {
log.Warning(ctx, errors.Wrapf(
err, "unable to look up overlapping replica on %s", exReplica))
} else {
inactive := func(r *Replica) bool {
if r.RaftStatus() == nil {
return true
}
lease, pendingLease := r.getLease()
now := s.Clock().Now()
return !r.IsLeaseValid(lease, now) &&
(pendingLease == nil || !r.IsLeaseValid(*pendingLease, now))
}
// If the existing range shows no signs of recent activity, give it a GC
// run.
if inactive(exReplica) {
if _, err := s.replicaGCQueue.Add(exReplica, replicaGCPriorityCandidate); err != nil {
log.Errorf(ctx, "%s: unable to add replica to GC queue: %s", exReplica, err)
} else {
msg += "; initiated GC:"
}
}
}
return nil, errors.Errorf("%s %v", msg, exReplica) // exReplica can be nil
}
placeholder := &ReplicaPlaceholder{
rangeDesc: *rangeDescriptor,
}
return placeholder, nil
}
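// To summarize the three outcomes above: an initialized replica lets the
// snapshot through with no placeholder; an existing placeholder or an
// overlapping (but possibly GC-able) replica rejects it; otherwise a
// ReplicaPlaceholder covering the descriptor's span is handed back for the
// caller to install while holding Store.mu.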
func (s *Store) updateCapacityGauges() error {
desc, err := s.Descriptor()
if err != nil {
return err
}
s.metrics.Capacity.Update(desc.Capacity.Capacity)
s.metrics.Available.Update(desc.Capacity.Available)
s.metrics.Used.Update(desc.Capacity.Used)
return nil
}
// updateReplicationGauges counts a number of simple replication statistics for
// the ranges in this store.
// TODO(bram): #4564 It may be appropriate to compute these statistics while
// scanning ranges. An ideal solution would be to create incremental events
// whenever availability changes.
func (s *Store) updateReplicationGauges(ctx context.Context) error {
// Load the system config.
cfg, ok := s.Gossip().GetSystemConfig()
if !ok {
return errors.Errorf("%s: system config not yet available", s)
}
var (
raftLeaderCount int64
leaseHolderCount int64
leaseExpirationCount int64
leaseEpochCount int64
raftLeaderNotLeaseHolderCount int64
quiescentCount int64
averageWritesPerSecond float64
rangeCount int64
unavailableRangeCount int64
underreplicatedRangeCount int64
behindCount int64
selfBehindCount int64
)
timestamp := s.cfg.Clock.Now()
var livenessMap map[roachpb.NodeID]bool
if s.cfg.NodeLiveness != nil {
livenessMap = s.cfg.NodeLiveness.GetIsLiveMap()
}
newStoreReplicaVisitor(s).Visit(func(rep *Replica) bool {
metrics := rep.Metrics(ctx, timestamp, cfg, livenessMap)
if metrics.Leader {
raftLeaderCount++
if metrics.LeaseValid && !metrics.Leaseholder {
raftLeaderNotLeaseHolderCount++
}
}
if metrics.Leaseholder {
leaseHolderCount++
switch metrics.LeaseType {
case roachpb.LeaseNone:
case roachpb.LeaseExpiration:
leaseExpirationCount++
case roachpb.LeaseEpoch:
leaseEpochCount++
}
}
if metrics.Quiescent {
quiescentCount++
}
if metrics.RangeCounter {
rangeCount++
if metrics.Unavailable {
unavailableRangeCount++
}
if metrics.Underreplicated {
underreplicatedRangeCount++
}
}
behindCount += metrics.BehindCount
selfBehindCount += metrics.SelfBehindCount
if qps, dur := rep.writeStats.avgQPS(); dur >= MinStatsDuration {
averageWritesPerSecond += qps
}
return true // more
})
s.metrics.RaftLeaderCount.Update(raftLeaderCount)
s.metrics.RaftLeaderNotLeaseHolderCount.Update(raftLeaderNotLeaseHolderCount)
s.metrics.LeaseHolderCount.Update(leaseHolderCount)
s.metrics.LeaseExpirationCount.Update(leaseExpirationCount)
s.metrics.LeaseEpochCount.Update(leaseEpochCount)
s.metrics.QuiescentCount.Update(quiescentCount)
s.metrics.AverageWritesPerSecond.Update(averageWritesPerSecond)
s.recordNewWritesPerSecond(averageWritesPerSecond)
s.metrics.RangeCount.Update(rangeCount)
s.metrics.UnavailableRangeCount.Update(unavailableRangeCount)
s.metrics.UnderReplicatedRangeCount.Update(underreplicatedRangeCount)
s.metrics.RaftLogFollowerBehindCount.Update(behindCount)
s.metrics.RaftLogSelfBehindCount.Update(selfBehindCount)
if selfBehindCount > prohibitRebalancesBehindThreshold {
atomic.StoreInt32(&s.rebalancesDisabled, 1)
} else {
atomic.StoreInt32(&s.rebalancesDisabled, 0)
}
return nil
}
// updateCommandQueueGauges updates a number of simple statistics for
// the CommandQueues of each replica in this store.
func (s *Store) updateCommandQueueGauges() error {
var (
maxCommandQueueSize int64
maxCommandQueueWriteCount int64
maxCommandQueueReadCount int64
maxCommandQueueTreeSize int64
maxCommandQueueOverlaps int64
combinedCommandQueueSize int64
combinedCommandWriteCount int64
combinedCommandReadCount int64
)
newStoreReplicaVisitor(s).Visit(func(rep *Replica) bool {
rep.cmdQMu.Lock()
writes := rep.cmdQMu.queues[spanGlobal].localMetrics.writeCommands
writes += rep.cmdQMu.queues[spanLocal].localMetrics.writeCommands
reads := rep.cmdQMu.queues[spanGlobal].localMetrics.readCommands
reads += rep.cmdQMu.queues[spanLocal].localMetrics.readCommands
treeSize := int64(rep.cmdQMu.queues[spanGlobal].treeSize())
treeSize += int64(rep.cmdQMu.queues[spanLocal].treeSize())
maxOverlaps := rep.cmdQMu.queues[spanGlobal].localMetrics.maxOverlapsSeen
if locMax := rep.cmdQMu.queues[spanLocal].localMetrics.maxOverlapsSeen; locMax > maxOverlaps {
maxOverlaps = locMax
}
rep.cmdQMu.queues[spanGlobal].localMetrics.maxOverlapsSeen = 0
rep.cmdQMu.queues[spanLocal].localMetrics.maxOverlapsSeen = 0
rep.cmdQMu.Unlock()
cqSize := writes + reads
if cqSize > maxCommandQueueSize {
maxCommandQueueSize = cqSize
}
if writes > maxCommandQueueWriteCount {
maxCommandQueueWriteCount = writes
}
if reads > maxCommandQueueReadCount {
maxCommandQueueReadCount = reads
}
if treeSize > maxCommandQueueTreeSize {
maxCommandQueueTreeSize = treeSize
}
if maxOverlaps > maxCommandQueueOverlaps {
maxCommandQueueOverlaps = maxOverlaps
}
combinedCommandQueueSize += cqSize
combinedCommandWriteCount += writes
combinedCommandReadCount += reads
return true // more
})
s.metrics.MaxCommandQueueSize.Update(maxCommandQueueSize)
s.metrics.MaxCommandQueueWriteCount.Update(maxCommandQueueWriteCount)
s.metrics.MaxCommandQueueReadCount.Update(maxCommandQueueReadCount)
s.metrics.MaxCommandQueueTreeSize.Update(maxCommandQueueTreeSize)
s.metrics.MaxCommandQueueOverlaps.Update(maxCommandQueueOverlaps)
s.metrics.CombinedCommandQueueSize.Update(combinedCommandQueueSize)
s.metrics.CombinedCommandWriteCount.Update(combinedCommandWriteCount)
s.metrics.CombinedCommandReadCount.Update(combinedCommandReadCount)
return nil
}
// ComputeMetrics immediately computes the current value of store metrics which
// cannot be computed incrementally. This method should be invoked periodically
// by a higher-level system which records store metrics.
func (s *Store) ComputeMetrics(ctx context.Context, tick int) error {
ctx = s.AnnotateCtx(ctx)
if err := s.updateCapacityGauges(); err != nil {
return err
}
if err := s.updateReplicationGauges(ctx); err != nil {
return err
}
if err := s.updateCommandQueueGauges(); err != nil {
return err
}
// Get the latest RocksDB stats.
stats, err := s.engine.GetStats()
if err != nil {
return err
}
s.metrics.updateRocksDBStats(*stats)
// If we're using RocksDB, log the sstable overview.
if rocksdb, ok := s.engine.(*engine.RocksDB); ok {
sstables := rocksdb.GetSSTables()
s.metrics.RdbNumSSTables.Update(int64(sstables.Len()))
readAmp := sstables.ReadAmplification()
s.metrics.RdbReadAmplification.Update(int64(readAmp))
// Log this metric infrequently.
if tick%60 == 0 /* every 10m */ {
log.Infof(ctx, "sstables (read amplification = %d):\n%s", readAmp, sstables)
log.Info(ctx, rocksdb.GetCompactionStats())
}
}
return nil
}
// ComputeStatsForKeySpan computes the aggregated MVCCStats for all replicas on
// this store which contain any keys in the supplied range.
func (s *Store) ComputeStatsForKeySpan(startKey, endKey roachpb.RKey) (enginepb.MVCCStats, int) {
var output enginepb.MVCCStats
var count int
newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
desc := repl.Desc()
if bytes.Compare(startKey, desc.EndKey) >= 0 || bytes.Compare(desc.StartKey, endKey) >= 0 {
return true // continue
}
output.Add(repl.GetMVCCStats())
count++
return true
})
return output, count
}
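// The skip condition above is the usual half-open interval overlap test: the
// replica's span [desc.StartKey, desc.EndKey) is disjoint from
// [startKey, endKey) exactly when startKey >= desc.EndKey or
// desc.StartKey >= endKey, in which case the replica's stats are not added.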
// AllocatorDryRun runs the given replica through the allocator without actually
// carrying out any changes, returning all trace messages collected along the way.
// Intended to help power a debug endpoint.
func (s *Store) AllocatorDryRun(
ctx context.Context, repl *Replica,
) ([]tracing.RecordedSpan, error) {
sysCfg, ok := s.cfg.Gossip.GetSystemConfig()
if !ok {
return nil, errors.New("allocator dry runs require a valid system config")
}
ctx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "allocator dry run")
defer cancel()
canTransferLease := func() bool { return true }
_, err := s.replicateQueue.processOneChange(
ctx, repl, sysCfg, canTransferLease, true /* dryRun */)
if err != nil {
log.Eventf(ctx, "error simulating allocator on replica %s: %s", repl, err)
}
return collect(), nil
}
// WriteClusterVersion writes the given cluster version to the store-local cluster version key.
func WriteClusterVersion(
ctx context.Context, writer engine.ReadWriter, cv cluster.ClusterVersion,
) error {
return engine.MVCCPutProto(ctx, writer, nil, keys.StoreClusterVersionKey(), hlc.Timestamp{}, nil, &cv)
}
// ReadClusterVersion reads the cluster version from the store-local version key.
func ReadClusterVersion(ctx context.Context, reader engine.Reader) (cluster.ClusterVersion, error) {
var cv cluster.ClusterVersion
_, err := engine.MVCCGetProto(ctx, reader, keys.StoreClusterVersionKey(), hlc.Timestamp{}, true, nil, &cv)
return cv, err
}
// The methods below can be used to control a store's queues. Stopping a queue
// is only meant to happen in tests.
func (s *Store) setRaftLogQueueActive(active bool) {
s.raftLogQueue.SetDisabled(!active)
}
func (s *Store) setReplicaGCQueueActive(active bool) {
s.replicaGCQueue.SetDisabled(!active)
}
func (s *Store) setReplicateQueueActive(active bool) {
s.replicateQueue.SetDisabled(!active)
}
func (s *Store) setSplitQueueActive(active bool) {
s.splitQueue.SetDisabled(!active)
}
func (s *Store) setTimeSeriesMaintenanceQueueActive(active bool) {
s.tsMaintenanceQueue.SetDisabled(!active)
}
func (s *Store) setRaftSnapshotQueueActive(active bool) {
s.raftSnapshotQueue.SetDisabled(!active)
}
func (s *Store) setScannerActive(active bool) {
s.scanner.SetDisabled(!active)
}
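// For example, a test that wants to drive replica GC by hand might first
// pause the automatic queue (illustrative usage only):
//
//	store.setReplicaGCQueueActive(false)
//	defer store.setReplicaGCQueueActive(true)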