// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package storage

import (
	"bytes"
	"fmt"
	"math"
	"sort"

	"golang.org/x/net/context"

	"github.com/cockroachdb/cockroach/pkg/config"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

const (
	// The number of random candidates to select from a larger list of possible
	// candidates. Because the allocator heuristics are being run on every node,
	// it is actually not desirable to set this value higher. Doing so can lead
	// to situations where the allocator deterministically selects the "best"
	// node for a decision and all of the nodes pile on allocations to that
	// node. See "power of two random choices":
	// https://brooker.co.za/blog/2012/01/17/two-random.html and
	// https://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf.
	allocatorRandomCount = 2
)
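
// Example (illustrative, not part of the original file): with
// allocatorRandomCount = 2, selection in selectGood below boils down to
// comparing two randomly chosen candidates and keeping the better one:
//
//	order := randGen.Perm(len(cl)) // e.g. [3, 0, 2, 1]
//	best := &cl[order[0]]          // start from one random candidate
//	if best.less(cl[order[1]]) {   // compare against a second random pick
//		best = &cl[order[1]]
//	}
//
// Taking the better of two random choices spreads load across nearly-tied
// stores instead of herding every allocation onto a single "best" store.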

// EnableStatsBasedRebalancing controls whether range rebalancing takes
// additional variables such as write load and disk usage into account.
// If disabled, rebalancing is done purely based on replica count.
var EnableStatsBasedRebalancing = settings.RegisterBoolSetting(
	"kv.allocator.stat_based_rebalancing.enabled",
	"set to enable rebalancing of range replicas based on write load and disk usage",
	false,
)

// rangeRebalanceThreshold is the minimum ratio of a store's range count to
// the mean range count at which that store is considered overfull or underfull
// of ranges.
var rangeRebalanceThreshold = settings.RegisterNonNegativeFloatSetting(
	"kv.allocator.range_rebalance_threshold",
	"minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull",
	0.05,
)

// statRebalanceThreshold is the same as rangeRebalanceThreshold, but for
// statistics other than range count. This should be larger than
// rangeRebalanceThreshold because certain stats (like keys written per second)
// are inherently less stable and thus we need to be a little more forgiving to
// avoid thrashing.
//
// Note that there isn't a ton of science behind this number: thresholds of
// .05 and .1 were shown to cause some instability in clusters without load
// on them.
//
// TODO(a-robinson): Should disk usage be held to a higher standard than this?
var statRebalanceThreshold = settings.RegisterNonNegativeFloatSetting(
	"kv.allocator.stat_rebalance_threshold",
	"minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull",
	0.20,
)

func statsBasedRebalancingEnabled(st *cluster.Settings) bool {
	return EnableStatsBasedRebalancing.Get(&st.SV) && st.Version.IsActive(cluster.VersionStatsBasedRebalancing)
}

type balanceDimensions struct {
	ranges rangeCountStatus
	bytes  float64
	writes float64
}

func (bd *balanceDimensions) totalScore() float64 {
	return float64(bd.ranges) + bd.bytes + bd.writes
}

func (bd balanceDimensions) String() string {
	return fmt.Sprintf("%.2f(ranges=%d, bytes=%.2f, writes=%.2f)",
		bd.totalScore(), int(bd.ranges), bd.bytes, bd.writes)
}

// candidate store for allocation.
type candidate struct {
	store           roachpb.StoreDescriptor
	valid           bool
	constraintScore float64
	convergesScore  int
	balanceScore    balanceDimensions
	rangeCount      int
	details         string
}

func (c candidate) String() string {
	return fmt.Sprintf("s%d, valid:%t, constraint:%.2f, converges:%d, balance:%s, rangeCount:%d, "+
		"logicalBytes:%s, writesPerSecond:%.2f, details:(%s)",
		c.store.StoreID, c.valid, c.constraintScore, c.convergesScore, c.balanceScore, c.rangeCount,
		humanizeutil.IBytes(c.store.Capacity.LogicalBytes), c.store.Capacity.WritesPerSecond, c.details)
}

// less returns true if o is a better fit for some range than c is.
func (c candidate) less(o candidate) bool {
	if !o.valid {
		return false
	}
	if !c.valid {
		return true
	}
	if c.constraintScore != o.constraintScore {
		return c.constraintScore < o.constraintScore
	}
	if c.convergesScore != o.convergesScore {
		return c.convergesScore < o.convergesScore
	}
	if c.balanceScore.totalScore() != o.balanceScore.totalScore() {
		return c.balanceScore.totalScore() < o.balanceScore.totalScore()
	}
	return c.rangeCount > o.rangeCount
}
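
// Example (illustrative, not part of the original file): for two valid
// candidates that tie on constraint, converges, and balance scores, the one
// holding more ranges is the worse fit:
//
//	a := candidate{valid: true, constraintScore: 1, rangeCount: 100}
//	b := candidate{valid: true, constraintScore: 1, rangeCount: 50}
//	// a.less(b) == true: all earlier tie-breakers are equal, so the
//	// higher range count loses.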

type candidateList []candidate

func (cl candidateList) String() string {
	if len(cl) == 0 {
		return "[]"
	}
	var buffer bytes.Buffer
	buffer.WriteRune('[')
	for _, c := range cl {
		buffer.WriteRune('\n')
		buffer.WriteString(c.String())
	}
	buffer.WriteRune(']')
	return buffer.String()
}

// byScore implements sort.Interface to sort by scores.
type byScore candidateList

var _ sort.Interface = byScore(nil)

func (c byScore) Len() int           { return len(c) }
func (c byScore) Less(i, j int) bool { return c[i].less(c[j]) }
func (c byScore) Swap(i, j int)      { c[i], c[j] = c[j], c[i] }

// byScoreAndID implements sort.Interface to sort by scores and ids.
type byScoreAndID candidateList

var _ sort.Interface = byScoreAndID(nil)

func (c byScoreAndID) Len() int { return len(c) }
func (c byScoreAndID) Less(i, j int) bool {
	if c[i].constraintScore == c[j].constraintScore &&
		c[i].convergesScore == c[j].convergesScore &&
		c[i].balanceScore.totalScore() == c[j].balanceScore.totalScore() &&
		c[i].rangeCount == c[j].rangeCount &&
		c[i].valid == c[j].valid {
		return c[i].store.StoreID < c[j].store.StoreID
	}
	return c[i].less(c[j])
}
func (c byScoreAndID) Swap(i, j int) { c[i], c[j] = c[j], c[i] }

// onlyValid returns all the elements in a sorted (by score reversed) candidate
// list that are valid.
func (cl candidateList) onlyValid() candidateList {
	for i := len(cl) - 1; i >= 0; i-- {
		if cl[i].valid {
			return cl[:i+1]
		}
	}
	return candidateList{}
}

// best returns all the elements in a sorted (by score reversed) candidate list
// that share the highest constraint score and are valid.
func (cl candidateList) best() candidateList {
	cl = cl.onlyValid()
	if len(cl) <= 1 {
		return cl
	}
	for i := 1; i < len(cl); i++ {
		if cl[i].constraintScore < cl[0].constraintScore ||
			(cl[i].constraintScore == cl[len(cl)-1].constraintScore &&
				cl[i].convergesScore < cl[len(cl)-1].convergesScore) {
			return cl[:i]
		}
	}
	return cl
}

// worst returns all the elements in a sorted (by score reversed) candidate
// list that share the lowest constraint score.
func (cl candidateList) worst() candidateList {
	if len(cl) <= 1 {
		return cl
	}
	// Are there invalid candidates? If so, pick those.
	if !cl[len(cl)-1].valid {
		for i := len(cl) - 2; i >= 0; i-- {
			if cl[i].valid {
				return cl[i+1:]
			}
		}
	}
	// Find the worst constraint values.
	for i := len(cl) - 2; i >= 0; i-- {
		if cl[i].constraintScore > cl[len(cl)-1].constraintScore ||
			(cl[i].constraintScore == cl[len(cl)-1].constraintScore &&
				cl[i].convergesScore > cl[len(cl)-1].convergesScore) {
			return cl[i+1:]
		}
	}
	return cl
}

// betterThan returns all elements from a sorted (by score reversed) candidate
// list that have a higher score than the candidate.
func (cl candidateList) betterThan(c candidate) candidateList {
	for i := 0; i < len(cl); i++ {
		if !c.less(cl[i]) {
			return cl[:i]
		}
	}
	return cl
}

// selectGood randomly chooses a good candidate store from a sorted (by score
// reversed) candidate list using the provided random generator.
func (cl candidateList) selectGood(randGen allocatorRand) *candidate {
	if len(cl) == 0 {
		return nil
	}
	cl = cl.best()
	if len(cl) == 1 {
		return &cl[0]
	}
	randGen.Lock()
	order := randGen.Perm(len(cl))
	randGen.Unlock()
	best := &cl[order[0]]
	for i := 1; i < allocatorRandomCount; i++ {
		if best.less(cl[order[i]]) {
			best = &cl[order[i]]
		}
	}
	return best
}

// selectBad randomly chooses a bad candidate store from a sorted (by score
// reversed) candidate list using the provided random generator.
func (cl candidateList) selectBad(randGen allocatorRand) *candidate {
	if len(cl) == 0 {
		return nil
	}
	cl = cl.worst()
	if len(cl) == 1 {
		return &cl[0]
	}
	randGen.Lock()
	order := randGen.Perm(len(cl))
	randGen.Unlock()
	worst := &cl[order[0]]
	for i := 1; i < allocatorRandomCount; i++ {
		if cl[order[i]].less(*worst) {
			worst = &cl[order[i]]
		}
	}
	return worst
}

// allocateCandidates creates a candidate list of all stores that can be used
// for allocating a new replica ordered from the best to the worst. Only
// stores that meet the criteria are included in the list.
func allocateCandidates(
	st *cluster.Settings,
	sl StoreList,
	constraints config.Constraints,
	existing []roachpb.ReplicaDescriptor,
	rangeInfo RangeInfo,
	existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
	deterministic bool,
) candidateList {
	var candidates candidateList
	for _, s := range sl.stores {
		if !preexistingReplicaCheck(s.Node.NodeID, existing) {
			continue
		}
		constraintsOk, preferredMatched := constraintCheck(s, constraints)
		if !constraintsOk {
			continue
		}
		if !maxCapacityCheck(s) {
			continue
		}
		diversityScore := diversityScore(s, existingNodeLocalities)
		balanceScore := balanceScore(st, sl, s.Capacity, rangeInfo)
		candidates = append(candidates, candidate{
			store:           s,
			valid:           true,
			constraintScore: diversityScore + float64(preferredMatched),
			balanceScore:    balanceScore,
			rangeCount:      int(s.Capacity.RangeCount),
			details:         fmt.Sprintf("diversity=%.2f, preferred=%d", diversityScore, preferredMatched),
		})
	}
	if deterministic {
		sort.Sort(sort.Reverse(byScoreAndID(candidates)))
	} else {
		sort.Sort(sort.Reverse(byScore(candidates)))
	}
	return candidates
}

// removeCandidates creates a candidate list of all existing replicas' stores
// ordered from least qualified for removal to most qualified. Stores that are
// marked as not valid are in violation of a required criterion.
func removeCandidates(
	st *cluster.Settings,
	sl StoreList,
	constraints config.Constraints,
	rangeInfo RangeInfo,
	existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
	deterministic bool,
) candidateList {
	var candidates candidateList
	for _, s := range sl.stores {
		constraintsOk, preferredMatched := constraintCheck(s, constraints)
		if !constraintsOk {
			candidates = append(candidates, candidate{
				store:   s,
				valid:   false,
				details: "constraint check fail",
			})
			continue
		}
		if !maxCapacityCheck(s) {
			candidates = append(candidates, candidate{
				store:   s,
				valid:   false,
				details: "max capacity check fail",
			})
			continue
		}
		diversityScore := diversityRemovalScore(s.Node.NodeID, existingNodeLocalities)
		balanceScore := balanceScore(st, sl, s.Capacity, rangeInfo)
		var convergesScore int
		if !rebalanceFromConvergesOnMean(st, sl, s.Capacity, rangeInfo) {
			// If removing this candidate replica does not converge the store
			// stats to their means, we make it less attractive for removal by
			// adding 1 to its converges score. Note that when selecting a
			// candidate for removal the candidates with the lowest scores are
			// more likely to be removed.
			convergesScore = 1
		}
		candidates = append(candidates, candidate{
			store:           s,
			valid:           true,
			constraintScore: diversityScore + float64(preferredMatched),
			convergesScore:  convergesScore,
			balanceScore:    balanceScore,
			rangeCount:      int(s.Capacity.RangeCount),
			details:         fmt.Sprintf("diversity=%.2f, preferred=%d", diversityScore, preferredMatched),
		})
	}
	if deterministic {
		sort.Sort(sort.Reverse(byScoreAndID(candidates)))
	} else {
		sort.Sort(sort.Reverse(byScore(candidates)))
	}
	return candidates
}

// rebalanceCandidates creates two candidate lists. The first contains all
// existing replicas' stores, ordered from least qualified for rebalancing to
// most qualified. The second list is of all potential stores that could be
// used as rebalancing receivers, ordered from best to worst.
func rebalanceCandidates(
	ctx context.Context,
	st *cluster.Settings,
	sl StoreList,
	constraints config.Constraints,
	existing []roachpb.ReplicaDescriptor,
	rangeInfo RangeInfo,
	existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
	deterministic bool,
) (candidateList, candidateList) {
	// Load the existing store IDs into a map to eliminate having to loop
	// through the existing descriptors more than once.
	existingStoreIDs := make(map[roachpb.StoreID]struct{})
	for _, repl := range existing {
		existingStoreIDs[repl.StoreID] = struct{}{}
	}
	// Go through all the stores and find all that match the constraints so that
	// we can have accurate stats for rebalance calculations.
	var constraintsOkStoreDescriptors []roachpb.StoreDescriptor
	type constraintInfo struct {
		ok      bool
		matched int
	}
	storeInfos := make(map[roachpb.StoreID]constraintInfo)
	var rebalanceConstraintsCheck bool
	for _, s := range sl.stores {
		constraintsOk, preferredMatched := constraintCheck(s, constraints)
		storeInfos[s.StoreID] = constraintInfo{ok: constraintsOk, matched: preferredMatched}
		_, exists := existingStoreIDs[s.StoreID]
		if constraintsOk {
			constraintsOkStoreDescriptors = append(constraintsOkStoreDescriptors, s)
		} else if exists {
			rebalanceConstraintsCheck = true
			log.VEventf(ctx, 2, "must rebalance from s%d due to constraint check", s.StoreID)
		}
	}
	constraintsOkStoreList := makeStoreList(constraintsOkStoreDescriptors)
	var shouldRebalanceCheck bool
	if !rebalanceConstraintsCheck {
		for _, store := range sl.stores {
			if _, ok := existingStoreIDs[store.StoreID]; ok {
				if shouldRebalance(ctx, st, store, constraintsOkStoreList, rangeInfo) {
					shouldRebalanceCheck = true
					break
				}
			}
		}
	}
	// Only rebalance away if the constraints don't match or shouldRebalance
	// indicated that we should consider moving the range away from one of its
	// existing stores.
	if !rebalanceConstraintsCheck && !shouldRebalanceCheck {
		return nil, nil
	}
	var existingCandidates candidateList
	var candidates candidateList
	for _, s := range sl.stores {
		storeInfo := storeInfos[s.StoreID]
		maxCapacityOK := maxCapacityCheck(s)
		if _, ok := existingStoreIDs[s.StoreID]; ok {
			if !storeInfo.ok {
				existingCandidates = append(existingCandidates, candidate{
					store:   s,
					valid:   false,
					details: "constraint check fail",
				})
				continue
			}
			if !maxCapacityOK {
				existingCandidates = append(existingCandidates, candidate{
					store:   s,
					valid:   false,
					details: "max capacity check fail",
				})
				continue
			}
			diversityScore := diversityRemovalScore(s.Node.NodeID, existingNodeLocalities)
			balanceScore := balanceScore(st, sl, s.Capacity, rangeInfo)
			var convergesScore int
			if !rebalanceFromConvergesOnMean(st, sl, s.Capacity, rangeInfo) {
				// As in removeCandidates, any replica whose removal would not
				// converge the range stats to their means is given a converges
				// score boost of 1 to make it less attractive for removal.
				convergesScore = 1
			}
			existingCandidates = append(existingCandidates, candidate{
				store:           s,
				valid:           true,
				constraintScore: diversityScore + float64(storeInfo.matched),
				convergesScore:  convergesScore,
				balanceScore:    balanceScore,
				rangeCount:      int(s.Capacity.RangeCount),
				details:         fmt.Sprintf("diversity=%.2f, preferred=%d", diversityScore, storeInfo.matched),
			})
		} else {
			if !storeInfo.ok || !maxCapacityOK {
				continue
			}
			balanceScore := balanceScore(st, sl, s.Capacity, rangeInfo)
			var convergesScore int
			if rebalanceToConvergesOnMean(st, sl, s.Capacity, rangeInfo) {
				// This is the counterpart of !rebalanceFromConvergesOnMean from
				// the existing candidates. Candidates whose addition would
				// converge towards the range count mean are promoted.
				convergesScore = 1
			} else if !rebalanceConstraintsCheck {
				// Only consider this candidate if we must rebalance due to
				// constraint check requirements.
				log.VEventf(ctx, 3, "not considering %+v as a candidate for range %+v: score=%s storeList=%+v",
					s, rangeInfo, balanceScore, sl)
				continue
			}
			diversityScore := rebalanceToDiversityScore(s, existingNodeLocalities)
			candidates = append(candidates, candidate{
				store:           s,
				valid:           true,
				constraintScore: diversityScore + float64(storeInfo.matched),
				convergesScore:  convergesScore,
				balanceScore:    balanceScore,
				rangeCount:      int(s.Capacity.RangeCount),
				details:         fmt.Sprintf("diversity=%.2f, preferred=%d", diversityScore, storeInfo.matched),
			})
		}
	}
	if deterministic {
		sort.Sort(sort.Reverse(byScoreAndID(existingCandidates)))
		sort.Sort(sort.Reverse(byScoreAndID(candidates)))
	} else {
		sort.Sort(sort.Reverse(byScore(existingCandidates)))
		sort.Sort(sort.Reverse(byScore(candidates)))
	}
	return existingCandidates, candidates
}

// shouldRebalance returns whether the specified store is a candidate for
// having a replica removed from it given the candidate store list.
func shouldRebalance(
	ctx context.Context,
	st *cluster.Settings,
	store roachpb.StoreDescriptor,
	sl StoreList,
	rangeInfo RangeInfo,
) bool {
	if store.Capacity.FractionUsed() >= maxFractionUsedThreshold {
		log.VEventf(ctx, 2, "s%d: should-rebalance(disk-full): fraction-used=%.2f, capacity=(%v)",
			store.StoreID, store.Capacity.FractionUsed(), store.Capacity)
		return true
	}
	if !statsBasedRebalancingEnabled(st) {
		return shouldRebalanceNoStats(ctx, st, store, sl)
	}

	// Rebalance if this store is full enough that the range is a bad fit.
	score := balanceScore(st, sl, store.Capacity, rangeInfo)
	if rangeIsBadFit(score) {
		log.VEventf(ctx, 2,
			"s%d: should-rebalance(bad-fit): balanceScore=%s, capacity=(%v), rangeInfo=%+v, "+
				"(meanRangeCount=%.1f, meanDiskUsage=%s, meanWritesPerSecond=%.2f), ",
			store.StoreID, score, store.Capacity, rangeInfo,
			sl.candidateRanges.mean, humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)),
			sl.candidateWritesPerSecond.mean)
		return true
	}

	// Rebalance if there exists another store that is greatly in need of the
	// range and this store is a somewhat bad match for it.
	if rangeIsPoorFit(score) {
		for _, desc := range sl.stores {
			otherScore := balanceScore(st, sl, desc.Capacity, rangeInfo)
			if !rangeIsGoodFit(otherScore) {
				log.VEventf(ctx, 5,
					"s%d is not a good enough fit to replace s%d: balanceScore=%s, capacity=(%v), rangeInfo=%+v, "+
						"(meanRangeCount=%.1f, meanDiskUsage=%s, meanWritesPerSecond=%.2f), ",
					desc.StoreID, store.StoreID, otherScore, desc.Capacity, rangeInfo,
					sl.candidateRanges.mean, humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)),
					sl.candidateWritesPerSecond.mean)
				continue
			}
			if !preexistingReplicaCheck(desc.Node.NodeID, rangeInfo.Desc.Replicas) {
				continue
			}
			log.VEventf(ctx, 2,
				"s%d: should-rebalance(better-fit=s%d): balanceScore=%s, capacity=(%v), rangeInfo=%+v, "+
					"otherScore=%s, otherCapacity=(%v), "+
					"(meanRangeCount=%.1f, meanDiskUsage=%s, meanWritesPerSecond=%.2f), ",
				store.StoreID, desc.StoreID, score, store.Capacity, rangeInfo,
				otherScore, desc.Capacity, sl.candidateRanges.mean,
				humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)), sl.candidateWritesPerSecond.mean)
			return true
		}
	}

	// If we reached this point, we're happy with the range where it is.
	log.VEventf(ctx, 3,
		"s%d: should-not-rebalance: balanceScore=%s, capacity=(%v), rangeInfo=%+v, "+
			"(meanRangeCount=%.1f, meanDiskUsage=%s, meanWritesPerSecond=%.2f), ",
		store.StoreID, score, store.Capacity, rangeInfo, sl.candidateRanges.mean,
		humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)), sl.candidateWritesPerSecond.mean)
	return false
}

// shouldRebalanceNoStats implements the decision of whether to rebalance for
// the case when stats-based rebalancing is disabled and decisions should thus
// be made based only on range counts.
func shouldRebalanceNoStats(
	ctx context.Context, st *cluster.Settings, store roachpb.StoreDescriptor, sl StoreList,
) bool {
	overfullThreshold := int32(math.Ceil(overfullRangeThreshold(st, sl.candidateRanges.mean)))
	if store.Capacity.RangeCount > overfullThreshold {
		log.VEventf(ctx, 2,
			"s%d: should-rebalance(ranges-overfull): rangeCount=%d, mean=%.2f, overfull-threshold=%d",
			store.StoreID, store.Capacity.RangeCount, sl.candidateRanges.mean, overfullThreshold)
		return true
	}

	if float64(store.Capacity.RangeCount) > sl.candidateRanges.mean {
		underfullThreshold := int32(math.Floor(underfullRangeThreshold(st, sl.candidateRanges.mean)))
		for _, desc := range sl.stores {
			if desc.Capacity.RangeCount < underfullThreshold {
				log.VEventf(ctx, 2,
					"s%d: should-rebalance(better-fit-ranges=s%d): rangeCount=%d, otherRangeCount=%d, "+
						"mean=%.2f, underfull-threshold=%d",
					store.StoreID, desc.StoreID, store.Capacity.RangeCount, desc.Capacity.RangeCount,
					sl.candidateRanges.mean, underfullThreshold)
				return true
			}
		}
	}

	// If we reached this point, we're happy with the range where it is.
	return false
}

// preexistingReplicaCheck returns true if no existing replica is present on
// the candidate's node.
func preexistingReplicaCheck(nodeID roachpb.NodeID, existing []roachpb.ReplicaDescriptor) bool {
	for _, r := range existing {
		if r.NodeID == nodeID {
			return false
		}
	}
	return true
}

// storeHasConstraint returns whether a store's attributes or node's locality
// matches the key-value pair in the constraint.
func storeHasConstraint(store roachpb.StoreDescriptor, c config.Constraint) bool {
	if c.Key == "" {
		for _, attrs := range []roachpb.Attributes{store.Attrs, store.Node.Attrs} {
			for _, attr := range attrs.Attrs {
				if attr == c.Value {
					return true
				}
			}
		}
	} else {
		for _, tier := range store.Node.Locality.Tiers {
			if c.Key == tier.Key && c.Value == tier.Value {
				return true
			}
		}
	}
	return false
}

// constraintCheck returns true iff all required and prohibited constraints are
// satisfied. Stores with attributes or localities that match the most positive
// constraints return higher scores.
func constraintCheck(store roachpb.StoreDescriptor, constraints config.Constraints) (bool, int) {
	if len(constraints.Constraints) == 0 {
		return true, 0
	}
	positive := 0
	for _, constraint := range constraints.Constraints {
		hasConstraint := storeHasConstraint(store, constraint)
		switch {
		case constraint.Type == config.Constraint_REQUIRED && !hasConstraint:
			return false, 0
		case constraint.Type == config.Constraint_PROHIBITED && hasConstraint:
			return false, 0
		case (constraint.Type == config.Constraint_POSITIVE && hasConstraint):
			positive++
		}
	}
	return true, positive
}
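
// Example (illustrative, not part of the original file): for a store with
// attribute "ssd" and locality region=us-east, the constraints
//
//	{Type: REQUIRED, Value: "ssd"}                    // satisfied, not counted
//	{Type: PROHIBITED, Value: "hdd"}                  // satisfied (attribute absent)
//	{Type: POSITIVE, Key: "region", Value: "us-east"} // matched, positive++
//
// yield (true, 1): every hard constraint passes and exactly one positive
// constraint matched.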

// diversityScore returns a score between 0 and 1, where higher scores indicate
// stores with the fewest locality tiers in common with already existing
// replicas.
func diversityScore(
	store roachpb.StoreDescriptor, existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
) float64 {
	minScore := roachpb.MaxDiversityScore
	for _, locality := range existingNodeLocalities {
		if newScore := store.Node.Locality.DiversityScore(locality); newScore < minScore {
			minScore = newScore
		}
	}
	return minScore
}

// diversityRemovalScore is the same as diversityScore, but for a node that's
// already present in existingNodeLocalities. It works by calculating the
// diversityScore for nodeID as if nodeID didn't already have a replica.
// As with diversityScore, a higher score indicates that the node is a better
// fit for the range (i.e. keeping it around is good for diversity).
func diversityRemovalScore(
	nodeID roachpb.NodeID, existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
) float64 {
	minScore := roachpb.MaxDiversityScore
	locality := existingNodeLocalities[nodeID]
	for otherNodeID, otherLocality := range existingNodeLocalities {
		if otherNodeID == nodeID {
			continue
		}
		if newScore := otherLocality.DiversityScore(locality); newScore < minScore {
			minScore = newScore
		}
	}
	return minScore
}

// rebalanceToDiversityScore is like diversityScore, but it returns what
// the diversity score would be if the given store was added and one of the
// existing stores was removed. This is equivalent to the second lowest score.
//
// This is useful for considering rebalancing a range that already has enough
// replicas - it's perfectly fine to add a replica in the same locality as an
// existing replica in such cases.
func rebalanceToDiversityScore(
	store roachpb.StoreDescriptor, existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
) float64 {
	minScore := roachpb.MaxDiversityScore
	nextMinScore := roachpb.MaxDiversityScore
	for _, locality := range existingNodeLocalities {
		newScore := store.Node.Locality.DiversityScore(locality)
		if newScore < minScore {
			nextMinScore = minScore
			minScore = newScore
		} else if newScore < nextMinScore {
			nextMinScore = newScore
		}
	}
	return nextMinScore
}
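
// Example (illustrative, not part of the original file): if the existing
// replicas live in region=us-east and region=us-west and the candidate store
// is also in us-east, its pairwise diversity scores might be 0.0 (vs. the
// us-east replica) and 1.0 (vs. the us-west replica). diversityScore would
// report the minimum, 0.0, but since rebalancing removes one existing
// replica, the second-lowest score, 1.0, better reflects the post-rebalance
// diversity, and that is what rebalanceToDiversityScore returns.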

type rangeCountStatus int

const (
	overfull  rangeCountStatus = -1
	balanced  rangeCountStatus = 0
	underfull rangeCountStatus = 1
)

func oppositeStatus(rcs rangeCountStatus) rangeCountStatus {
	return -rcs
}

// balanceScore returns an arbitrarily scaled score where higher scores are for
// stores where the range is a better fit based on various balance factors
// like range count, disk usage, and QPS.
func balanceScore(
	st *cluster.Settings, sl StoreList, sc roachpb.StoreCapacity, rangeInfo RangeInfo,
) balanceDimensions {
	var dimensions balanceDimensions
	if float64(sc.RangeCount) > overfullRangeThreshold(st, sl.candidateRanges.mean) {
		dimensions.ranges = overfull
	} else if float64(sc.RangeCount) < underfullRangeThreshold(st, sl.candidateRanges.mean) {
		dimensions.ranges = underfull
	} else {
		dimensions.ranges = balanced
	}
	if statsBasedRebalancingEnabled(st) {
		dimensions.bytes = balanceContribution(
			st,
			dimensions.ranges,
			sl.candidateLogicalBytes.mean,
			float64(sc.LogicalBytes),
			sc.BytesPerReplica,
			float64(rangeInfo.LogicalBytes))
		dimensions.writes = balanceContribution(
			st,
			dimensions.ranges,
			sl.candidateWritesPerSecond.mean,
			sc.WritesPerSecond,
			sc.WritesPerReplica,
			rangeInfo.WritesPerSecond)
	}
	return dimensions
}

// balanceContribution generates a single dimension's contribution to a range's
// balanceScore, where larger values mean a store is a better fit for a given
// range.
func balanceContribution(
	st *cluster.Settings,
	rcs rangeCountStatus,
	mean float64,
	storeVal float64,
	percentiles roachpb.Percentiles,
	rangeVal float64,
) float64 {
	if storeVal > overfullStatThreshold(st, mean) {
		return percentileScore(rcs, percentiles, rangeVal)
	} else if storeVal < underfullStatThreshold(st, mean) {
		// To ensure that we behave symmetrically when underfull compared to
		// when we're overfull, invert both the rangeCountStatus and the
		// result returned by percentileScore. This makes it so that being
		// overfull on ranges and on the given dimension behaves symmetrically
		// to being underfull on ranges and the given dimension (and ditto for
		// overfull on ranges and underfull on a dimension, etc.).
		return -percentileScore(oppositeStatus(rcs), percentiles, rangeVal)
	}
	return 0
}
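
// Worked example (illustrative, not part of the original file): a store that
// is overfull on ranges (rcs == overfull) but below the underfull threshold
// on writes per second hits the second branch, returning
// -percentileScore(underfull, ...). For a write-heavy range (above P90) that
// is -(-1) = 1, so the store is favored for ranges that shed write load from
// elsewhere, mirroring how an underfull-on-ranges, overfull-on-writes store
// is scored by the first branch.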

// percentileScore returns a score for how desirable it is to put a range
// onto a particular store given the assumption that the store is overfull
// along a particular dimension. Takes as parameters:
// * How the number of ranges on the store compares to the norm
// * The distribution of values in the store for the dimension
// * The range's value for the dimension
// A higher score means that the range is a better fit for the store.
func percentileScore(
	rcs rangeCountStatus, percentiles roachpb.Percentiles, rangeVal float64,
) float64 {
	// Note that there is not any great research behind these values. If they're
	// causing thrashing or a bad imbalance, rethink them and modify them as
	// appropriate.
	if rcs == balanced {
		// If the range count is balanced, we should prefer ranges that are
		// very small on this particular dimension to try to rebalance this
		// dimension without messing up the replica counts.
		if rangeVal < percentiles.P10 {
			return 1
		} else if rangeVal < percentiles.P25 {
			return 0.5
		} else if rangeVal > percentiles.P90 {
			return -1
		} else if rangeVal > percentiles.P75 {
			return -0.5
		}
		// else rangeVal >= percentiles.P25 && rangeVal <= percentiles.P75
		// It may be better to return more than 0 here, since taking on an
		// average range isn't necessarily bad, but for now let's see how this
		// works.
		return 0
	} else if rcs == overfull {
		// If this store has too many ranges, we're ok with moving any range
		// that's at least somewhat sizable in this dimension, since we want to
		// reduce both the range count and this metric. Moving extreme outliers
		// may be less desirable, though, so favor very heavy ranges slightly
		// less and disfavor very light ranges.
		//
		// Note that we can't truly disfavor large ranges, since that prevents
		// us from rebalancing nonempty ranges to empty stores (since all
		// nonempty ranges will be greater than an empty store's P90).
		if rangeVal > percentiles.P90 {
			return -0.5
		} else if rangeVal >= percentiles.P25 {
			return -1
		} else if rangeVal >= percentiles.P10 {
			return 0
		}
		// else rangeVal < percentiles.P10
		return 0.5
	} else if rcs == underfull {
		// If this store has too few ranges but is overloaded on some other
		// dimension, we need to prioritize moving away replicas that are
		// high in that dimension and accepting replicas that are low in it.
		if rangeVal < percentiles.P10 {
			return 1
		} else if rangeVal < percentiles.P25 {
			return 0.5
		} else if rangeVal > percentiles.P90 {
			return -1
		} else if rangeVal > percentiles.P75 {
			return -0.5
		}
		// else rangeVal >= percentiles.P25 && rangeVal <= percentiles.P75
		return 0
	}
	panic(fmt.Sprintf("reached unreachable code: %+v; %+v; %+v", rcs, percentiles, rangeVal))
}

func rangeIsGoodFit(bd balanceDimensions) bool {
	// A score greater than 1 means that more than one dimension improves
	// without being canceled out by the third, since each dimension can only
	// contribute a value from [-1,1] to the score.
	return bd.totalScore() > 1
}

func rangeIsBadFit(bd balanceDimensions) bool {
	// This is the same logic as for rangeIsGoodFit, just reversed.
	return bd.totalScore() < -1
}

func rangeIsPoorFit(bd balanceDimensions) bool {
	// A score less than -0.5 isn't a great fit for a range, since the
	// bad dimensions outweigh the good by at least one entire dimension.
	return bd.totalScore() < -0.5
}
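
// Worked example (illustrative, not part of the original file): a store that
// is underfull on ranges (+1) with byte and write contributions of +0.5 and 0
// totals 1.5, making the range a good fit. The mirror image (-1, -0.5, 0)
// totals -1.5, a bad fit, while a total of -0.7 is merely a poor fit.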

func overfullRangeThreshold(st *cluster.Settings, mean float64) float64 {
	if !statsBasedRebalancingEnabled(st) {
		return mean * (1 + rangeRebalanceThreshold.Get(&st.SV))
	}
	return math.Max(mean*(1+rangeRebalanceThreshold.Get(&st.SV)), mean+5)
}

func underfullRangeThreshold(st *cluster.Settings, mean float64) float64 {
	if !statsBasedRebalancingEnabled(st) {
		return mean * (1 - rangeRebalanceThreshold.Get(&st.SV))
	}
	return math.Min(mean*(1-rangeRebalanceThreshold.Get(&st.SV)), mean-5)
}
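
// Worked example (illustrative, not part of the original file): with the
// default 5% range threshold and a mean of 1000 ranges, stores above 1050 are
// overfull and stores below 950 are underfull. With stats-based rebalancing
// enabled and a mean of only 20, the additive floor widens the band: overfull
// starts above max(21, 25) = 25 and underfull below min(19, 15) = 15,
// preventing thrashing around small means.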

func overfullStatThreshold(st *cluster.Settings, mean float64) float64 {
	return mean * (1 + statRebalanceThreshold.Get(&st.SV))
}

func underfullStatThreshold(st *cluster.Settings, mean float64) float64 {
	return mean * (1 - statRebalanceThreshold.Get(&st.SV))
}

func rebalanceFromConvergesOnMean(
	st *cluster.Settings, sl StoreList, sc roachpb.StoreCapacity, rangeInfo RangeInfo,
) bool {
	return rebalanceConvergesOnMean(
		st,
		sl,
		sc,
		sc.RangeCount-1,
		sc.LogicalBytes-rangeInfo.LogicalBytes,
		sc.WritesPerSecond-rangeInfo.WritesPerSecond)
}

func rebalanceToConvergesOnMean(
	st *cluster.Settings, sl StoreList, sc roachpb.StoreCapacity, rangeInfo RangeInfo,
) bool {
	return rebalanceConvergesOnMean(
		st,
		sl,
		sc,
		sc.RangeCount+1,
		sc.LogicalBytes+rangeInfo.LogicalBytes,
		sc.WritesPerSecond+rangeInfo.WritesPerSecond)
}

func rebalanceConvergesOnMean(
	st *cluster.Settings,
	sl StoreList,
	sc roachpb.StoreCapacity,
	newRangeCount int32,
	newLogicalBytes int64,
	newWritesPerSecond float64,
) bool {
	if !statsBasedRebalancingEnabled(st) {
		return convergesOnMean(float64(sc.RangeCount), float64(newRangeCount), sl.candidateRanges.mean)
	}

	// Note that we check both converges and diverges. If we always decremented
	// convergeCount when something didn't converge, ranges with stats equal to 0
	// would almost never converge (and thus almost never get rebalanced).
	var convergeCount int
	if convergesOnMean(float64(sc.RangeCount), float64(newRangeCount), sl.candidateRanges.mean) {
		convergeCount++
	} else if divergesFromMean(float64(sc.RangeCount), float64(newRangeCount), sl.candidateRanges.mean) {
		convergeCount--
	}
	if convergesOnMean(float64(sc.LogicalBytes), float64(newLogicalBytes), sl.candidateLogicalBytes.mean) {
		convergeCount++
	} else if divergesFromMean(float64(sc.LogicalBytes), float64(newLogicalBytes), sl.candidateLogicalBytes.mean) {
		convergeCount--
	}
	if convergesOnMean(sc.WritesPerSecond, newWritesPerSecond, sl.candidateWritesPerSecond.mean) {
		convergeCount++
	} else if divergesFromMean(sc.WritesPerSecond, newWritesPerSecond, sl.candidateWritesPerSecond.mean) {
		convergeCount--
	}
	return convergeCount > 0
}

func convergesOnMean(oldVal, newVal, mean float64) bool {
	return math.Abs(newVal-mean) < math.Abs(oldVal-mean)
}

func divergesFromMean(oldVal, newVal, mean float64) bool {
	return math.Abs(newVal-mean) > math.Abs(oldVal-mean)
}
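
// Example (illustrative, not part of the original file): with mean = 100,
// moving a store's value from 110 to 104 converges (|104-100| < |110-100|),
// while moving it from 95 to 89 diverges (|89-100| > |95-100|).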

// maxCapacityCheck returns true if the store has room for a new replica.
func maxCapacityCheck(store roachpb.StoreDescriptor) bool {
	return store.Capacity.FractionUsed() < maxFractionUsedThreshold
}