2 Star 2 Fork 1

cockroachdb / cockroach

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
node_liveness.go 23.86 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package storage
import (
"fmt"
"sync/atomic"
"time"
"github.com/pkg/errors"
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
var (
	// ErrNoLivenessRecord is returned when asking for liveness information
	// about a node for which nothing is known.
	ErrNoLivenessRecord = errors.New("node not in the liveness table")
	// errChangeDecommissioningFailed is returned by the conditional-put
	// failure handler in setDecommissioningInternal when the record actually
	// stored does not carry the requested decommissioning flag.
	// SetDecommissioning retries when it sees this error.
	errChangeDecommissioningFailed = errors.New("failed to change the decommissioning status")
	// errSkippedHeartbeat is returned when a heartbeat request fails because
	// the underlying liveness record has had its epoch incremented.
	errSkippedHeartbeat = errors.New("heartbeat failed on epoch increment")
)
// errRetryLiveness wraps an error produced by a liveness update attempt to
// mark that attempt as one updateLiveness should simply try again.
type errRetryLiveness struct {
	error
}

// Cause exposes the wrapped error.
func (r *errRetryLiveness) Cause() error {
	wrapped := r.error
	return wrapped
}

// Error renders the wrapped error prefixed with the wrapper's type name.
func (r *errRetryLiveness) Error() string {
	wrapper := *r
	return fmt.Sprintf("%T: %s", wrapper, wrapper.error)
}
// Metadata (metric name and help text) for the node liveness metrics that
// NewNodeLiveness registers in LivenessMetrics.
var (
	// metaLiveNodes describes the functional gauge backed by numLiveNodes.
	metaLiveNodes = metric.Metadata{
		Name: "liveness.livenodes",
		Help: "Number of live nodes in the cluster (will be 0 if this node is not itself live)"}
	// metaHeartbeatSuccesses describes the counter of successful heartbeats.
	metaHeartbeatSuccesses = metric.Metadata{
		Name: "liveness.heartbeatsuccesses",
		Help: "Number of successful node liveness heartbeats from this node"}
	// metaHeartbeatFailures describes the counter of failed heartbeats.
	metaHeartbeatFailures = metric.Metadata{
		Name: "liveness.heartbeatfailures",
		Help: "Number of failed node liveness heartbeats from this node"}
	// metaEpochIncrements describes the counter of epoch increments.
	metaEpochIncrements = metric.Metadata{
		Name: "liveness.epochincrements",
		Help: "Number of times this node has incremented its liveness epoch"}
)
// IsLive reports whether the liveness record is still unexpired at now. The
// record's expiration is first pulled back by maxOffset (by nothing at all
// when clockless reads are configured); the node counts as live only while
// now is strictly earlier than that adjusted expiration.
func (l *Liveness) IsLive(now hlc.Timestamp, maxOffset time.Duration) bool {
	buffer := maxOffset
	if buffer == timeutil.ClocklessMaxOffset {
		// Clockless reads: live without a buffer period.
		buffer = 0
	}
	deadline := l.Expiration.Add(-buffer.Nanoseconds(), 0)
	return now.Less(deadline)
}
// LivenessMetrics holds metrics for use with node liveness activity.
type LivenessMetrics struct {
	// LiveNodes is a functional gauge computed by NodeLiveness.numLiveNodes.
	LiveNodes *metric.Gauge
	// HeartbeatSuccesses counts heartbeats that were treated as successful.
	HeartbeatSuccesses *metric.Counter
	// HeartbeatFailures counts heartbeats that returned an error.
	HeartbeatFailures *metric.Counter
	// EpochIncrements counts successful IncrementEpoch calls made by this node.
	EpochIncrements *metric.Counter
}
// IsLiveCallback is invoked when a node's IsLive state changes to true.
// Callbacks can be registered via NodeLiveness.RegisterCallback(). The
// argument is the ID of the node whose gossiped liveness record just became
// live.
type IsLiveCallback func(nodeID roachpb.NodeID)
// HeartbeatCallback is invoked whenever this node updates its own liveness status,
// indicating that it is alive.
// NOTE(review): updateLivenessAttempt runs the callback after every
// successful liveness write it performs, which appears to include writes to
// other nodes' records (IncrementEpoch, SetDecommissioning) — confirm that
// this is intended.
type HeartbeatCallback func(context.Context)
// NodeLiveness encapsulates information on node liveness and provides
// an API for querying, updating, and invalidating node
// liveness. Nodes periodically "heartbeat" the range holding the node
// liveness system table to indicate that they're available. The
// resulting liveness information is used to ignore unresponsive nodes
// while making range quiescence decisions, as well as for efficient,
// node liveness epoch-based range leases.
type NodeLiveness struct {
	// ambientCtx annotates the contexts used by liveness operations.
	ambientCtx log.AmbientContext
	// clock supplies the current time and the maximum clock offset.
	clock *hlc.Clock
	// db is used to run the conditional-put transactions on liveness records.
	db *client.DB
	// gossip provides this node's ID and delivers liveness updates to
	// livenessGossipUpdate.
	gossip *gossip.Gossip
	// livenessThreshold is added to the current time (plus the max clock
	// offset) to compute a heartbeat's expiration.
	livenessThreshold time.Duration
	// heartbeatInterval is the period of the heartbeat ticker; it is set to
	// livenessThreshold minus the renewal duration in NewNodeLiveness.
	heartbeatInterval time.Duration
	// pauseHeartbeat makes the heartbeat loop skip its heartbeat while true.
	pauseHeartbeat atomic.Value // contains a bool
	// selfSem and otherSem are capacity-1 channels used as semaphores; sem()
	// picks selfSem for operations on this node's record and otherSem for
	// every other node.
	selfSem  chan struct{}
	otherSem chan struct{}
	// triggerHeartbeat wakes the heartbeat loop without waiting for the ticker.
	triggerHeartbeat chan struct{} // for testing
	metrics          LivenessMetrics
	mu               struct {
		syncutil.Mutex
		// self is this node's own liveness record as last written or observed
		// by a local update.
		self Liveness
		// callbacks are run when a gossiped record turns a node live.
		callbacks []IsLiveCallback
		// nodes holds liveness records by node ID, updated from gossip and by
		// IncrementEpoch for nodes other than this one.
		nodes map[roachpb.NodeID]Liveness
		// heartbeatCallback is the optional callback installed by StartHeartbeat.
		heartbeatCallback HeartbeatCallback
	}
}
// NewNodeLiveness builds a NodeLiveness bound to the given clock, database and
// gossip instance. The heartbeat interval is livenessThreshold minus
// renewalDuration. Before returning, the instance registers itself for gossip
// updates on every key under the node liveness prefix.
func NewNodeLiveness(
	ambient log.AmbientContext,
	clock *hlc.Clock,
	db *client.DB,
	g *gossip.Gossip,
	livenessThreshold time.Duration,
	renewalDuration time.Duration,
) *NodeLiveness {
	nl := new(NodeLiveness)
	nl.ambientCtx = ambient
	nl.clock = clock
	nl.db = db
	nl.gossip = g
	nl.livenessThreshold = livenessThreshold
	nl.heartbeatInterval = livenessThreshold - renewalDuration

	// Each of these channels holds at most one element.
	nl.selfSem = make(chan struct{}, 1)
	nl.otherSem = make(chan struct{}, 1)
	nl.triggerHeartbeat = make(chan struct{}, 1)

	nl.metrics.LiveNodes = metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes)
	nl.metrics.HeartbeatSuccesses = metric.NewCounter(metaHeartbeatSuccesses)
	nl.metrics.HeartbeatFailures = metric.NewCounter(metaHeartbeatFailures)
	nl.metrics.EpochIncrements = metric.NewCounter(metaEpochIncrements)

	// Heartbeats start out unpaused and the node table starts out empty.
	nl.pauseHeartbeat.Store(false)
	nl.mu.nodes = make(map[roachpb.NodeID]Liveness)

	pattern := gossip.MakePrefixPattern(gossip.KeyNodeLivenessPrefix)
	nl.gossip.RegisterCallback(pattern, nl.livenessGossipUpdate)
	return nl
}
// errNodeDrainingSet is returned by the conditional-put failure handler in
// setDrainingInternal when the stored record already carries the requested
// draining value; setDrainingInternal treats it as success.
var errNodeDrainingSet = errors.New("node is already draining")
// sem selects the semaphore guarding updates for nodeID: selfSem when nodeID
// is this node's own ID according to gossip, otherSem for any other node.
func (nl *NodeLiveness) sem(nodeID roachpb.NodeID) chan struct{} {
	if selfID := nl.gossip.NodeID.Get(); nodeID != selfID {
		return nl.otherSem
	}
	return nl.selfSem
}
// SetDraining attempts to record the given draining state in this node's
// liveness record, retrying with the default retry options in a loop bound to
// ctx until one attempt succeeds. It does not pause or resume the heartbeat
// loop itself; use PauseHeartbeat for that.
//
// The function has no return value, so a failure to update the record is only
// observable through the logged events.
func (nl *NodeLiveness) SetDraining(ctx context.Context, drain bool) {
	ctx = nl.ambientCtx.AnnotateCtx(ctx)
	for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
		// Re-read our record on each attempt; a nil record (none known yet) is
		// handled by setDrainingInternal.
		liveness, err := nl.Self()
		if err != nil && err != ErrNoLivenessRecord {
			log.Errorf(ctx, "unexpected error getting liveness: %s", err)
		}
		err = nl.setDrainingInternal(ctx, liveness, drain)
		if err == nil {
			return
		}
		// Record why the attempt failed instead of dropping the error silently.
		log.VEventf(ctx, 1, "attempting to set liveness draining status to %t: %v", drain, err)
	}
}
// SetDecommissioning makes a best-effort attempt to mark the liveness record
// of nodeID with the given decommissioning flag. It loops until an attempt
// succeeds or fails with anything other than errChangeDecommissioningFailed.
// An error is returned right away when no liveness record is known for nodeID.
func (nl *NodeLiveness) SetDecommissioning(
	ctx context.Context, nodeID roachpb.NodeID, decommission bool,
) error {
	ctx = nl.ambientCtx.AnnotateCtx(ctx)
	for {
		// Each pass needs a freshly read record.
		current, err := nl.GetLiveness(nodeID)
		if err != nil {
			return errors.Wrap(err, "unable to get liveness")
		}
		err = nl.setDecommissioningInternal(ctx, nodeID, current, decommission)
		switch {
		case err == nil:
			return nil
		case errors.Cause(err) == errChangeDecommissioningFailed:
			// Expected when the epoch was incremented; go around again.
		default:
			return err
		}
	}
}
// setDrainingInternal makes one attempt to write the draining flag into this
// node's liveness record, using liveness as the expected current value (nil
// meaning no record is known, in which case a fresh epoch-1 record is
// written). On a conditional-put mismatch the actual record is cached as
// self; the mismatch still counts as success when the actual record already
// has the requested draining value.
func (nl *NodeLiveness) setDrainingInternal(
	ctx context.Context, liveness *Liveness, drain bool,
) error {
	selfID := nl.gossip.NodeID.Get()
	slot := nl.sem(selfID)
	// Only one attempt to set the draining field may run at a time.
	select {
	case slot <- struct{}{}:
	case <-ctx.Done():
		return ctx.Err()
	}
	defer func() { <-slot }()

	var updated Liveness
	if liveness == nil {
		updated = Liveness{NodeID: selfID, Epoch: 1}
	} else {
		updated = *liveness
	}
	updated.Draining = drain

	onMismatch := func(actual Liveness) error {
		nl.setSelf(actual)
		if actual.Draining != updated.Draining {
			return errors.New("failed to update liveness record")
		}
		return errNodeDrainingSet
	}

	switch err := nl.updateLiveness(ctx, &updated, liveness, onMismatch); err {
	case nil:
		nl.setSelf(updated)
		return nil
	case errNodeDrainingSet:
		// The stored record already has the wanted value.
		return nil
	default:
		return err
	}
}
// setDecommissioningInternal makes one attempt to write the decommissioning
// flag into the liveness record of nodeID, using liveness as the expected
// current value (nil meaning a fresh epoch-1 record is written). On a
// conditional-put mismatch it returns nil when the actual record already has
// the requested flag and errChangeDecommissioningFailed otherwise.
func (nl *NodeLiveness) setDecommissioningInternal(
	ctx context.Context, nodeID roachpb.NodeID, liveness *Liveness, decommission bool,
) error {
	// Serialize attempts only when the target is this node.
	if selfID := nl.gossip.NodeID.Get(); nodeID == selfID {
		slot := nl.sem(nodeID)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case slot <- struct{}{}:
		}
		defer func() { <-slot }()
	}

	var updated Liveness
	if liveness != nil {
		updated = *liveness
	} else {
		updated = Liveness{NodeID: nodeID, Epoch: 1}
	}
	updated.Decommissioning = decommission

	onMismatch := func(actual Liveness) error {
		if actual.Decommissioning != updated.Decommissioning {
			return errChangeDecommissioningFailed
		}
		return nil
	}
	return nl.updateLiveness(ctx, &updated, liveness, onMismatch)
}
// GetLivenessThreshold reports the configured liveness threshold: the longest
// a node may go between heartbeats before it is considered not-live.
func (nl *NodeLiveness) GetLivenessThreshold() time.Duration {
	threshold := nl.livenessThreshold
	return threshold
}
// IsLive reports whether the given node is live right now, judged from the
// locally known liveness record for it. An error is returned when the node
// has no record in the local liveness table.
func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) {
	record, err := nl.GetLiveness(nodeID)
	if err != nil {
		return false, err
	}
	now, maxOffset := nl.clock.Now(), nl.clock.MaxOffset()
	return record.IsLive(now, maxOffset), nil
}
// StartHeartbeat starts a periodic heartbeat to refresh this node's
// last heartbeat in the node liveness table. The optionally provided
// HeartbeatCallback will be invoked whenever this node updates its own liveness.
//
// The heartbeat runs in a stopper worker. Each round is skipped while
// heartbeats are paused; otherwise it heartbeats with a fresh context whose
// timeout is half the heartbeat interval. Rounds are started by the ticker or
// by triggerHeartbeat, and the worker exits on stopper.ShouldStop().
func (nl *NodeLiveness) StartHeartbeat(
	ctx context.Context, stopper *stop.Stopper, alive HeartbeatCallback,
) {
	log.VEventf(ctx, 1, "starting liveness heartbeat")
	retryOpts := base.DefaultRetryOptions()
	// The per-round retry loop is closed by the stopper's quiesce signal.
	retryOpts.Closer = stopper.ShouldQuiesce()
	nl.mu.Lock()
	nl.mu.heartbeatCallback = alive
	nl.mu.Unlock()
	// The worker ignores the context it is handed; every round builds its own.
	stopper.RunWorker(ctx, func(context.Context) {
		// Work on a copy of the ambient context so the tag is local to heartbeats.
		ambient := nl.ambientCtx
		ambient.AddLogTag("hb", nil)
		ticker := time.NewTicker(nl.heartbeatInterval)
		defer ticker.Stop()
		// The epoch is incremented on heartbeats until the first one succeeds.
		incrementEpoch := true
		for {
			if !nl.pauseHeartbeat.Load().(bool) {
				// Run the round in a closure so its defers fire at the end of
				// the round rather than when the worker exits.
				func() {
					ctx, cancel := context.WithTimeout(context.Background(), nl.heartbeatInterval/2)
					ctx, sp := ambient.AnnotateCtxWithSpan(ctx, "heartbeat")
					defer cancel()
					defer sp.Finish()
					// Retry heartbeat in the event the conditional put fails.
					for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
						// A nil record (none known yet) is handled by heartbeatInternal.
						liveness, err := nl.Self()
						if err != nil && err != ErrNoLivenessRecord {
							log.Errorf(ctx, "unexpected error getting liveness: %v", err)
						}
						if err := nl.heartbeatInternal(ctx, liveness, incrementEpoch); err != nil {
							// Only a skipped heartbeat is retried within the round.
							if err == errSkippedHeartbeat {
								log.Infof(ctx, "%s; retrying", err)
								continue
							}
							log.Warningf(ctx, "failed node liveness heartbeat: %v", err)
						} else {
							incrementEpoch = false // don't increment epoch after first heartbeat
						}
						// Success or a non-retriable failure ends the round.
						break
					}
				}()
			}
			// Wait for the next tick, an explicit trigger, or shutdown.
			select {
			case <-ticker.C:
			case <-nl.triggerHeartbeat:
			case <-stopper.ShouldStop():
				return
			}
		}
	})
}
// PauseHeartbeat pauses the periodic heartbeat when pause is true and resumes
// it when pause is false. Resuming also requests an immediate heartbeat; the
// request is dropped if one is already pending.
func (nl *NodeLiveness) PauseHeartbeat(pause bool) {
	nl.pauseHeartbeat.Store(pause)
	if pause {
		return
	}
	// Non-blocking send: a full trigger channel already guarantees a wake-up.
	select {
	case nl.triggerHeartbeat <- struct{}{}:
	default:
	}
}
// DisableAllHeartbeatsForTest pauses the heartbeat loop and then occupies both
// update semaphores, so liveness updates started outside the normal
// StartHeartbeat loop cannot proceed either. The returned closure releases the
// semaphores again. Only safe for use in tests.
func (nl *NodeLiveness) DisableAllHeartbeatsForTest() func() {
	nl.PauseHeartbeat(true)
	held := []chan struct{}{nl.selfSem, nl.otherSem}
	for _, s := range held {
		s <- struct{}{}
	}
	return func() {
		for _, s := range held {
			<-s
		}
	}
}
// errNodeAlreadyLive is returned by the conditional-put failure handler in
// heartbeatInternal when the stored record differs from the expected one but
// is still live; heartbeatInternal counts that as a successful heartbeat.
var errNodeAlreadyLive = errors.New("node already live")
// Heartbeat extends this node's liveness expiration without incrementing its
// epoch. It performs a conditional put on the node liveness record, with
// liveness as the expected current value, and on success caches the updated
// record as this node's own.
func (nl *NodeLiveness) Heartbeat(ctx context.Context, liveness *Liveness) error {
	const incrementEpoch = false
	return nl.heartbeatInternal(ctx, liveness, incrementEpoch)
}
// heartbeatInternal writes a new expiration into this node's liveness record,
// using liveness as the expected current value (nil meaning a fresh epoch-1
// record is written). When incrementEpoch is set and a prior record exists,
// the epoch is bumped and the draining flag cleared. Heartbeats are
// serialized through this node's semaphore, and one that takes longer than a
// second is logged as slow.
func (nl *NodeLiveness) heartbeatInternal(
	ctx context.Context, liveness *Liveness, incrementEpoch bool,
) error {
	began := timeutil.Now()
	defer func() {
		if elapsed := timeutil.Now().Sub(began); elapsed > time.Second {
			log.Warningf(ctx, "slow heartbeat took %0.1fs", elapsed.Seconds())
		}
	}()

	// Only one heartbeat may be in flight at a time.
	selfID := nl.gossip.NodeID.Get()
	slot := nl.sem(selfID)
	select {
	case slot <- struct{}{}:
	case <-ctx.Done():
		return ctx.Err()
	}
	defer func() { <-slot }()

	next := Liveness{NodeID: selfID, Epoch: 1}
	if liveness != nil {
		next = *liveness
		if incrementEpoch {
			next.Epoch++
			// A new epoch starts out not draining.
			next.Draining = false
		}
	}

	// The max clock offset is subtracted again when liveness is evaluated, so
	// it is added to the expiration here (unless clockless reads are in use).
	offset := nl.clock.MaxOffset()
	if offset == timeutil.ClocklessMaxOffset {
		offset = 0
	}
	ttl := nl.livenessThreshold + offset
	next.Expiration = nl.clock.Now().Add(ttl.Nanoseconds(), 0)

	onMismatch := func(actual Liveness) error {
		// Adopt whatever is actually stored.
		nl.setSelf(actual)
		// A differing but still-live record means someone else (for example a
		// concurrent lease acquisition) already refreshed it; that is success
		// unless an epoch increment was requested.
		if actual.IsLive(nl.clock.Now(), nl.clock.MaxOffset()) && !incrementEpoch {
			return errNodeAlreadyLive
		}
		return errSkippedHeartbeat
	}

	err := nl.updateLiveness(ctx, &next, liveness, onMismatch)
	switch err {
	case nil:
		log.VEventf(ctx, 1, "heartbeat %+v", next.Expiration)
		nl.setSelf(next)
		nl.metrics.HeartbeatSuccesses.Inc(1)
		return nil
	case errNodeAlreadyLive:
		nl.metrics.HeartbeatSuccesses.Inc(1)
		return nil
	default:
		nl.metrics.HeartbeatFailures.Inc(1)
		return err
	}
}
// Self returns a copy of the liveness record known for this node.
// ErrNoLivenessRecord is returned when the node has neither written its own
// record successfully nor received one for itself via gossip.
func (nl *NodeLiveness) Self() (*Liveness, error) {
	nl.mu.Lock()
	defer nl.mu.Unlock()
	selfID := nl.gossip.NodeID.Get()
	return nl.getLivenessLocked(selfID)
}
// setSelf replaces the cached copy of this node's own liveness record.
func (nl *NodeLiveness) setSelf(liveness Liveness) {
	nl.mu.Lock()
	defer nl.mu.Unlock()
	nl.mu.self = liveness
}
// GetIsLiveMap returns, for every node in the local liveness table, whether
// that node is live right now. For this node's own entry the cached self
// record is evaluated instead of the table entry.
func (nl *NodeLiveness) GetIsLiveMap() map[roachpb.NodeID]bool {
	nl.mu.Lock()
	defer nl.mu.Unlock()
	result := map[roachpb.NodeID]bool{}
	now := nl.clock.Now()
	maxOffset := nl.clock.MaxOffset()
	for id, record := range nl.mu.nodes {
		if id == nl.mu.self.NodeID {
			record = nl.mu.self
		}
		result[id] = record.IsLive(now, maxOffset)
	}
	return result
}
// GetLivenesses returns the liveness record of every node in the local
// liveness table, in no particular order. For this node's own entry the
// cached self record is returned instead of the table entry.
func (nl *NodeLiveness) GetLivenesses() []Liveness {
	nl.mu.Lock()
	defer nl.mu.Unlock()
	records := make([]Liveness, 0, len(nl.mu.nodes))
	for _, record := range nl.mu.nodes {
		if record.NodeID == nl.mu.self.NodeID {
			record = nl.mu.self
		}
		records = append(records, record)
	}
	return records
}
// GetLiveness looks up the liveness record for nodeID under the mutex.
// ErrNoLivenessRecord is returned when nothing is known about that node yet.
func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (*Liveness, error) {
	nl.mu.Lock()
	record, err := nl.getLivenessLocked(nodeID)
	nl.mu.Unlock()
	return record, err
}
// getLivenessLocked returns a pointer to a copy of the liveness record for
// nodeID; the caller must hold nl.mu. A non-zero nodeID that matches the
// cached self record is served from that record, anything else from the node
// table. ErrNoLivenessRecord is returned when neither has an entry.
func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (*Liveness, error) {
	if isSelf := nodeID != 0 && nodeID == nl.mu.self.NodeID; isSelf {
		self := nl.mu.self
		return &self, nil
	}
	record, found := nl.mu.nodes[nodeID]
	if !found {
		return nil, ErrNoLivenessRecord
	}
	return &record, nil
}
// errEpochAlreadyIncremented is returned by the conditional-put failure
// handler in IncrementEpoch when the stored record already has a higher epoch
// than the expected one; IncrementEpoch treats it as success.
var errEpochAlreadyIncremented = errors.New("epoch already incremented")
// IncrementEpoch bumps the liveness epoch of the node described by liveness,
// invalidating anything that relied on the previous epoch. It performs a
// conditional put on the node liveness record with liveness as the expected
// current value and, on success, caches the updated record locally (as self
// when it is this node's, otherwise in the node table). An error is returned
// when the supplied record is still live. Finding that the epoch was already
// incremented by someone else is not an error.
func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness *Liveness) error {
	// Only one increment may be in flight at a time.
	slot := nl.sem(liveness.NodeID)
	select {
	case slot <- struct{}{}:
	case <-ctx.Done():
		return ctx.Err()
	}
	defer func() { <-slot }()

	if liveness.IsLive(nl.clock.Now(), nl.clock.MaxOffset()) {
		return errors.Errorf("cannot increment epoch on live node: %+v", liveness)
	}

	bumped := *liveness
	bumped.Epoch++

	// store caches a record in the place that matches its node ID.
	store := func(record Liveness) {
		nl.mu.Lock()
		defer nl.mu.Unlock()
		if record.NodeID == nl.gossip.NodeID.Get() {
			nl.mu.self = record
			return
		}
		nl.mu.nodes[record.NodeID] = record
	}

	onMismatch := func(actual Liveness) error {
		// Whatever the outcome, remember what is actually stored.
		defer store(actual)
		switch {
		case actual.Epoch > liveness.Epoch:
			return errEpochAlreadyIncremented
		case actual.Epoch < liveness.Epoch:
			return errors.Errorf("unexpected liveness epoch %d; expected >= %d", actual.Epoch, liveness.Epoch)
		default:
			return errors.Errorf("mismatch incrementing epoch for %+v; actual is %+v", *liveness, actual)
		}
	}

	err := nl.updateLiveness(ctx, &bumped, liveness, onMismatch)
	if err == errEpochAlreadyIncremented {
		return nil
	}
	if err != nil {
		return err
	}

	log.VEventf(ctx, 1, "incremented node %d liveness epoch to %d",
		bumped.NodeID, bumped.Epoch)
	store(bumped)
	nl.metrics.EpochIncrements.Inc(1)
	return nil
}
// Metrics returns the struct holding the node liveness metrics.
func (nl *NodeLiveness) Metrics() LivenessMetrics {
	m := nl.metrics
	return m
}
// RegisterCallback adds cb to the callbacks that run whenever a node's
// IsLive() state changes to true.
func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) {
	nl.mu.Lock()
	nl.mu.callbacks = append(nl.mu.callbacks, cb)
	nl.mu.Unlock()
}
// updateLiveness replaces oldLiveness with newLiveness in the node liveness
// record of newLiveness.NodeID by means of a conditional put. When the
// conditional put fails and handleCondFailed is not nil, the handler is
// invoked with the actual node liveness record and whatever it returns is
// returned to the caller. The conditional put runs as a 1PC transaction with
// a ModifiedSpanTrigger naming the liveness record that should be gossiped on
// commit.
//
// Errors that are expected to occur sporadically (TransactionStatusError,
// owing to the 1PC requirement of the liveness txn, and ambiguous results)
// are surfaced by updateLivenessAttempt as *errRetryLiveness and cause the
// attempt to be repeated here instead of being returned.
func (nl *NodeLiveness) updateLiveness(
	ctx context.Context,
	newLiveness *Liveness,
	oldLiveness *Liveness,
	handleCondFailed func(actual Liveness) error,
) error {
	for {
		err := nl.updateLivenessAttempt(ctx, newLiveness, oldLiveness, handleCondFailed)
		if err == nil {
			return nil
		}
		// Deliberately no errors.Cause() here: it would unwrap right past
		// errRetryLiveness.
		if _, retriable := err.(*errRetryLiveness); !retriable {
			return err
		}
		log.Eventf(ctx, "retrying liveness update after %s", err)
	}
}
// updateLivenessAttempt makes a single attempt at replacing oldLiveness with
// newLiveness in the node liveness record via a conditional put.
//
// When handleCondFailed is non-nil, the locally known record for the node is
// consulted first: if one exists and oldLiveness is nil or differs from it,
// the conditional put is known to fail, so it is skipped and the result of
// handleCondFailed(known record) is returned. With a nil handler there is
// nobody to report the mismatch to, so the conditional put is always issued
// and its failure is returned as an error.
//
// A ConditionFailedError from the transaction is likewise routed through
// handleCondFailed when it is non-nil (with a zero Liveness if no actual
// value was reported). TransactionStatusError and AmbiguousResultError are
// wrapped in errRetryLiveness. On success the heartbeat callback, if any, is
// invoked with ctx.
func (nl *NodeLiveness) updateLivenessAttempt(
	ctx context.Context,
	newLiveness *Liveness,
	oldLiveness *Liveness,
	handleCondFailed func(actual Liveness) error,
) error {
	// First check the existing liveness map to avoid known conditional
	// put failures. The shortcut needs a handler to hand the mismatch to;
	// calling a nil handler here would panic.
	if handleCondFailed != nil {
		if l, err := nl.GetLiveness(newLiveness.NodeID); err == nil &&
			(oldLiveness == nil || *l != *oldLiveness) {
			return handleCondFailed(*l)
		}
	}
	if err := nl.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
		b := txn.NewBatch()
		key := keys.NodeLivenessKey(newLiveness.NodeID)
		// The batch interface requires interface{}(nil), not *Liveness(nil).
		if oldLiveness == nil {
			b.CPut(key, newLiveness, nil)
		} else {
			b.CPut(key, newLiveness, oldLiveness)
		}
		// Use a trigger on EndTransaction to indicate that node liveness should
		// be re-gossiped. Further, require that this transaction complete as a
		// one phase commit to eliminate the possibility of leaving write intents.
		b.AddRawRequest(&roachpb.EndTransactionRequest{
			Commit:     true,
			Require1PC: true,
			InternalCommitTrigger: &roachpb.InternalCommitTrigger{
				ModifiedSpanTrigger: &roachpb.ModifiedSpanTrigger{
					NodeLivenessSpan: &roachpb.Span{
						Key:    key,
						EndKey: key.Next(),
					},
				},
			},
		})
		return txn.Run(ctx, b)
	}); err != nil {
		switch tErr := errors.Cause(err).(type) {
		case *roachpb.ConditionFailedError:
			if handleCondFailed != nil {
				// No actual value reported: hand the handler a zero record.
				if tErr.ActualValue == nil {
					return handleCondFailed(Liveness{})
				}
				var actualLiveness Liveness
				if err := tErr.ActualValue.GetProto(&actualLiveness); err != nil {
					return errors.Wrapf(err, "couldn't update node liveness from CPut actual value")
				}
				return handleCondFailed(actualLiveness)
			}
		case *roachpb.TransactionStatusError:
			return &errRetryLiveness{err}
		case *roachpb.AmbiguousResultError:
			return &errRetryLiveness{err}
		}
		return err
	}

	// Read the callback under the lock but invoke it after releasing it.
	nl.mu.Lock()
	cb := nl.mu.heartbeatCallback
	nl.mu.Unlock()
	if cb != nil {
		cb(ctx)
	}
	return nil
}
// livenessGossipUpdate is the gossip callback used to keep the
// in-memory liveness info up to date. It decodes the gossiped liveness record
// and stores it in the node table when it is new information. When the stored
// record turns a previously not-live node live, the registered IsLive
// callbacks are invoked with that node's ID. A record that cannot be decoded
// is logged and ignored.
func (nl *NodeLiveness) livenessGossipUpdate(key string, content roachpb.Value) {
	var liveness Liveness
	if err := content.GetProto(&liveness); err != nil {
		log.Error(context.TODO(), err)
		return
	}

	// Replace the existing record only if this is our first receipt of this
	// node's liveness, the expiration or epoch was advanced, or the draining
	// or decommissioning state changed. Decommissioning must be part of the
	// test: setting it changes neither expiration nor epoch, so without the
	// comparison such an update would be dropped from the in-memory table.
	var callbacks []IsLiveCallback
	nl.mu.Lock()
	exLiveness, ok := nl.mu.nodes[liveness.NodeID]
	if !ok ||
		exLiveness.Expiration.Less(liveness.Expiration) ||
		exLiveness.Epoch < liveness.Epoch ||
		exLiveness.Draining != liveness.Draining ||
		exLiveness.Decommissioning != liveness.Decommissioning {
		nl.mu.nodes[liveness.NodeID] = liveness

		// If isLive status is now true, but previously false, invoke any registered callbacks.
		now, offset := nl.clock.Now(), nl.clock.MaxOffset()
		if !exLiveness.IsLive(now, offset) && liveness.IsLive(now, offset) {
			callbacks = append(callbacks, nl.mu.callbacks...)
		}
	}
	nl.mu.Unlock()

	// Invoke any "is live" callbacks after releasing lock.
	for _, cb := range callbacks {
		cb(liveness.NodeID)
	}
}
// numLiveNodes backs the metric tracking how many nodes in the cluster are
// live. It returns 0 when this node has no node ID yet or is not itself live,
// because a non-live node's view of liveness is more likely to be inaccurate
// than a live node's.
// Every live node exports this metric rather than one designated node:
// liveness information is gossiped and may be stale, which could leave either
// no node or several nodes reporting it, so having all live nodes report is
// the simplest option.
func (nl *NodeLiveness) numLiveNodes() int64 {
	if nl.gossip.NodeID.Get() == 0 {
		return 0
	}
	nl.mu.Lock()
	defer nl.mu.Unlock()
	now, maxOffset := nl.clock.Now(), nl.clock.MaxOffset()
	// A node that is not live does not report its view of the cluster.
	if !nl.mu.self.IsLive(now, maxOffset) {
		return 0
	}
	count := int64(0)
	for _, record := range nl.mu.nodes {
		if record.IsLive(now, maxOffset) {
			count++
		}
	}
	return count
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors_cockroachdb/cockroach.git
git@gitee.com:mirrors_cockroachdb/cockroach.git
mirrors_cockroachdb
cockroach
cockroach
v1.1.0

搜索帮助

344bd9b3 5694891 D2dac590 5694891