2 Star 2 Fork 1

cockroachdb / cockroach

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
lease.go 47.05 KB
一键复制 编辑 原始数据 按行查看 历史
Andrei Matei 提交于 2017-09-29 18:54 . sql: fix stale txn deadline
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package sql
import (
"bytes"
"fmt"
"math/rand"
"sort"
"sync/atomic"
"time"
"github.com/pkg/errors"
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
// TODO(pmattis): Periodically renew leases for tables that were used recently and
// for which the lease will expire soon.
var (
// LeaseDuration is the mean duration a lease will be acquired for. The
// actual duration is jittered in the range
// [0.75,1.25]*LeaseDuration. Exported for testing purposes only.
LeaseDuration = 5 * time.Minute
)
// tableVersionState holds the state for a table version. This includes
// the lease information for a table version.
// TODO(vivek): A node only needs to manage lease information on what it
// thinks is the latest version for a table descriptor.
type tableVersionState struct {
// This descriptor is immutable and can be shared by many goroutines.
// Care must be taken to not modify it.
sqlbase.TableDescriptor
// The expiration time for the table version. A transaction with
// timestamp T can use this table descriptor version iff
// TableDescriptor.ModificationTime <= T < expiration
//
// The expiration time is either the expiration time of the lease
// when a lease is associated with the table version, or the
// ModificationTime of the next version when the table version
// isn't associated with a lease.
expiration hlc.Timestamp
// mu protects refcount and leased.
mu syncutil.Mutex
refcount int
// Set if the node has a lease on this descriptor version.
// Leases can only be held for the two latest versions of
// a table descriptor. The latest version known to a node
// (can be different than the current latest version in the store)
// is always associated with a lease. The previous version known to
// a node might not necessarily be associated with a lease.
leased bool
}
func (s *tableVersionState) String() string {
return fmt.Sprintf("%d(%q) ver=%d:%s, refcount=%d", s.ID, s.Name, s.Version, s.expiration, s.refcount)
}
// hasExpired checks if the table is too old to be used (by a txn operating)
// at the given timestamp
func (s *tableVersionState) hasExpired(timestamp hlc.Timestamp) bool {
return !timestamp.Less(s.expiration)
}
func (s *tableVersionState) incRefcount() {
s.mu.Lock()
s.incRefcountLocked()
s.mu.Unlock()
}
func (s *tableVersionState) incRefcountLocked() {
s.refcount++
log.VEventf(context.TODO(), 2, "tableVersionState.incRef: %s", s)
}
// The lease expiration stored in the database is of a different type.
// We've decided that it's too much work to change the type to
// hlc.Timestamp, so we're using this method to give us the stored
// type: parser.DTimestamp.
func (s *tableVersionState) leaseExpiration() parser.DTimestamp {
return parser.DTimestamp{Time: timeutil.Unix(0, s.expiration.WallTime).Round(time.Microsecond)}
}
// LeaseStore implements the operations for acquiring and releasing leases and
// publishing a new version of a descriptor. Exported only for testing.
type LeaseStore struct {
db client.DB
clock *hlc.Clock
nodeID *base.NodeIDContainer
testingKnobs LeaseStoreTestingKnobs
memMetrics *MemoryMetrics
}
// jitteredLeaseDuration returns a randomly jittered duration from the interval
// [0.75 * leaseDuration, 1.25 * leaseDuration].
func jitteredLeaseDuration() time.Duration {
return time.Duration(float64(LeaseDuration) * (0.75 + 0.5*rand.Float64()))
}
// acquire a lease on the most recent version of a table descriptor.
// If the lease cannot be obtained because the descriptor is in the process of
// being dropped, the error will be errTableDropped.
func (s LeaseStore) acquire(
ctx context.Context, tableID sqlbase.ID, minExpirationTime hlc.Timestamp,
) (*tableVersionState, error) {
var table *tableVersionState
err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
expiration := txn.OrigTimestamp()
expiration.WallTime += int64(jitteredLeaseDuration())
if expiration.Less(minExpirationTime) {
expiration = minExpirationTime
}
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, tableID)
if err != nil {
return err
}
if err := filterTableState(tableDesc); err != nil {
return err
}
tableDesc.MaybeUpgradeFormatVersion()
// Once the descriptor is set it is immutable and care must be taken
// to not modify it.
table = &tableVersionState{
TableDescriptor: *tableDesc,
expiration: expiration,
leased: true,
}
// ValidateTable instead of Validate, even though we have a txn available,
// so we don't block reads waiting for this table version.
if err := table.ValidateTable(); err != nil {
return err
}
nodeID := s.nodeID.Get()
if nodeID == 0 {
panic("zero nodeID")
}
p := makeInternalPlanner("lease-insert", txn, security.RootUser, s.memMetrics)
defer finishInternalPlanner(p)
const insertLease = `INSERT INTO system.lease ("descID", version, "nodeID", expiration) ` +
`VALUES ($1, $2, $3, $4)`
leaseExpiration := table.leaseExpiration()
count, err := p.exec(
ctx, insertLease, table.ID, int(table.Version), nodeID, &leaseExpiration,
)
if err != nil {
return err
}
if count != 1 {
return errors.Errorf("%s: expected 1 result, found %d", insertLease, count)
}
return nil
})
if err == nil && s.testingKnobs.LeaseAcquiredEvent != nil {
s.testingKnobs.LeaseAcquiredEvent(table.TableDescriptor, nil)
}
return table, err
}
// Release a previously acquired table descriptor.
func (s LeaseStore) release(ctx context.Context, stopper *stop.Stopper, table *tableVersionState) {
retryOptions := base.DefaultRetryOptions()
retryOptions.Closer = stopper.ShouldQuiesce()
firstAttempt := true
for r := retry.Start(retryOptions); r.Next(); {
// This transaction is idempotent.
err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
log.VEventf(ctx, 2, "LeaseStore releasing lease %s", table)
nodeID := s.nodeID.Get()
if nodeID == 0 {
panic("zero nodeID")
}
p := makeInternalPlanner("lease-release", txn, security.RootUser, s.memMetrics)
defer finishInternalPlanner(p)
const deleteLease = `DELETE FROM system.lease ` +
`WHERE ("descID", version, "nodeID", expiration) = ($1, $2, $3, $4)`
leaseExpiration := table.leaseExpiration()
count, err := p.exec(
ctx, deleteLease, table.ID, int(table.Version), nodeID, &leaseExpiration)
if err != nil {
return err
}
// We allow count == 0 after the first attempt.
if count > 1 || (count == 0 && firstAttempt) {
log.Warningf(ctx, "unexpected results while deleting lease %s: "+
"expected 1 result, found %d", table, count)
}
return nil
})
if s.testingKnobs.LeaseReleasedEvent != nil {
s.testingKnobs.LeaseReleasedEvent(table.TableDescriptor, err)
}
if err == nil {
break
}
log.Warningf(ctx, "error releasing lease %q: %s", table, err)
firstAttempt = false
}
}
// WaitForOneVersion returns once there are no unexpired leases on the
// previous version of the table descriptor. It returns the current version.
// After returning there can only be versions of the descriptor >= to the
// returned version. Lease acquisition (see acquire()) maintains the
// invariant that no new leases for desc.Version-1 will be granted once
// desc.Version exists.
func (s LeaseStore) WaitForOneVersion(
ctx context.Context, tableID sqlbase.ID, retryOpts retry.Options,
) (sqlbase.DescriptorVersion, error) {
desc := &sqlbase.Descriptor{}
descKey := sqlbase.MakeDescMetadataKey(tableID)
var tableDesc *sqlbase.TableDescriptor
for r := retry.Start(retryOpts); r.Next(); {
// Get the current version of the table descriptor non-transactionally.
//
// TODO(pmattis): Do an inconsistent read here?
if err := s.db.GetProto(context.TODO(), descKey, desc); err != nil {
return 0, err
}
tableDesc = desc.GetTable()
if tableDesc == nil {
return 0, errors.Errorf("ID %d is not a table", tableID)
}
// Check to see if there are any leases that still exist on the previous
// version of the descriptor.
now := s.clock.Now()
count, err := s.countLeases(ctx, tableDesc.ID, tableDesc.Version-1, now.GoTime())
if err != nil {
return 0, err
}
if count == 0 {
break
}
log.Infof(context.TODO(), "publish (count leases): descID=%d name=%s version=%d count=%d",
tableDesc.ID, tableDesc.Name, tableDesc.Version-1, count)
}
return tableDesc.Version, nil
}
var errDidntUpdateDescriptor = errors.New("didn't update the table descriptor")
// Publish updates a table descriptor. It also maintains the invariant that
// there are at most two versions of the descriptor out in the wild at any time
// by first waiting for all nodes to be on the current (pre-update) version of
// the table desc.
// The update closure is called after the wait, and it provides the new version
// of the descriptor to be written. In a multi-step schema operation, this
// update should perform a single step.
// The closure may be called multiple times if retries occur; make sure it does
// not have side effects.
// Returns the updated version of the descriptor.
func (s LeaseStore) Publish(
ctx context.Context,
tableID sqlbase.ID,
update func(*sqlbase.TableDescriptor) error,
logEvent func(*client.Txn) error,
) (*sqlbase.Descriptor, error) {
errLeaseVersionChanged := errors.New("lease version changed")
// Retry while getting errLeaseVersionChanged.
for r := retry.Start(base.DefaultRetryOptions()); r.Next(); {
// Wait until there are no unexpired leases on the previous version
// of the table.
expectedVersion, err := s.WaitForOneVersion(ctx, tableID, base.DefaultRetryOptions())
if err != nil {
return nil, err
}
desc := &sqlbase.Descriptor{}
// There should be only one version of the descriptor, but it's
// a race now to update to the next version.
err = s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
descKey := sqlbase.MakeDescMetadataKey(tableID)
// Re-read the current version of the table descriptor, this time
// transactionally.
if err := txn.GetProto(ctx, descKey, desc); err != nil {
return err
}
tableDesc := desc.GetTable()
if tableDesc == nil {
return errors.Errorf("ID %d is not a table", tableID)
}
if expectedVersion != tableDesc.Version {
// The version changed out from under us. Someone else must be
// performing a schema change operation.
if log.V(3) {
log.Infof(ctx, "publish (version changed): %d != %d", expectedVersion, tableDesc.Version)
}
return errLeaseVersionChanged
}
// Run the update closure.
version := tableDesc.Version
if err := update(tableDesc); err != nil {
return err
}
if version != tableDesc.Version {
return errors.Errorf("updated version to: %d, expected: %d",
tableDesc.Version, version)
}
tableDesc.Version++
// We need to set ModificationTime to the transaction's commit
// timestamp. Since this is a SERIALZIABLE transaction, that will
// be OrigTimestamp.
modTime := txn.OrigTimestamp()
tableDesc.ModificationTime = modTime
log.Infof(ctx, "publish: descID=%d (%s) version=%d mtime=%s",
tableDesc.ID, tableDesc.Name, tableDesc.Version, modTime.GoTime())
if err := tableDesc.ValidateTable(); err != nil {
return err
}
// Write the updated descriptor.
if err := txn.SetSystemConfigTrigger(); err != nil {
return err
}
b := txn.NewBatch()
b.Put(descKey, desc)
if logEvent != nil {
// If an event log is required for this update, ensure that the
// descriptor change occurs first in the transaction. This is
// necessary to ensure that the System configuration change is
// gossiped. See the documentation for
// transaction.SetSystemConfigTrigger() for more information.
if err := txn.Run(ctx, b); err != nil {
return err
}
if err := logEvent(txn); err != nil {
return err
}
return txn.Commit(ctx)
}
// More efficient batching can be used if no event log message
// is required.
return txn.CommitInBatch(ctx, b)
})
switch err {
case nil, errDidntUpdateDescriptor:
return desc, nil
case errLeaseVersionChanged:
// will loop around to retry
default:
return nil, err
}
}
panic("not reached")
}
// countLeases returns the number of unexpired leases for a particular version
// of a descriptor.
func (s LeaseStore) countLeases(
ctx context.Context, descID sqlbase.ID, version sqlbase.DescriptorVersion, expiration time.Time,
) (int, error) {
var count int
err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
p := makeInternalPlanner("leases-count", txn, security.RootUser, s.memMetrics)
defer finishInternalPlanner(p)
const countLeases = `SELECT COUNT(version) FROM system.lease ` +
`WHERE "descID" = $1 AND version = $2 AND expiration > $3`
values, err := p.QueryRow(ctx, countLeases, descID, int(version), expiration)
if err != nil {
return err
}
count = int(parser.MustBeDInt(values[0]))
return nil
})
return count, err
}
// Get the table descriptor valid for the expiration time from the store.
// We use a timestamp that is just less than the expiration time to read
// a version of the table descriptor. A tableVersionState with the
// expiration time set to expiration is returned.
//
// This returns an error when Replica.requestCanProceed() returns an
// error when the expiration timestamp is less than the storage layer
// GC threshold.
func (s LeaseStore) getForExpiration(
ctx context.Context, expiration hlc.Timestamp, id sqlbase.ID,
) (*tableVersionState, error) {
var table *tableVersionState
err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
descKey := sqlbase.MakeDescMetadataKey(id)
prevTimestamp := expiration
prevTimestamp.WallTime--
txn.SetFixedTimestamp(ctx, prevTimestamp)
var desc sqlbase.Descriptor
if err := txn.GetProto(ctx, descKey, &desc); err != nil {
return err
}
tableDesc := desc.GetTable()
if tableDesc == nil {
return errors.Errorf("id %d is not a table", id)
}
if !tableDesc.ModificationTime.Less(prevTimestamp) {
return errors.Errorf("internal error: unable to read table= (%d, %s)", id, expiration)
}
// Create a tableVersionState with the table and without a lease.
table = &tableVersionState{
TableDescriptor: *tableDesc,
expiration: expiration,
}
return nil
})
return table, err
}
// tableSet maintains an ordered set of tableVersionState objects sorted
// by version. It supports addition and removal of elements, finding the
// table for a particular version, or finding the most recent table version.
// The order is maintained by insert and remove and there can only be a
// unique entry for a version. Only the last two versions can be leased,
// with the last one being the latest one which is always leased.
//
// Each entry represents a time span [ModificationTime, expiration)
// and can be used by a transaction iif:
// ModificationTime <= transaction.Timestamp < expiration.
type tableSet struct {
data []*tableVersionState
}
func (l *tableSet) String() string {
var buf bytes.Buffer
for i, s := range l.data {
if i > 0 {
buf.WriteString(" ")
}
buf.WriteString(fmt.Sprintf("%d:%d", s.Version, s.expiration.WallTime))
}
return buf.String()
}
func (l *tableSet) insert(s *tableVersionState) {
i, match := l.findIndex(s.Version)
if match {
panic("unable to insert duplicate lease")
}
if i == len(l.data) {
l.data = append(l.data, s)
return
}
l.data = append(l.data, nil)
copy(l.data[i+1:], l.data[i:])
l.data[i] = s
}
func (l *tableSet) remove(s *tableVersionState) {
i, match := l.findIndex(s.Version)
if !match {
panic(fmt.Sprintf("can't find lease to remove: %s", s))
}
l.data = append(l.data[:i], l.data[i+1:]...)
}
func (l *tableSet) find(version sqlbase.DescriptorVersion) *tableVersionState {
if i, match := l.findIndex(version); match {
return l.data[i]
}
return nil
}
func (l *tableSet) findIndex(version sqlbase.DescriptorVersion) (int, bool) {
i := sort.Search(len(l.data), func(i int) bool {
s := l.data[i]
return s.Version >= version
})
if i < len(l.data) {
s := l.data[i]
if s.Version == version {
return i, true
}
}
return i, false
}
func (l *tableSet) findNewest() *tableVersionState {
if len(l.data) == 0 {
return nil
}
return l.data[len(l.data)-1]
}
func (l *tableSet) findVersion(version sqlbase.DescriptorVersion) *tableVersionState {
if len(l.data) == 0 {
return nil
}
// Find the index of the first lease with version > targetVersion.
i := sort.Search(len(l.data), func(i int) bool {
return l.data[i].Version > version
})
if i == 0 {
return nil
}
// i-1 is the index of the newest lease for the previous version (the version
// we're looking for).
s := l.data[i-1]
if s.Version == version {
return s
}
return nil
}
type tableState struct {
id sqlbase.ID
// The cache is updated every time we acquire or release a table.
tableNameCache *tableNameCache
stopper *stop.Stopper
mu struct {
syncutil.Mutex
// table descriptors sorted by increasing version. This set always
// contains a table descriptor version with a lease as the latest
// entry. There may be more than one active lease when the system is
// transitioning from one version of the descriptor to another or
// when the node preemptively acquires a new lease for a version
// when the old lease has not yet expired. In the latter case, a new
// entry is created with the expiration time of the new lease and
// the older entry is removed.
active tableSet
// A channel used to indicate whether a lease is actively being
// acquired. nil if there is no lease acquisition in progress for
// the table. If non-nil, the channel will be closed when lease
// acquisition completes.
acquiring chan struct{}
// Indicates that the table has been dropped, or is being dropped.
// If set, leases are released from the store as soon as their
// refcount drops to 0, as opposed to waiting until they expire.
dropped bool
}
}
// acquire returns a version of the table appropriate for the timestamp
// The table will have its refcount incremented, so the caller is
// responsible for calling release() on it.
func (t *tableState) acquire(
ctx context.Context, timestamp hlc.Timestamp, m *LeaseManager,
) (*tableVersionState, error) {
t.mu.Lock()
defer t.mu.Unlock()
// Wait for any existing lease acquisition.
t.acquireWait()
// Acquire a lease if no lease exists or if the latest lease is
// about to expire.
if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) {
if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil {
return nil, err
}
}
return t.findForTimestamp(ctx, timestamp, m)
}
// ensureVersion ensures that the latest version >= minVersion. It will
// check if the latest known version meets the criterion, or attempt to
// acquire a lease at the latest version with the hope that it meets
// the criterion.
func (t *tableState) ensureVersion(
ctx context.Context, minVersion sqlbase.DescriptorVersion, m *LeaseManager,
) error {
t.mu.Lock()
defer t.mu.Unlock()
if s := t.mu.active.findNewest(); s != nil && minVersion <= s.Version {
return nil
}
if err := t.acquireFreshestFromStoreLocked(ctx, m); err != nil {
return err
}
if s := t.mu.active.findNewest(); s != nil && s.Version < minVersion {
return errors.Errorf("version %d for table %s does not exist yet", minVersion, s.Name)
}
return nil
}
// Find the table descriptor valid for the particular timestamp. This
// function is called after ensuring that there is a lease for the latest
// version of the table descriptor and the lease is far from expiring.
// Normally the latest version of a table descriptor if valid is returned.
// If the valid version doesn't exist it is read from the store. The refcount
// for the returned tableVersionState is incremented.
func (t *tableState) findForTimestamp(
ctx context.Context, timestamp hlc.Timestamp, m *LeaseManager,
) (*tableVersionState, error) {
afterIdx := 0
// Walk back the versions to find one that is valid for the timestamp.
for i := len(t.mu.active.data) - 1; i >= 0; i-- {
// Check to see if the ModififcationTime is valid.
if table := t.mu.active.data[i]; !timestamp.Less(table.ModificationTime) {
if timestamp.Less(table.expiration) {
// Existing valid table version.
table.incRefcount()
return table, nil
}
// We need a version after data[i], but before data[i+1].
// We could very well use the timestamp to read the table
// descriptor, but unfortunately we will not be able to assign
// it a proper expiration time. Therefore, we read table
// descriptors versions one by one from afterIdx back into the
// past until we find a valid one.
afterIdx = i + 1
if afterIdx == len(t.mu.active.data) {
return nil, fmt.Errorf("requesting a table version ahead of latest version")
}
break
}
}
// Read table descriptor versions one by one into the past until we
// find a valid one. Every version is assigned an expiration time that
// is the ModificationTime of the previous one read.
expiration := t.mu.active.data[afterIdx].ModificationTime
var versions []*tableVersionState
// We're called with mu locked, but need to unlock it while reading
// the descriptors from the store.
t.mu.Unlock()
for {
table, err := m.LeaseStore.getForExpiration(ctx, expiration, t.id)
if err != nil {
t.mu.Lock()
return nil, err
}
versions = append(versions, table)
if !timestamp.Less(table.ModificationTime) {
break
}
// Set the expiration time for the next table.
expiration = table.ModificationTime
}
t.mu.Lock()
// Insert all the table versions and return the last one.
var table *tableVersionState
for _, tableVersion := range versions {
// Since we gave up the lock while reading the versions from
// the store we have to ensure that no one else inserted the
// same table version.
table = t.mu.active.findVersion(tableVersion.Version)
if table == nil {
table = tableVersion
t.mu.active.insert(tableVersion)
}
}
table.incRefcount()
return table, nil
}
// acquireFreshestFromStoreLocked acquires a new lease from the store and
// inserts it into the active set. It guarantees that the lease returned is
// the one acquired after the call is made. Use this if the lease we want to
// get needs to see some descriptor updates that we know happened recently
// (but that didn't cause the version to be incremented). E.g. if we suspect
// there's a new name for a table, the caller can insist on getting a lease
// reflecting this new name. Moreover, upon returning, the new lease is
// guaranteed to be the last lease in t.mu.active (note that this is not
// generally guaranteed, as leases are assigned random expiration times).
//
// t.mu must be locked.
func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error {
// Ensure there is no lease acquisition in progress.
t.acquireWait()
// Move forward to acquire a fresh table lease.
// Set the min expiration time to guarantee that the lease acquired is the
// last lease in t.mu.active .
minExpirationTime := hlc.Timestamp{}
newestTable := t.mu.active.findNewest()
if newestTable != nil {
minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0)
}
return t.acquireNodeLease(ctx, m, minExpirationTime)
}
// upsertLocked inserts a lease for a particular table version.
// If an existing lease exists for the table version, it releases
// the older lease and replaces it.
func (t *tableState) upsertLocked(ctx context.Context, table *tableVersionState, m *LeaseManager) {
s := t.mu.active.find(table.Version)
if s == nil {
t.mu.active.insert(table)
return
}
s.mu.Lock()
table.mu.Lock()
// subsume the refcount of the older lease.
table.refcount += s.refcount
s.refcount = 0
s.leased = false
table.mu.Unlock()
s.mu.Unlock()
log.VEventf(ctx, 2, "replaced lease: %s with %s", s, table)
t.mu.active.remove(s)
t.mu.active.insert(table)
t.releaseLease(s, m)
}
// removeInactiveVersions removes inactive versions in t.mu.active.data with refcount 0.
// t.mu must be locked.
func (t *tableState) removeInactiveVersions(m *LeaseManager) {
// A copy of t.mu.active.data must be made since t.mu.active.data will be changed
// within the loop.
for _, table := range append([]*tableVersionState(nil), t.mu.active.data...) {
func() {
table.mu.Lock()
defer table.mu.Unlock()
if table.refcount == 0 {
t.mu.active.remove(table)
if table.leased {
table.leased = false
t.releaseLease(table, m)
}
}
}()
}
}
// acquireWait waits until no lease acquisition is in progress.
func (t *tableState) acquireWait() {
// Spin until no lease acquisition is in progress.
for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring {
// We're called with mu locked, but need to unlock it while we wait
// for the in-progress lease acquisition to finish.
t.mu.Unlock()
<-acquiring
t.mu.Lock()
}
}
// If the lease cannot be obtained because the descriptor is in the process of
// being dropped, the error will be errTableDropped.
// minExpirationTime, if not set to the zero value, will be used as a lower
// bound on the expiration of the new table. This can be used to eliminate the
// jitter in the expiration time, and guarantee that we get a lease that will be
// inserted at the end of the lease set (i.e. it will be returned by
// findNewest() from now on).
//
// t.mu needs to be locked.
func (t *tableState) acquireNodeLease(
ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp,
) error {
if m.isDraining() {
return errors.New("cannot acquire lease when draining")
}
// Notify when lease has been acquired.
t.mu.acquiring = make(chan struct{})
defer func() {
close(t.mu.acquiring)
t.mu.acquiring = nil
}()
// We're called with mu locked, but need to unlock it during lease
// acquisition.
t.mu.Unlock()
table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime)
t.mu.Lock()
if err != nil {
return err
}
t.upsertLocked(ctx, table, m)
t.tableNameCache.insert(table)
return nil
}
func (t *tableState) release(table *sqlbase.TableDescriptor, m *LeaseManager) error {
t.mu.Lock()
defer t.mu.Unlock()
s := t.mu.active.find(table.Version)
if s == nil {
return errors.Errorf("table %d version %d not found", table.ID, table.Version)
}
// Decrements the refcount and returns true if the lease has to be removed
// from the store.
decRefcount := func(s *tableVersionState) bool {
// Figure out if we'd like to remove the lease from the store asap (i.e.
// when the refcount drops to 0). If so, we'll need to mark the lease as
// invalid.
removeOnceDereferenced := m.LeaseStore.testingKnobs.RemoveOnceDereferenced ||
// Release from the store if the table has been dropped; no leases
// can be acquired any more.
t.mu.dropped ||
// Release from the store if the LeaseManager is draining.
m.isDraining() ||
// Release from the store if the lease is not for the latest
// version; only leases for the latest version can be acquired.
s != t.mu.active.findNewest()
s.mu.Lock()
defer s.mu.Unlock()
s.refcount--
log.VEventf(context.TODO(), 2, "release: %s", s)
if s.refcount < 0 {
panic(fmt.Sprintf("negative ref count: %s", s))
}
if s.refcount == 0 && s.leased && removeOnceDereferenced {
s.leased = false
return true
}
return false
}
if decRefcount(s) {
t.mu.active.remove(s)
t.releaseLease(s, m)
}
return nil
}
// release the lease associated with the table version.
// t.mu needs to be locked.
func (t *tableState) releaseLease(table *tableVersionState, m *LeaseManager) {
t.tableNameCache.remove(table)
ctx := context.TODO()
if m.isDraining() {
// Release synchronously to guarantee release before exiting.
m.LeaseStore.release(ctx, t.stopper, table)
return
}
// Release to the store asynchronously, without the tableState lock.
if err := t.stopper.RunAsyncTask(
ctx, "sql.tableState: releasing descriptor lease",
func(ctx context.Context) {
m.LeaseStore.release(ctx, t.stopper, table)
}); err != nil {
log.Warningf(ctx, "error: %s, not releasing lease: %q", err, table)
}
}
// purgeOldVersions removes old unused table descriptor versions older than
// minVersion and releases any associated leases.
// If dropped is set, minVersion is ignored; no lease is acquired and all
// existing unused versions are removed. The table is further marked dropped,
// which will cause existing in-use leases to be eagerly released once
// they're not in use any more.
// If t has no active leases, nothing is done.
func (t *tableState) purgeOldVersions(
ctx context.Context,
db *client.DB,
dropped bool,
minVersion sqlbase.DescriptorVersion,
m *LeaseManager,
) error {
t.mu.Lock()
empty := len(t.mu.active.data) == 0
t.mu.Unlock()
if empty {
// We don't currently have a version on this table, so no need to refresh
// anything.
return nil
}
removeInactives := func(drop bool) {
t.mu.Lock()
defer t.mu.Unlock()
t.mu.dropped = drop
t.removeInactiveVersions(m)
}
if dropped {
removeInactives(dropped)
return nil
}
if err := t.ensureVersion(ctx, minVersion, m); err != nil {
return err
}
// Acquire a lease on the table on the latest version to maintain an
// active lease, so that it doesn't get released when removeInactives()
// is called below. Release this lease after calling removeInactives().
table, err := t.acquire(ctx, m.clock.Now(), m)
if dropped := err == errTableDropped; dropped || err == nil {
removeInactives(dropped)
if table != nil {
return t.release(&table.TableDescriptor, m)
}
return nil
}
return err
}
// LeaseStoreTestingKnobs contains testing knobs.
type LeaseStoreTestingKnobs struct {
// Called after a lease is removed from the store, with any operation error.
// See LeaseRemovalTracker.
LeaseReleasedEvent func(table sqlbase.TableDescriptor, err error)
// Called after a lease is acquired, with any operation error.
LeaseAcquiredEvent func(table sqlbase.TableDescriptor, err error)
// RemoveOnceDereferenced forces leases to be removed
// as soon as they are dereferenced.
RemoveOnceDereferenced bool
}
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
func (*LeaseStoreTestingKnobs) ModuleTestingKnobs() {}
var _ base.ModuleTestingKnobs = &LeaseStoreTestingKnobs{}
// LeaseManagerTestingKnobs contains test knobs.
type LeaseManagerTestingKnobs struct {
// A callback called when a gossip update is received, before the leases are
// refreshed. Careful when using this to block for too long - you can block
// all the gossip users in the system.
GossipUpdateEvent func(config.SystemConfig)
// A callback called after the leases are refreshed as a result of a gossip update.
TestingLeasesRefreshedEvent func(config.SystemConfig)
LeaseStoreTestingKnobs LeaseStoreTestingKnobs
}
var _ base.ModuleTestingKnobs = &LeaseManagerTestingKnobs{}
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
func (*LeaseManagerTestingKnobs) ModuleTestingKnobs() {}
type tableNameCacheKey struct {
dbID sqlbase.ID
normalizeTabledName string
}
// tableNameCache is a cache of table name -> latest table version mappings.
// The LeaseManager updates the cache every time a lease is acquired or released
// from the store. The cache maintains the latest version for each table name.
// All methods are thread-safe.
type tableNameCache struct {
mu syncutil.Mutex
tables map[tableNameCacheKey]*tableVersionState
}
// Resolves a (database ID, table name) to the table descriptor's ID.
// Returns a valid tableVersionState for the table with that name,
// if the name had been previously cached and the cache has a table
// version that has not expired. Returns nil otherwise.
// This method handles normalizing the table name.
// The table's refcount is incremented before returning, so the caller
// is responsible for releasing it to the leaseManager.
func (c *tableNameCache) get(
dbID sqlbase.ID, tableName string, timestamp hlc.Timestamp,
) *tableVersionState {
c.mu.Lock()
table, ok := c.tables[makeTableNameCacheKey(dbID, tableName)]
c.mu.Unlock()
if !ok {
return nil
}
table.mu.Lock()
defer table.mu.Unlock()
if !nameMatchesTable(&table.TableDescriptor, dbID, tableName) {
panic(fmt.Sprintf("Out of sync entry in the name cache. "+
"Cache entry: %d.%q -> %d. Lease: %d.%q.",
dbID, tableName, table.ID, table.ParentID, table.Name))
}
if !table.leased {
// This get() raced with a release operation. The leaseManager should remove
// this cache entry soon.
return nil
}
// Expired table. Don't hand it out.
if table.hasExpired(timestamp) {
return nil
}
table.incRefcountLocked()
return table
}
func (c *tableNameCache) insert(table *tableVersionState) {
c.mu.Lock()
defer c.mu.Unlock()
key := makeTableNameCacheKey(table.ParentID, table.Name)
existing, ok := c.tables[key]
if !ok {
c.tables[key] = table
return
}
// If we already have a lease in the cache for this name, see if this one is
// better (higher version or later expiration).
if table.Version > existing.Version ||
(table.Version == existing.Version && (existing.expiration.Less(table.expiration))) {
// Overwrite the old table. The new one is better. From now on, we want
// clients to use the new one.
c.tables[key] = table
}
}
func (c *tableNameCache) remove(table *tableVersionState) {
c.mu.Lock()
defer c.mu.Unlock()
key := makeTableNameCacheKey(table.ParentID, table.Name)
existing, ok := c.tables[key]
if !ok {
// Table for lease not found in table name cache. This can happen if we had
// a more recent lease on the table in the tableNameCache, then the table
// gets dropped, then the more recent lease is remove()d - which clears the
// cache.
return
}
// If this was the lease that the cache had for the table name, remove it.
// If the cache had some other table, this remove is a no-op.
if existing == table {
delete(c.tables, key)
}
}
func makeTableNameCacheKey(dbID sqlbase.ID, tableName string) tableNameCacheKey {
return tableNameCacheKey{dbID, tableName}
}
// LeaseManager manages acquiring and releasing per-table leases. It also
// handles resolving table names to descriptor IDs. The leases are managed
// internally with a table descriptor and expiration time exported by the
// API. The table descriptor acquired needs to be released. A transaction
// can use a table descriptor as long as its timestamp is within the
// validity window for the descriptor:
// descriptor.ModificationTime <= txn.Timestamp < expirationTime
//
// Exported only for testing.
//
// The locking order is:
// LeaseManager.mu > tableState.mu > tableNameCache.mu > tableVersionState.mu
type LeaseManager struct {
LeaseStore
mu struct {
syncutil.Mutex
tables map[sqlbase.ID]*tableState
}
draining atomic.Value
// tableNames is a cache for name -> id mappings. A mapping for the cache
// should only be used if we currently have an active lease on the respective
// id; otherwise, the mapping may well be stale.
// Not protected by mu.
tableNames tableNameCache
testingKnobs LeaseManagerTestingKnobs
stopper *stop.Stopper
}
// NewLeaseManager creates a new LeaseManager.
//
// stopper is used to run async tasks. Can be nil in tests.
func NewLeaseManager(
nodeID *base.NodeIDContainer,
db client.DB,
clock *hlc.Clock,
testingKnobs LeaseManagerTestingKnobs,
stopper *stop.Stopper,
memMetrics *MemoryMetrics,
) *LeaseManager {
lm := &LeaseManager{
LeaseStore: LeaseStore{
db: db,
clock: clock,
nodeID: nodeID,
testingKnobs: testingKnobs.LeaseStoreTestingKnobs,
memMetrics: memMetrics,
},
testingKnobs: testingKnobs,
tableNames: tableNameCache{
tables: make(map[tableNameCacheKey]*tableVersionState),
},
stopper: stopper,
}
lm.mu.Lock()
lm.mu.tables = make(map[sqlbase.ID]*tableState)
lm.mu.Unlock()
lm.draining.Store(false)
return lm
}
func nameMatchesTable(table *sqlbase.TableDescriptor, dbID sqlbase.ID, tableName string) bool {
return table.ParentID == dbID && table.Name == tableName
}
// AcquireByName returns a table version for the specified table valid for
// the timestamp. It returns the table descriptor and a expiration time.
// A transaction using this descriptor must ensure that its
// commit-timestamp < expiration-time. Care must be taken to not modify
// the returned descriptor.
func (m *LeaseManager) AcquireByName(
ctx context.Context, timestamp hlc.Timestamp, dbID sqlbase.ID, tableName string,
) (*sqlbase.TableDescriptor, hlc.Timestamp, error) {
// Check if we have cached an ID for this name.
tableVersion := m.tableNames.get(dbID, tableName, timestamp)
if tableVersion != nil {
if !timestamp.Less(tableVersion.ModificationTime) {
return &tableVersion.TableDescriptor, tableVersion.expiration, nil
}
if err := m.Release(&tableVersion.TableDescriptor); err != nil {
return nil, hlc.Timestamp{}, err
}
// Return a valid table descriptor for the timestamp.
table, expiration, err := m.Acquire(ctx, timestamp, tableVersion.ID)
if err != nil {
return nil, hlc.Timestamp{}, err
}
return table, expiration, nil
}
// We failed to find something in the cache, or what we found is not
// guaranteed to be valid by the time we use it because we don't have a
// lease with at least a bit of lifetime left in it. So, we do it the hard
// way: look in the database to resolve the name, then acquire a new table.
var err error
tableID, err := m.resolveName(ctx, timestamp, dbID, tableName)
if err != nil {
return nil, hlc.Timestamp{}, err
}
table, expiration, err := m.Acquire(ctx, timestamp, tableID)
if err != nil {
return nil, hlc.Timestamp{}, err
}
if !nameMatchesTable(table, dbID, tableName) {
// We resolved name `tableName`, but the lease has a different name in it.
// That can mean two things. Assume the table is being renamed from A to B.
// a) `tableName` is A. The transaction doing the RENAME committed (so the
// descriptor has been updated to B), but its schema changer has not
// finished yet. B is the new name of the table, queries should use that. If
// we already had a lease with name A, we would've allowed to use it (but we
// don't, otherwise the cache lookup above would've given it to us). Since
// we don't, let's not allow A to be used, given that the lease now has name
// B in it. It'd be sketchy to allow A to be used with an inconsistent name
// in the table.
//
// b) `tableName` is B. Like in a), the transaction doing the RENAME
// committed (so the descriptor has been updated to B), but its schema
// change has not finished yet. We still had a valid lease with name A in
// it. What to do, what to do? We could allow name B to be used, but who
// knows what consequences that would have, since its not consistent with
// the table. We could say "table B not found", but that means that, until
// the next gossip update, this node would not service queries for this
// table under the name B. That's no bueno, as B should be available to be
// used immediately after the RENAME transaction has committed.
// The problem is that we have a lease that we know is stale (the descriptor
// in the DB doesn't necessarily have a new version yet, but it definitely
// has a new name). So, lets force getting a fresh table.
// This case (modulo the "committed" part) also applies when the txn doing a
// RENAME had a lease on the old name, and then tries to use the new name
// after the RENAME statement.
//
// How do we disambiguate between the a) and b)? We get a fresh lease on
// the descriptor, as required by b), and then we'll know if we're trying to
// resolve the current or the old name.
//
// TODO(vivek): check if the entire above comment is indeed true. Review the
// use of nameMatchesTable() throughout this function.
if err := m.Release(table); err != nil {
log.Warningf(ctx, "error releasing lease: %s", err)
}
table, expiration, err = m.acquireFreshestFromStore(ctx, tableID)
if err != nil {
return nil, hlc.Timestamp{}, err
}
if !nameMatchesTable(table, dbID, tableName) {
// If the name we had doesn't match the newest descriptor in the DB, then
// we're trying to use an old name.
if err := m.Release(table); err != nil {
log.Warningf(ctx, "error releasing lease: %s", err)
}
return nil, hlc.Timestamp{}, sqlbase.ErrDescriptorNotFound
}
}
return table, expiration, nil
}
// resolveName resolves a table name to a descriptor ID at a particular
// timestamp by looking in the database. If the mapping is not found,
// sqlbase.ErrDescriptorNotFound is returned.
func (m *LeaseManager) resolveName(
ctx context.Context, timestamp hlc.Timestamp, dbID sqlbase.ID, tableName string,
) (sqlbase.ID, error) {
nameKey := tableKey{dbID, tableName}
key := nameKey.Key()
id := sqlbase.InvalidID
if err := m.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
txn.SetFixedTimestamp(ctx, timestamp)
gr, err := txn.Get(ctx, key)
if err != nil {
return err
}
if !gr.Exists() {
return nil
}
id = sqlbase.ID(gr.ValueInt())
return nil
}); err != nil {
return id, err
}
if id == sqlbase.InvalidID {
return id, sqlbase.ErrDescriptorNotFound
}
return id, nil
}
// Acquire acquires a read lease for the specified table ID valid for
// the timestamp. It returns the table descriptor and a expiration time.
// A transaction using this descriptor must ensure that its
// commit-timestamp < expiration-time. Care must be taken to not modify
// the returned descriptor.
func (m *LeaseManager) Acquire(
ctx context.Context, timestamp hlc.Timestamp, tableID sqlbase.ID,
) (*sqlbase.TableDescriptor, hlc.Timestamp, error) {
t := m.findTableState(tableID, true)
table, err := t.acquire(ctx, timestamp, m)
if err != nil {
return nil, hlc.Timestamp{}, err
}
return &table.TableDescriptor, table.expiration, nil
}
// acquireFreshestFromStore acquires a new lease from the store. The returned
// table is guaranteed to have a version of the descriptor at least as recent as
// the time of the call (i.e. if we were in the process of acquiring a lease
// already, that lease is not good enough). The expiration time is returned
// along with the table descriptor.
func (m *LeaseManager) acquireFreshestFromStore(
ctx context.Context, tableID sqlbase.ID,
) (*sqlbase.TableDescriptor, hlc.Timestamp, error) {
t := m.findTableState(tableID, true)
t.mu.Lock()
defer t.mu.Unlock()
if err := t.acquireFreshestFromStoreLocked(
ctx, m,
); err != nil {
return nil, hlc.Timestamp{}, err
}
table := t.mu.active.findNewest()
if table == nil {
panic("no lease in active set after having just acquired one")
}
table.incRefcount()
return &table.TableDescriptor, table.expiration, nil
}
// Release releases a previously acquired table.
func (m *LeaseManager) Release(desc *sqlbase.TableDescriptor) error {
t := m.findTableState(desc.ID, false /* create */)
if t == nil {
return errors.Errorf("table %d not found", desc.ID)
}
// TODO(pmattis): Can/should we delete from LeaseManager.tables if the
// tableState becomes empty?
// TODO(andrei): I think we never delete from LeaseManager.tables... which
// could be bad if a lot of tables keep being created. I looked into cleaning
// up a bit, but it seems tricky to do with the current locking which is split
// between LeaseManager and tableState.
return t.release(desc, m)
}
func (m *LeaseManager) isDraining() bool {
return m.draining.Load().(bool)
}
// SetDraining (when called with 'true') removes all inactive leases. Any leases
// that are active will be removed once the lease's reference count drops to 0.
func (m *LeaseManager) SetDraining(drain bool) {
m.draining.Store(drain)
if !drain {
return
}
m.mu.Lock()
defer m.mu.Unlock()
for _, t := range m.mu.tables {
t.mu.Lock()
t.removeInactiveVersions(m)
t.mu.Unlock()
}
}
// If create is set, cache and stopper need to be set as well.
func (m *LeaseManager) findTableState(tableID sqlbase.ID, create bool) *tableState {
m.mu.Lock()
defer m.mu.Unlock()
t := m.mu.tables[tableID]
if t == nil && create {
t = &tableState{id: tableID, tableNameCache: &m.tableNames, stopper: m.stopper}
m.mu.tables[tableID] = t
}
return t
}
// RefreshLeases starts a goroutine that refreshes the lease manager
// leases for tables received in the latest system configuration via gossip.
func (m *LeaseManager) RefreshLeases(s *stop.Stopper, db *client.DB, gossip *gossip.Gossip) {
ctx := context.TODO()
s.RunWorker(ctx, func(ctx context.Context) {
descKeyPrefix := keys.MakeTablePrefix(uint32(sqlbase.DescriptorTable.ID))
gossipUpdateC := gossip.RegisterSystemConfigChannel()
for {
select {
case <-gossipUpdateC:
cfg, _ := gossip.GetSystemConfig()
if m.testingKnobs.GossipUpdateEvent != nil {
m.testingKnobs.GossipUpdateEvent(cfg)
}
// Read all tables and their versions
if log.V(2) {
log.Info(ctx, "received a new config; will refresh leases")
}
// Loop through the configuration to find all the tables.
for _, kv := range cfg.Values {
if !bytes.HasPrefix(kv.Key, descKeyPrefix) {
continue
}
// Attempt to unmarshal config into a table/database descriptor.
var descriptor sqlbase.Descriptor
if err := kv.Value.GetProto(&descriptor); err != nil {
log.Warningf(ctx, "%s: unable to unmarshal descriptor %v", kv.Key, kv.Value)
continue
}
switch union := descriptor.Union.(type) {
case *sqlbase.Descriptor_Table:
table := union.Table
table.MaybeUpgradeFormatVersion()
if err := table.ValidateTable(); err != nil {
log.Errorf(ctx, "%s: received invalid table descriptor: %v", kv.Key, table)
continue
}
if log.V(2) {
log.Infof(ctx, "%s: refreshing lease table: %d (%s), version: %d, dropped: %t",
kv.Key, table.ID, table.Name, table.Version, table.Dropped())
}
// Try to refresh the table lease to one >= this version.
if t := m.findTableState(table.ID, false /* create */); t != nil {
if err := t.purgeOldVersions(
ctx, db, table.Dropped(), table.Version, m); err != nil {
log.Warningf(ctx, "error purging leases for table %d(%s): %s",
table.ID, table.Name, err)
}
}
case *sqlbase.Descriptor_Database:
// Ignore.
}
}
if m.testingKnobs.TestingLeasesRefreshedEvent != nil {
m.testingKnobs.TestingLeasesRefreshedEvent(cfg)
}
case <-s.ShouldStop():
return
}
}
})
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors_cockroachdb/cockroach.git
git@gitee.com:mirrors_cockroachdb/cockroach.git
mirrors_cockroachdb
cockroach
cockroach
v1.1.9-rc.1

搜索帮助

344bd9b3 5694891 D2dac590 5694891