2 Star 2 Fork 1

cockroachdb/cockroach

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
mvcc.go 72.55 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Jiang-Ming Yang (jiangming.yang@gmail.com)
// Author: Spencer Kimball (spencer.kimball@gmail.com)
package engine
import (
"bytes"
"fmt"
"math"
"sync"
"golang.org/x/net/context"
"github.com/dustin/go-humanize"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)
// mvccVersionTimestampSize is the number of bytes charged to the timestamp
// suffix of an MVCC version key when updating stats.
const mvccVersionTimestampSize = int64(12)
// MVCCKeyMax is a maximum mvcc-encoded key value which sorts after all
// other keys.
var MVCCKeyMax = MakeMVCCMetadataKey(roachpb.KeyMax)

// NilKey is the nil (zero-valued) MVCCKey.
var NilKey = MVCCKey{}
// AccountForSelf adjusts ms so that it already includes the cost of storing ms
// itself. MVCCStats is persisted inline under the range's RangeStats key, which
// is system-local, so writing it adds to the system-local byte and key counts.
// An error is returned only if ms cannot be marshaled.
func AccountForSelf(ms *enginepb.MVCCStats, rangeID roachpb.RangeID) error {
	statsKey := keys.RangeStatsKey(rangeID)

	// The stats are stored inline, so the stored metadata carries the
	// marshaled stats directly in RawBytes.
	var encoded roachpb.Value
	if err := encoded.SetProto(ms); err != nil {
		return err
	}
	inlineMeta := enginepb.MVCCMetadata{RawBytes: encoded.RawBytes}

	metaKeySize := int64(MakeMVCCMetadataKey(statsKey).EncodedSize())
	metaValSize := int64(inlineMeta.Size())
	updateStatsForInline(ms, statsKey, 0, 0, metaKeySize, metaValSize)
	return nil
}
// MakeValue wraps the inline RawBytes carried by meta in a roachpb.Value.
func MakeValue(meta enginepb.MVCCMetadata) roachpb.Value {
	var v roachpb.Value
	v.RawBytes = meta.RawBytes
	return v
}
// IsIntentOf reports whether meta is an intent written by txn. It is false
// when meta carries no transaction or when txn is nil.
func IsIntentOf(meta enginepb.MVCCMetadata, txn *roachpb.Transaction) bool {
	if meta.Txn == nil || txn == nil {
		return false
	}
	return roachpb.TxnIDEqual(meta.Txn.ID, txn.ID)
}
// MVCCKey is a versioned key, distinguished from roachpb.Key with the addition
// of a timestamp.
type MVCCKey struct {
// Key is the user key being versioned.
Key roachpb.Key
// Timestamp is the version timestamp. The zero value denotes the
// unversioned metadata key (see IsValue and MakeMVCCMetadataKey).
Timestamp hlc.Timestamp
}
// MakeMVCCMetadataKey returns the unversioned (zero-timestamp) MVCCKey for
// key, i.e. the key under which key's MVCC metadata is stored.
func MakeMVCCMetadataKey(key roachpb.Key) MVCCKey {
	var k MVCCKey
	k.Key = key
	return k
}
// Next returns the key immediately following k. Versions of a user key are
// ordered by descending timestamp, so this is the same user key at the
// preceding timestamp. If that preceding timestamp would be zero, it is instead
// the metadata key of the next user key.
func (k MVCCKey) Next() MVCCKey {
	prev := k.Timestamp.Prev()
	if prev != (hlc.Timestamp{}) {
		return MVCCKey{Key: k.Key, Timestamp: prev}
	}
	return MVCCKey{Key: k.Key.Next()}
}
// Less reports whether k sorts before l in MVCC order. Keys are ordered by
// user key first. For equal user keys, the metadata key (zero timestamp)
// sorts before all of its versions, and versions sort from newest to oldest
// timestamp.
func (k MVCCKey) Less(l MVCCKey) bool {
	if c := k.Key.Compare(l.Key); c != 0 {
		return c < 0
	}
	if !k.IsValue() {
		// k is the metadata key: it precedes every version of the same user
		// key, but not another metadata key (they are equal).
		return l.IsValue()
	}
	if !l.IsValue() {
		// l is the metadata key and k is a version, so k sorts after l.
		return false
	}
	// Both are versions: newer timestamps sort first.
	return l.Timestamp.Less(k.Timestamp)
}
// Equal reports whether k and l have the same user key bytes and the same
// timestamp.
func (k MVCCKey) Equal(l MVCCKey) bool {
	if k.Timestamp != l.Timestamp {
		return false
	}
	return k.Key.Compare(l.Key) == 0
}
// IsValue reports whether k addresses a versioned value, i.e. whether its
// timestamp is non-zero. A zero timestamp denotes the metadata key.
func (k MVCCKey) IsValue() bool {
	var zero hlc.Timestamp
	return k.Timestamp != zero
}
// EncodedSize returns the size, for stats accounting, of k when encoded: the
// user key plus one byte, plus mvccVersionTimestampSize for versioned keys.
func (k MVCCKey) EncodedSize() int {
	size := 1 + len(k.Key)
	if !k.IsValue() {
		return size
	}
	// Encoded timestamps really take between 8 and 13 bytes; the fixed
	// estimate only affects accounting, not what is stored on disk.
	return size + int(mvccVersionTimestampSize)
}
// String formats k as "<key>/<timestamp>" for versioned keys, and as the bare
// user key for metadata keys.
func (k MVCCKey) String() string {
	if k.IsValue() {
		return fmt.Sprintf("%s/%s", k.Key, k.Timestamp)
	}
	return k.Key.String()
}
// MVCCKeyValue contains the raw bytes of the value for a key.
type MVCCKeyValue struct {
// Key is the MVCC key (metadata or versioned) the value is stored under.
Key MVCCKey
// Value is the raw, undecoded value bytes.
Value []byte
}
// isSysLocal reports whether key is system-local, i.e. sorts strictly before
// keys.LocalMax.
func isSysLocal(key roachpb.Key) bool {
	cmp := key.Compare(keys.LocalMax)
	return cmp < 0
}
// updateStatsForInline updates ms in place for an inline value. Inline values
// are simpler than versioned ones because they involve neither intents nor
// multiple versions. The original entry's counts are removed when
// origMetaKeySize is non-zero, and the new entry's counts are added when
// metaKeySize is non-zero.
func updateStatsForInline(
	ms *enginepb.MVCCStats,
	key roachpb.Key,
	origMetaKeySize, origMetaValSize, metaKeySize, metaValSize int64,
) {
	isSys := isSysLocal(key)

	// apply adds (sign = +1) or removes (sign = -1) the contribution of a
	// single inline entry with the given key and value sizes.
	apply := func(sign, keySize, valSize int64) {
		total := keySize + valSize
		if isSys {
			ms.SysBytes += sign * total
			ms.SysCount += sign
			return
		}
		ms.LiveBytes += sign * total
		ms.LiveCount += sign
		ms.KeyBytes += sign * keySize
		ms.ValBytes += sign * valSize
		ms.KeyCount += sign
		ms.ValCount += sign
	}

	if origMetaKeySize != 0 {
		apply(-1, origMetaKeySize, origMetaValSize)
	}
	if metaKeySize != 0 {
		apply(+1, metaKeySize, metaValSize)
	}
}
// updateStatsOnMerge returns the (estimated) stats delta for merging an inline
// value of valSize bytes into key at nowNanos. Exact accounting is impossible
// because the merge itself happens asynchronously during compaction. Only the
// value size is counted (callers fold an estimated 12 bytes for the timestamp
// into valSize), and the result is flagged ContainsEstimates. Splits and
// merges later correct the error.
func updateStatsOnMerge(key roachpb.Key, valSize, nowNanos int64) enginepb.MVCCStats {
	var ms enginepb.MVCCStats
	ms.AgeTo(nowNanos)
	ms.ContainsEstimates = true

	if isSysLocal(key) {
		ms.SysBytes += valSize
		return ms
	}
	ms.LiveBytes += valSize
	ms.ValBytes += valSize
	return ms
}
// updateStatsOnPut returns the stats delta for writing a new version of key.
// The delta covers both the metadata key & value bytes and the MVCC versioned
// value's key & value bytes. If the new value is not a deletion tombstone, the
// live counters are updated as well. If it is an intent, the intent counters
// are updated.
//
// orig is the key's existing metadata (nil if there is none), and
// origMetaKeySize/origMetaValSize are the sizes of its metadata key/value.
// meta, metaKeySize and metaValSize describe the new metadata. The returned
// stats start from zero, so they describe only the change caused by this
// write.
func updateStatsOnPut(
	key roachpb.Key,
	origMetaKeySize, origMetaValSize,
	metaKeySize, metaValSize int64,
	orig, meta *enginepb.MVCCMetadata,
) enginepb.MVCCStats {
	var ms enginepb.MVCCStats
	sys := isSysLocal(key)

	// Remove current live counts for this key.
	if orig != nil {
		if sys {
			ms.SysBytes -= (origMetaKeySize + origMetaValSize)
			if orig.Txn != nil {
				// An intent is being replaced in place, so its versioned key
				// and value bytes disappear along with the metadata. This
				// mirrors the non-system branch below. It also balances the
				// addition of meta.KeyBytes+meta.ValBytes for system keys
				// further down, and the subtraction of the same quantities
				// in updateStatsOnAbort.
				ms.SysBytes -= orig.KeyBytes + orig.ValBytes
			}
			ms.SysCount--
		} else {
			// Move the (so far empty) stats to the timestamp at which the
			// previous entry was created, which is where we wish to reclassify
			// its contributions.
			ms.AgeTo(orig.Timestamp.WallTime)
			// If original version value for this key wasn't deleted, subtract
			// its contribution from live bytes in anticipation of adding in
			// contribution from new version below.
			if !orig.Deleted {
				ms.LiveBytes -= orig.KeyBytes + orig.ValBytes + origMetaKeySize + origMetaValSize
				ms.LiveCount--
				// NOTE(review): no explicit GC'able-bytes adjustment happens
				// here; presumably GC'able age is derived by MVCCStats.AgeTo
				// from the byte counters above — confirm.
			}
			ms.KeyBytes -= origMetaKeySize
			ms.ValBytes -= origMetaValSize
			ms.KeyCount--
			// If the original metadata for this key was an intent, subtract
			// its contribution from stat counters as it's being replaced.
			if orig.Txn != nil {
				// Subtract counts attributable to intent we're replacing.
				ms.KeyBytes -= orig.KeyBytes
				ms.ValBytes -= orig.ValBytes
				ms.ValCount--
				ms.IntentBytes -= (orig.KeyBytes + orig.ValBytes)
				ms.IntentCount--
			}
		}
	}

	// Move the stats to the new meta's timestamp. If we had an orig meta, this
	// ages those original stats by the time which the previous version was live.
	ms.AgeTo(meta.Timestamp.WallTime)

	if sys {
		ms.SysBytes += meta.KeyBytes + meta.ValBytes + metaKeySize + metaValSize
		ms.SysCount++
	} else {
		// If new version isn't a deletion tombstone, add it to live counters.
		if !meta.Deleted {
			ms.LiveBytes += meta.KeyBytes + meta.ValBytes + metaKeySize + metaValSize
			ms.LiveCount++
		}
		ms.KeyBytes += meta.KeyBytes + metaKeySize
		ms.ValBytes += meta.ValBytes + metaValSize
		ms.KeyCount++
		ms.ValCount++
		if meta.Txn != nil {
			ms.IntentBytes += meta.KeyBytes + meta.ValBytes
			ms.IntentCount++
		}
	}
	return ms
}
// updateStatsOnResolve returns the stats delta for resolving an intent on key:
// the difference between the original and new metadata sizes. The size of the
// resolved value (key & bytes) is subtracted from the intent counters if
// commit=true.
//
// orig is the intent's metadata before resolution and meta the metadata after
// it. The returned stats start from zero and are aged from
// orig.Timestamp.WallTime to meta.Timestamp.WallTime.
func updateStatsOnResolve(
key roachpb.Key,
origMetaKeySize, origMetaValSize,
metaKeySize, metaValSize int64,
orig, meta enginepb.MVCCMetadata,
commit bool,
) enginepb.MVCCStats {
var ms enginepb.MVCCStats
// In this case, we're only removing the contribution from having the
// meta key around from orig.Timestamp to meta.Timestamp.
ms.AgeTo(orig.Timestamp.WallTime)
sys := isSysLocal(key)
// Always zero.
// NOTE(review): "always zero" presumably relies on callers passing the same
// meta key size before and after resolution — confirm at the call sites.
keyDiff := metaKeySize - origMetaKeySize
// This is going to be nonpositive: the old meta key was
// real, the new one is implicit.
valDiff := metaValSize - origMetaValSize
if sys {
ms.SysBytes += keyDiff + valDiff
} else {
// Tombstones never contributed to live bytes, so only adjust live
// bytes when the resolved value is not a deletion.
if !meta.Deleted {
ms.LiveBytes += keyDiff + valDiff
}
ms.KeyBytes += keyDiff
ms.ValBytes += valDiff
// If committing, subtract out intent counts.
if commit {
ms.IntentBytes -= (meta.KeyBytes + meta.ValBytes)
ms.IntentCount--
}
}
ms.AgeTo(meta.Timestamp.WallTime)
return ms
}
// updateStatsOnAbort returns the stats delta for aborting the intent on key.
// The aborted value's key and value byte sizes are subtracted. If an earlier
// version was restored, the restored values are added to live bytes and count,
// provided the restored value isn't a deletion tombstone.
//
// orig is the aborted intent's metadata. restored is the metadata of the
// version that becomes visible again (nil if none), which was created at
// restoredNanos. txnNanos is the time at which the abort happens.
// Panics if restored is itself an intent on a non-system key.
func updateStatsOnAbort(
key roachpb.Key,
origMetaKeySize, origMetaValSize,
restoredMetaKeySize, restoredMetaValSize int64,
orig, restored *enginepb.MVCCMetadata,
restoredNanos, txnNanos int64,
) enginepb.MVCCStats {
sys := isSysLocal(key)
var ms enginepb.MVCCStats
// Two intervals of time (bounded by three AgeTo points) are relevant here:
// 1) creation of previous value (or 0) to creation of intent:
// [restoredNanos, orig.Timestamp.WallTime)
// 2) creation of the intent (which we're now aborting) to the timestamp
// at which we're aborting:
// [orig.Timestamp.WallTime, txnNanos)
if restored != nil {
ms.AgeTo(restoredNanos)
if sys {
ms.SysBytes += restoredMetaKeySize + restoredMetaValSize
ms.SysCount++
} else {
if !restored.Deleted {
ms.LiveBytes += restored.KeyBytes + restored.ValBytes + restoredMetaKeySize + restoredMetaValSize
ms.LiveCount++
}
ms.KeyBytes += restoredMetaKeySize
ms.ValBytes += restoredMetaValSize
ms.KeyCount++
if restored.Txn != nil {
panic("restored version should never be an intent")
}
}
}
ms.AgeTo(orig.Timestamp.WallTime)
// Everything the intent contributed: its versioned key/value plus its
// metadata key/value.
origTotalBytes := orig.KeyBytes + orig.ValBytes + origMetaKeySize + origMetaValSize
if sys {
ms.SysBytes -= origTotalBytes
ms.SysCount--
} else {
if !orig.Deleted {
ms.LiveBytes -= origTotalBytes
ms.LiveCount--
}
ms.KeyBytes -= (orig.KeyBytes + origMetaKeySize)
ms.ValBytes -= (orig.ValBytes + origMetaValSize)
ms.KeyCount--
ms.ValCount--
ms.IntentBytes -= (orig.KeyBytes + orig.ValBytes)
ms.IntentCount--
}
ms.AgeTo(txnNanos)
return ms
}
// updateStatsOnGC returns the stats delta for garbage collecting an entry of
// key whose encoded key and value are keySize and valSize bytes. A non-nil meta
// means the entry is the MVCC metadata, so the key count (system count for
// system-local keys) is decremented. Otherwise a version is being removed and
// the value count is decremented. The result is aged from fromNS to toNS.
func updateStatsOnGC(
	key roachpb.Key, keySize, valSize int64, meta *enginepb.MVCCMetadata, fromNS, toNS int64,
) enginepb.MVCCStats {
	var ms enginepb.MVCCStats
	ms.AgeTo(fromNS)

	isMeta := meta != nil
	if !isSysLocal(key) {
		ms.KeyBytes -= keySize
		ms.ValBytes -= valSize
		if isMeta {
			ms.KeyCount--
		} else {
			ms.ValCount--
		}
	} else {
		ms.SysBytes -= keySize + valSize
		if isMeta {
			ms.SysCount--
		}
	}

	ms.AgeTo(toNS)
	return ms
}
// MVCCGetRangeStats loads the stat counters persisted for rangeID. A missing
// stats record yields zero-valued stats and a nil error.
func MVCCGetRangeStats(
	ctx context.Context, engine Reader, rangeID roachpb.RangeID,
) (enginepb.MVCCStats, error) {
	var stats enginepb.MVCCStats
	statsKey := keys.RangeStatsKey(rangeID)
	_, err := MVCCGetProto(ctx, engine, statsKey, hlc.Timestamp{}, true /* consistent */, nil /* txn */, &stats)
	return stats, err
}
// MVCCSetRangeStats persists ms as the stat counters for rangeID. The write
// itself is not reflected in any stats (nil MVCCStats), happens at the zero
// timestamp, and is not transactional.
func MVCCSetRangeStats(
	ctx context.Context, engine ReadWriter, rangeID roachpb.RangeID, ms *enginepb.MVCCStats,
) error {
	statsKey := keys.RangeStatsKey(rangeID)
	return MVCCPutProto(ctx, engine, nil /* ms */, statsKey, hlc.Timestamp{}, nil /* txn */, ms)
}
// MVCCGetProto reads key via MVCCGet and, when msg is non-nil, unmarshals the
// value found into msg. The boolean result reports whether a value was found.
// consistent has the same meaning as in MVCCGet.
//
// When MVCCGet returns both a value and an error (e.g. a WriteIntentError that
// callers handle alongside the value), the value is still decoded and
// MVCCGet's error is passed through. An unmarshaling failure takes precedence
// over it.
func MVCCGetProto(
	ctx context.Context,
	engine Reader,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	consistent bool,
	txn *roachpb.Transaction,
	msg proto.Message,
) (bool, error) {
	// TODO(tschottdorf): Consider returning skipped intents to the caller.
	value, _, getErr := MVCCGet(ctx, engine, key, timestamp, consistent, txn)
	if value == nil || msg == nil {
		return value != nil, getErr
	}
	if err := value.GetProto(msg); err != nil {
		return true, err
	}
	return true, getErr
}
// MVCCPutProto marshals msg, checksums it against key, and writes it to key
// at timestamp via MVCCPut. Marshaling errors are returned before anything is
// written.
func MVCCPutProto(
	ctx context.Context,
	engine ReadWriter,
	ms *enginepb.MVCCStats,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	txn *roachpb.Transaction,
	msg proto.Message,
) error {
	var encoded roachpb.Value
	if err := encoded.SetProto(msg); err != nil {
		return err
	}
	encoded.InitChecksum(key)
	return MVCCPut(ctx, engine, ms, key, timestamp, encoded, txn)
}
// getBuffer bundles the scratch state used by a single MVCC get so that it
// can be recycled through getBufferPool rather than allocated per call.
type getBuffer struct {
// meta receives the decoded (or synthesized) metadata for the key read.
meta enginepb.MVCCMetadata
// value is filled in by mvccGetInternal and may be returned by pointer.
// release zeroes the whole buffer, so callers must copy value out first
// (as mvccGetUsingIter does).
value roachpb.Value
// NOTE(review): allowUnsafeValue and isUnsafeValue are reset by
// newGetBuffer but not read anywhere in the visible code — confirm their use.
allowUnsafeValue bool
isUnsafeValue bool
}
// getBufferPool recycles getBuffers across MVCC gets; New hands out a fresh,
// zeroed buffer when the pool is empty.
var getBufferPool = sync.Pool{
	New: func() interface{} { return new(getBuffer) },
}
// newGetBuffer takes a getBuffer from getBufferPool with both unsafe-value
// flags cleared. Return it with release when done.
func newGetBuffer() *getBuffer {
	b := getBufferPool.Get().(*getBuffer)
	b.allowUnsafeValue, b.isUnsafeValue = false, false
	return b
}
// release zeroes b, dropping any references it holds, and returns it to
// getBufferPool. b must not be used afterwards.
func (b *getBuffer) release() {
	var empty getBuffer
	*b = empty
	getBufferPool.Put(b)
}
// MVCCGet returns the value stored for key as of timestamp. The key may hold
// arbitrary bytes. A nil value is returned when no value exists or the newest
// visible version is a deletion.
//
// Versions of each key are laid out as follows:
//   ...
//   keyA : MVCCMetadata of keyA
//   keyA_Timestamp_n : value of version_n
//   keyA_Timestamp_n-1 : value of version_n-1
//   ...
//   keyA_Timestamp_0 : value of version_0
//   keyB : MVCCMetadata of keyB
//   ...
//
// With consistent=true, a conflicting intent results in a WriteIntentError.
// With consistent=false, an intent on the key is skipped and the previous
// value (if any) is read instead. The skipped intent is reported in the
// returned roachpb.Intent slice.
func MVCCGet(
	ctx context.Context,
	engine Reader,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	consistent bool,
	txn *roachpb.Transaction,
) (*roachpb.Value, []roachpb.Intent, error) {
	it := engine.NewIterator(true)
	defer it.Close()
	return mvccGetUsingIter(ctx, it, key, timestamp, consistent, txn)
}
// mvccGetUsingIter implements MVCCGet on top of a caller-supplied iterator.
// An empty key yields emptyKeyError. The returned value never aliases pooled
// scratch memory.
func mvccGetUsingIter(
	ctx context.Context,
	iter Iterator,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	consistent bool,
	txn *roachpb.Transaction,
) (*roachpb.Value, []roachpb.Intent, error) {
	if len(key) == 0 {
		return nil, nil, emptyKeyError()
	}

	buf := newGetBuffer()
	defer buf.release()

	metaKey := MakeMVCCMetadataKey(key)
	found, _, _, err := mvccGetMetadata(iter, metaKey, &buf.meta)
	if err != nil || !found {
		return nil, nil, err
	}

	value, intents, _, err := mvccGetInternal(ctx, iter, metaKey,
		timestamp, consistent, safeValue, txn, buf)
	if value != &buf.value {
		return value, intents, err
	}
	// value points into the pooled buffer, which release will zero; hand the
	// caller its own copy instead.
	owned := new(roachpb.Value)
	*owned = buf.value
	buf.value.Reset()
	return owned, intents, err
}
// MVCCGetAsTxn reads key with MVCCGet (consistently) on behalf of a temporary,
// pending Transaction built from txnMeta. It exists only for reading the
// intents of a transaction when just its metadata is known, and should rarely
// be used. Both OrigTimestamp and MaxTimestamp are pinned to
// txnMeta.Timestamp, so the read cannot hit an uncertainty restart.
func MVCCGetAsTxn(
	ctx context.Context,
	engine Reader,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	txnMeta enginepb.TxnMeta,
) (*roachpb.Value, []roachpb.Intent, error) {
	ts := txnMeta.Timestamp
	txn := &roachpb.Transaction{
		TxnMeta:       txnMeta,
		Status:        roachpb.PENDING,
		Writing:       true,
		OrigTimestamp: ts,
		MaxTimestamp:  ts,
	}
	return MVCCGet(ctx, engine, key, timestamp, true /* consistent */, txn)
}
// mvccGetMetadata loads, or reconstructs, the metadata for metaKey into meta,
// which must be non-nil. It seeks iter to metaKey, and the successful
// outcomes are:
//  1. No entry exists for the user key: (false, 0, 0, nil).
//  2. An explicit metadata record exists: it is unmarshaled into meta, and its
//     encoded key and value sizes are returned.
//  3. Only versions exist (the metadata is implicit): meta is synthesized from
//     the newest version. The returned key size is the non-timestamp portion
//     of that version's key (the usual contribution of a metadata key), and
//     the returned value size is zero.
//
// A nil iter performs no seek and reports outcome 1. The
// Blind{Put,ConditionalPut} operations use this when the metadata is known
// not to exist.
func mvccGetMetadata(
	iter Iterator, metaKey MVCCKey, meta *enginepb.MVCCMetadata,
) (ok bool, keyBytes, valBytes int64, err error) {
	if iter == nil {
		return false, 0, 0, nil
	}

	iter.Seek(metaKey)
	valid, iterErr := iter.Valid()
	if !valid {
		return false, 0, 0, iterErr
	}
	current := iter.UnsafeKey()
	if !current.Key.Equal(metaKey.Key) {
		// The seek landed on a different user key.
		return false, 0, 0, nil
	}

	if current.IsValue() {
		// Implicit metadata: describe the newest version. Version keys are
		// always charged mvccVersionTimestampSize for their timestamp; the
		// metadata key's size is accounted for separately.
		raw := iter.UnsafeValue()
		meta.Reset()
		meta.KeyBytes = mvccVersionTimestampSize
		meta.ValBytes = int64(len(raw))
		meta.Deleted = len(raw) == 0
		meta.Timestamp = current.Timestamp
		return true, int64(current.EncodedSize()) - meta.KeyBytes, 0, nil
	}

	// Explicit metadata record.
	if err := iter.ValueProto(meta); err != nil {
		return false, 0, 0, err
	}
	return true, int64(current.EncodedSize()), int64(len(iter.UnsafeValue())), nil
}
// valueSafety states whether a returned value's bytes may alias an iterator's
// internal buffer (unsafeValue) or are an owned copy (safeValue).
type valueSafety int

const (
	unsafeValue valueSafety = 0
	safeValue   valueSafety = 1
)
// mvccGetInternal reads the version of metaKey indicated by timestamp, taking
// the transaction txn into account. The metadata for metaKey must already be
// loaded into buf.meta (mvccGetUsingIter does this via mvccGetMetadata).
// Except for inline values, iter is then seeked to the version to read.
//
// The consistent parameter specifies whether reads should ignore any write
// intents (regardless of the actual status of their transaction) and read the
// most recent non-intent value instead. In the event that an inconsistent read
// does encounter an intent (currently there can only be one), it is returned
// via the roachpb.Intent slice, in addition to the result.
//
// allowedSafety selects whether the returned value's RawBytes may alias the
// iterator's unsafe buffer (unsafeValue) or must be a copy (safeValue). The
// third result reports which applies. Inline values and nil results are always
// reported as safeValue. A non-nil returned value points into buf and is only
// valid until buf is released.
//
// Errors include:
//   - a WriteIntentError when another transaction's intent is visible at
//     timestamp;
//   - a ReadWithinUncertaintyIntervalError when a version exists in
//     (timestamp, txn.MaxTimestamp];
//   - plain errors for inconsistent reads inside a transaction and for reads
//     with an epoch older than the own intent's epoch.
func mvccGetInternal(
_ context.Context,
iter Iterator,
metaKey MVCCKey,
timestamp hlc.Timestamp,
consistent bool,
allowedSafety valueSafety,
txn *roachpb.Transaction,
buf *getBuffer,
) (*roachpb.Value, []roachpb.Intent, valueSafety, error) {
if !consistent && txn != nil {
return nil, nil, safeValue, errors.Errorf(
"cannot allow inconsistent reads within a transaction")
}
meta := &buf.meta
// If value is inline, return immediately; txn & timestamp are irrelevant.
if meta.IsInline() {
value := &buf.value
*value = roachpb.Value{RawBytes: meta.RawBytes}
if err := value.Verify(metaKey.Key); err != nil {
return nil, nil, safeValue, err
}
return value, nil, safeValue, nil
}
var ignoredIntents []roachpb.Intent
if !consistent && meta.Txn != nil && !timestamp.Less(meta.Timestamp) {
// If we're doing inconsistent reads and there's an intent, we
// ignore the intent by insisting that the timestamp we're reading
// at is a historical timestamp < the intent timestamp. However, we
// return the intent separately; the caller may want to resolve it.
ignoredIntents = append(ignoredIntents,
roachpb.Intent{Span: roachpb.Span{Key: metaKey.Key}, Status: roachpb.PENDING, Txn: *meta.Txn})
timestamp = meta.Timestamp.Prev()
}
ownIntent := IsIntentOf(*meta, txn) // false if txn == nil
if !timestamp.Less(meta.Timestamp) && meta.Txn != nil && !ownIntent {
// Trying to read the last value, but it's another transaction's intent;
// the reader will have to act on this.
return nil, nil, safeValue, &roachpb.WriteIntentError{
Intents: []roachpb.Intent{{Span: roachpb.Span{Key: metaKey.Key}, Status: roachpb.PENDING, Txn: *meta.Txn}},
}
}
// checkValueTimestamp is set when seeking at txn.MaxTimestamp, in which case
// the version found must additionally be checked against timestamp below.
var checkValueTimestamp bool
seekKey := metaKey
if !timestamp.Less(meta.Timestamp) || ownIntent {
// We are reading the latest value, which is either an intent written
// by this transaction or not an intent at all (so there's no
// conflict). Note that when reading the own intent, the timestamp
// specified is irrelevant; we always want to see the intent (see
// TestMVCCReadWithPushedTimestamp).
seekKey.Timestamp = meta.Timestamp
// Check for case where we're reading our own txn's intent
// but it's got a different epoch. This can happen if the
// txn was restarted and an earlier iteration wrote the value
// we're now reading. In this case, we skip the intent.
if ownIntent && txn.Epoch != meta.Txn.Epoch {
if txn.Epoch < meta.Txn.Epoch {
return nil, nil, safeValue, errors.Errorf(
"failed to read with epoch %d due to a write intent with epoch %d",
txn.Epoch, meta.Txn.Epoch)
}
seekKey = seekKey.Next()
}
} else if txn != nil && timestamp.Less(txn.MaxTimestamp) {
// In this branch, the latest timestamp is ahead, and so the read of an
// "old" value in a transactional context at time (timestamp, MaxTimestamp]
// occurs, leading to a clock uncertainty error if a version exists in
// that time interval.
if !txn.MaxTimestamp.Less(meta.Timestamp) {
// Second case: Our read timestamp is behind the latest write, but the
// latest write could possibly have happened before our read in
// absolute time if the writer had a fast clock.
// The reader should try again with a later timestamp than the
// one given below.
return nil, nil, safeValue, roachpb.NewReadWithinUncertaintyIntervalError(
timestamp, meta.Timestamp)
}
// We want to know if anything has been written ahead of timestamp, but
// before MaxTimestamp.
seekKey.Timestamp = txn.MaxTimestamp
checkValueTimestamp = true
} else {
// Third case: We're reading a historic value either outside of a
// transaction, or in the absence of future versions that clock uncertainty
// would apply to.
seekKey.Timestamp = timestamp
// A zero read timestamp would address the metadata key rather than a
// version, so there is nothing to read.
if seekKey.Timestamp == (hlc.Timestamp{}) {
return nil, ignoredIntents, safeValue, nil
}
}
iter.Seek(seekKey)
if ok, err := iter.Valid(); err != nil {
return nil, nil, safeValue, err
} else if !ok {
return nil, ignoredIntents, safeValue, nil
}
unsafeKey := iter.UnsafeKey()
// Landing on a different user key means no version at or below seekKey.
if !unsafeKey.Key.Equal(metaKey.Key) {
return nil, ignoredIntents, safeValue, nil
}
if !unsafeKey.IsValue() {
return nil, nil, safeValue, errors.Errorf(
"expected scan to versioned value reading key %s; got %s %s",
metaKey.Key, unsafeKey, unsafeKey.Timestamp)
}
if checkValueTimestamp {
if timestamp.Less(unsafeKey.Timestamp) {
// Fourth case: Our read timestamp is sufficiently behind the newest
// value, but there is another previous write with the same issues as in
// the second case, so the reader will have to come again with a higher
// read timestamp.
return nil, nil, safeValue, roachpb.NewReadWithinUncertaintyIntervalError(
timestamp, unsafeKey.Timestamp)
}
// Fifth case: There's no value in our future up to MaxTimestamp, and those
// are the only ones that we're not certain about. The correct key has
// already been read above, so there's nothing left to do.
}
if len(iter.UnsafeValue()) == 0 {
// Value is deleted.
return nil, ignoredIntents, safeValue, nil
}
value := &buf.value
// Alias the iterator's buffer only when the caller allows it; otherwise
// take a copy via iter.Value().
if allowedSafety == unsafeValue {
value.RawBytes = iter.UnsafeValue()
} else {
value.RawBytes = iter.Value()
}
value.Timestamp = unsafeKey.Timestamp
if err := value.Verify(metaKey.Key); err != nil {
return nil, nil, safeValue, err
}
return value, ignoredIntents, allowedSafety, nil
}
// putBuffer holds scratch data needed by mvccPutInternal and the other
// write paths in this file. Bundling this data into a single structure
// reduces memory allocations. Managing this temporary buffer using a
// sync.Pool (putBufferPool) removes allocation from the common put path.
type putBuffer struct {
// meta receives the existing metadata read for the key being written
// (mvccPutInternal, mvccResolveWriteIntent); MVCCMerge reuses it for the
// merge record it encodes.
meta enginepb.MVCCMetadata
// newMeta is the metadata built for the version mvccPutInternal writes.
newMeta enginepb.MVCCMetadata
// NOTE(review): not referenced in this chunk; presumably used elsewhere in
// the file — confirm before relying on it.
newTxn enginepb.TxnMeta
// ts backs the MergeTimestamp pointer set by MVCCMerge, so that pointer does
// not require a separate allocation.
ts hlc.Timestamp
// tmpbuf is the reusable marshaling scratch space used by marshalMeta;
// release keeps its capacity across pool round-trips.
tmpbuf []byte
}
// putBufferPool recycles putBuffers so the hot write paths do not allocate a
// fresh buffer per call; an empty pool hands out a zero-valued buffer.
var putBufferPool = sync.Pool{
	New: func() interface{} { return new(putBuffer) },
}
// newPutBuffer takes a putBuffer from putBufferPool. Callers hand it back
// with release once they are done with it.
func newPutBuffer() *putBuffer {
	b := putBufferPool.Get().(*putBuffer)
	return b
}
// release zeroes every field of b except the scratch slice, which is
// truncated to length zero so its capacity is reused, and then returns b to
// putBufferPool. b must not be used afterwards.
func (b *putBuffer) release() {
	scratch := b.tmpbuf[:0]
	*b = putBuffer{}
	b.tmpbuf = scratch
	putBufferPool.Put(b)
}
// marshalMeta encodes meta into b's reusable scratch slice, allocating a new
// backing array only when the current one is too small. The returned bytes
// alias b.tmpbuf, so they are only valid until the next marshal on b or
// until b is released.
func (b *putBuffer) marshalMeta(meta *enginepb.MVCCMetadata) (_ []byte, err error) {
	size := meta.Size()
	data := b.tmpbuf
	if cap(data) >= size {
		data = data[:size]
	} else {
		data = make([]byte, size)
	}
	protoutil.Interceptor(meta)
	var n int
	if n, err = meta.MarshalTo(data); err != nil {
		return nil, err
	}
	// Keep the (possibly grown) slice for the next call.
	b.tmpbuf = data
	return data[:n], nil
}
// putMeta marshals meta using b's scratch space and writes it under key. On
// success it reports the encoded key size and the marshaled value size,
// which callers feed into MVCC stats; on failure both sizes are zero.
func (b *putBuffer) putMeta(
	engine Writer, key MVCCKey, meta *enginepb.MVCCMetadata,
) (keyBytes, valBytes int64, err error) {
	data, err := b.marshalMeta(meta)
	if err == nil {
		err = engine.Put(key, data)
	}
	if err != nil {
		return 0, 0, err
	}
	return int64(key.EncodedSize()), int64(len(data)), nil
}
// MVCCPut writes value for key at timestamp and updates the key's MVCC
// metadata, so that each timestamp produces a distinct version. The version
// timestamp comes from the timestamp argument; a value whose own Timestamp
// field is set is rejected with an error.
//
// A zero timestamp (hlc.Timestamp{}) requests an inline value. Instead of a
// timestamp-versioned entry, the value lives in the key's single metadata
// row, which never holds more than one value. Inline and versioned writes
// cannot be mixed on one key: once a key is written one way, later writes
// must use the same form. Repeated inline writes replace the value, inline
// deletes clear it, and inline values may also be merged.
func MVCCPut(
	ctx context.Context,
	engine ReadWriter,
	ms *enginepb.MVCCStats,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	value roachpb.Value,
	txn *roachpb.Transaction,
) error {
	// The existing value only needs to be read when stats are tracked or the
	// write is versioned. An inline write without stats can be done blind, in
	// which case no iterator is created at all.
	var iter Iterator
	if ms != nil || timestamp != (hlc.Timestamp{}) {
		iter = engine.NewIterator(true)
		defer iter.Close()
	}
	return mvccPutUsingIter(ctx, engine, iter, ms, key, timestamp, value, txn, nil /* valueFn */)
}
// MVCCBlindPut is a fast path for MVCCPut with the same semantics (see
// MVCCPut). It never reads the key's existing metadata, so the caller must
// guarantee that no versions of key currently exist for stats to come out
// right. If earlier versions do exist, accounting for them in the stats is
// the caller's responsibility.
func MVCCBlindPut(
	ctx context.Context,
	engine Writer,
	ms *enginepb.MVCCStats,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	value roachpb.Value,
	txn *roachpb.Transaction,
) error {
	var noIter Iterator // blind: the existing metadata is never consulted
	return mvccPutUsingIter(ctx, engine, noIter, ms, key, timestamp, value, txn, nil /* valueFn */)
}
// MVCCDelete writes a deletion tombstone for key at timestamp, so the key is
// no longer returned by later get responses.
func MVCCDelete(
	ctx context.Context,
	engine ReadWriter,
	ms *enginepb.MVCCStats,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	txn *roachpb.Transaction,
) error {
	iter := engine.NewIterator(true)
	defer iter.Close()
	// Putting the empty value (nil raw bytes) is what lays down the tombstone.
	tombstone := noValue
	return mvccPutUsingIter(ctx, engine, iter, ms, key, timestamp, tombstone, txn, nil /* valueFn */)
}
// noValue is the zero Value. Written directly (with no valueFn) it produces a
// deletion tombstone; paths that supply a valueFn pass it as a placeholder.
var noValue roachpb.Value
// mvccPutUsingIter writes key at timestamp, using iter (which may be nil for
// blind writes) to read any existing state. Only one of value and valueFn is
// meant to be used:
//   - with a nil valueFn, value's raw bytes are written, and value must not
//     carry its own Timestamp;
//   - otherwise value is ignored and valueFn computes the bytes to write
//     from the existing value.
func mvccPutUsingIter(
	ctx context.Context,
	engine Writer,
	iter Iterator,
	ms *enginepb.MVCCStats,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	value roachpb.Value,
	txn *roachpb.Transaction,
	valueFn func(*roachpb.Value) ([]byte, error),
) error {
	var rawBytes []byte
	switch {
	case valueFn != nil:
		// The bytes come from valueFn; value is ignored.
	case value.Timestamp != (hlc.Timestamp{}):
		return errors.Errorf("cannot have timestamp set in value on Put")
	default:
		rawBytes = value.RawBytes
	}

	buf := newPutBuffer()
	err := mvccPutInternal(ctx, engine, iter, ms, key, timestamp, rawBytes, txn, buf, valueFn)
	// Released by hand rather than deferred: a defer is measurably slower on
	// this hot path.
	buf.release()
	return err
}
// maybeGetValue returns value unchanged when valueFn is nil. Otherwise it
// calls valueFn with the existing value read at readTS and returns whatever
// valueFn produces. The existing value is nil when exists is false or when
// the read finds nothing. A read error is returned without calling valueFn.
func maybeGetValue(
	ctx context.Context,
	iter Iterator,
	metaKey MVCCKey,
	value []byte,
	exists bool,
	readTS hlc.Timestamp,
	txn *roachpb.Transaction,
	buf *putBuffer,
	valueFn func(*roachpb.Value) ([]byte, error),
) ([]byte, error) {
	if valueFn == nil {
		return value, nil
	}
	if !exists {
		return valueFn(nil)
	}

	getBuf := newGetBuffer()
	// Deferred so that the value handed to valueFn (which may point into
	// getBuf) stays valid for the whole call.
	defer getBuf.release()
	// Seed the get metadata with what the caller has already read.
	getBuf.meta = buf.meta
	existing, _, _, err := mvccGetInternal(
		ctx, iter, metaKey, readTS, true /* consistent */, safeValue, txn, getBuf)
	if err != nil {
		return nil, err
	}
	return valueFn(existing)
}
// mvccPutInternal adds a new timestamped value to the specified key.
// If value is nil, creates a deletion tombstone value. valueFn is
// an optional alternative to supplying value directly. It is passed
// the existing value (or nil if none exists) and returns the value
// to write or an error. If valueFn is supplied, value should be nil
// and vice versa. valueFn can delete by returning nil. Returning
// []byte{} will write an empty value, not delete.
// NOTE(review): the versioned-value read path earlier in this file treats a
// zero-length stored value as deleted ("Value is deleted."), so a []byte{}
// written here reads back as a deletion even though newMeta.Deleted is false.
// Confirm the previous sentence before relying on it.
//
// A zero timestamp requests an inline write. The value is stored directly in
// the metadata record, or the record is cleared when the value is nil. Inline
// writes are rejected inside a transaction, and inline and versioned writes
// may not be mixed on one key. Any other timestamp writes a versioned value.
// A transactional write also writes a metadata record carrying the
// transaction (the intent). A non-transactional write leaves the metadata
// implicit.
//
// Errors returned:
//   - *roachpb.WriteIntentError when another transaction's intent is present.
//   - an error when this transaction's intent was written in a later epoch.
//   - a transaction retry error (RETRY_POSSIBLE_REPLAY) when, in the same
//     epoch, this transaction's intent has a later sequence, or the same
//     sequence and a same-or-later batch index.
//   - *roachpb.WriteTooOldError when a committed value exists at or above
//     timestamp. In that case the write is STILL performed, at the existing
//     value's timestamp.Next(), and the error carries both timestamps.
//   - any error from valueFn, returned before anything is written.
//
// buf provides scratch space; buf.meta and buf.newMeta are overwritten. When
// ms is non-nil it receives the stats delta of the write.
func mvccPutInternal(
ctx context.Context,
engine Writer,
iter Iterator,
ms *enginepb.MVCCStats,
key roachpb.Key,
timestamp hlc.Timestamp,
value []byte,
txn *roachpb.Transaction,
buf *putBuffer,
valueFn func(*roachpb.Value) ([]byte, error),
) error {
if len(key) == 0 {
return emptyKeyError()
}
metaKey := MakeMVCCMetadataKey(key)
// Read the existing metadata (if any) into buf.meta. ok reports whether it
// exists; the original sizes feed the stats update at the end.
ok, origMetaKeySize, origMetaValSize, err := mvccGetMetadata(iter, metaKey, &buf.meta)
if err != nil {
return err
}
// Verify we're not mixing inline and non-inline values.
putIsInline := timestamp == (hlc.Timestamp{})
if ok && putIsInline != buf.meta.IsInline() {
return errors.Errorf("%q: put is inline=%t, but existing value is inline=%t",
metaKey, putIsInline, buf.meta.IsInline())
}
// Handle inline put.
if putIsInline {
if txn != nil {
return errors.Errorf("%q: inline writes not allowed within transactions", metaKey)
}
var metaKeySize, metaValSize int64
if value, err = maybeGetValue(
ctx, iter, metaKey, value, ok, timestamp, txn, buf, valueFn); err != nil {
return err
}
// A nil inline value clears the metadata record entirely.
if value == nil {
metaKeySize, metaValSize, err = 0, 0, engine.Clear(metaKey)
} else {
buf.meta = enginepb.MVCCMetadata{RawBytes: value}
metaKeySize, metaValSize, err = buf.putMeta(engine, metaKey, &buf.meta)
}
// NOTE(review): stats are adjusted even if the Clear/putMeta above failed;
// the error is still returned to the caller.
if ms != nil {
updateStatsForInline(ms, key, origMetaKeySize, origMetaValSize, metaKeySize, metaValSize)
}
return err
}
// meta stays nil when the key has no existing metadata; updateStatsOnPut
// receives it as such.
var meta *enginepb.MVCCMetadata
// Set when the write is pushed above an existing value. The write still
// goes ahead and this error is returned at the very end.
var maybeTooOldErr error
if ok {
// There is existing metadata for this key; ensure our write is permitted.
meta = &buf.meta
if meta.Txn != nil {
// There is an uncommitted write intent.
if txn == nil || !roachpb.TxnIDEqual(meta.Txn.ID, txn.ID) {
// The current Put operation does not come from the same
// transaction.
return &roachpb.WriteIntentError{Intents: []roachpb.Intent{{Span: roachpb.Span{Key: key}, Status: roachpb.PENDING, Txn: *meta.Txn}}}
} else if txn.Epoch < meta.Txn.Epoch {
return errors.Errorf("put with epoch %d came after put with epoch %d in txn %s",
txn.Epoch, meta.Txn.Epoch, txn.ID)
} else if txn.Epoch == meta.Txn.Epoch &&
(txn.Sequence < meta.Txn.Sequence ||
(txn.Sequence == meta.Txn.Sequence && txn.BatchIndex <= meta.Txn.BatchIndex)) {
// Replay error if we encounter an older sequence number or
// the same (or earlier) batch index for the same sequence.
return roachpb.NewTransactionRetryError(roachpb.RETRY_POSSIBLE_REPLAY)
}
// Make sure we process valueFn before clearing any earlier
// version. For example, a conditional put within same
// transaction should read previous write.
if value, err = maybeGetValue(
ctx, iter, metaKey, value, ok, timestamp, txn, buf, valueFn); err != nil {
return err
}
// We are replacing our own older write intent. If we are
// writing at the same timestamp we can simply overwrite it;
// otherwise we must explicitly delete the obsolete intent.
if timestamp != meta.Timestamp {
versionKey := metaKey
versionKey.Timestamp = meta.Timestamp
if err = engine.Clear(versionKey); err != nil {
return err
}
}
} else if !meta.Timestamp.Less(timestamp) {
// This is the case where we're trying to write under a
// committed value. Obviously we can't do that, but we can
// increment our timestamp to one logical tick past the existing
// value and go on to write, but then return a write-too-old
// error indicating what the timestamp ended up being. This
// timestamp can then be used to increment the txn timestamp and
// be returned with the response.
actualTimestamp := meta.Timestamp.Next()
maybeTooOldErr = &roachpb.WriteTooOldError{Timestamp: timestamp, ActualTimestamp: actualTimestamp}
// If we're in a transaction, always get the value at the orig
// timestamp.
if txn != nil {
if value, err = maybeGetValue(
ctx, iter, metaKey, value, ok, timestamp, txn, buf, valueFn); err != nil {
return err
}
} else {
// Outside of a transaction, read the latest value and advance
// the write timestamp to the latest value's timestamp + 1. The
// new timestamp is returned to the caller in maybeTooOldErr.
if value, err = maybeGetValue(
ctx, iter, metaKey, value, ok, actualTimestamp, txn, buf, valueFn); err != nil {
return err
}
}
timestamp = actualTimestamp
} else {
// Existing committed value is strictly older: write at timestamp as asked.
if value, err = maybeGetValue(
ctx, iter, metaKey, value, ok, timestamp, txn, buf, valueFn); err != nil {
return err
}
}
} else {
// There is no existing value for this key. Even if the new value is
// nil write a deletion tombstone for the key.
if value, err = maybeGetValue(
ctx, iter, metaKey, value, ok, timestamp, txn, buf, valueFn); err != nil {
return err
}
}
// Build the new metadata; for transactional writes it points at txn's
// embedded TxnMeta.
{
var txnMeta *enginepb.TxnMeta
if txn != nil {
txnMeta = &txn.TxnMeta
}
buf.newMeta = enginepb.MVCCMetadata{Txn: txnMeta, Timestamp: timestamp}
}
newMeta := &buf.newMeta
// Write the versioned value. A nil value is written as-is and is recorded
// as a deletion (newMeta.Deleted) below.
versionKey := metaKey
versionKey.Timestamp = timestamp
if err := engine.Put(versionKey, value); err != nil {
return err
}
// Write the mvcc metadata now that we have sizes for the latest
// versioned value. For values, the size of keys is always accounted
// for as mvccVersionTimestampSize. The size of the metadata key is
// accounted for separately.
newMeta.KeyBytes = mvccVersionTimestampSize
newMeta.ValBytes = int64(len(value))
newMeta.Deleted = value == nil
var metaKeySize, metaValSize int64
// Only transactional writes persist an explicit metadata record (the
// intent); otherwise the metadata stays implicit.
if newMeta.Txn != nil {
metaKeySize, metaValSize, err = buf.putMeta(engine, metaKey, newMeta)
if err != nil {
return err
}
} else {
// Per-key stats count the full-key once and mvccVersionTimestampSize for
// each versioned value. We maintain that accounting even when the MVCC
// metadata is implicit.
metaKeySize = int64(metaKey.EncodedSize())
}
// Update MVCC stats.
if ms != nil {
ms.Add(updateStatsOnPut(key, origMetaKeySize, origMetaValSize,
metaKeySize, metaValSize, meta, newMeta))
}
// nil unless the write was pushed above an existing value; the write has
// been applied in either case.
return maybeTooOldErr
}
// MVCCIncrement adds inc to the integer stored at key, writes the result
// back, and returns it. A missing key counts as zero. A key holding a
// non-integer value, or an increment that would overflow int64, produces an
// error and nothing is written.
//
// The current value is normally read at the same timestamp the new value is
// written at. There is one exception, outside a transaction: if a value
// already exists at or above timestamp, both the read and the write move to
// just past that value. A WriteTooOldError is then returned together with
// the (already written) result.
func MVCCIncrement(
	ctx context.Context,
	engine ReadWriter,
	ms *enginepb.MVCCStats,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	txn *roachpb.Transaction,
	inc int64,
) (int64, error) {
	iter := engine.NewIterator(true)
	defer iter.Close()

	// int64Val is captured by the closure; it ends up holding the value that
	// the closure last observed or computed.
	var int64Val int64
	increment := func(current *roachpb.Value) ([]byte, error) {
		if current != nil {
			var err error
			if int64Val, err = current.GetInt(); err != nil {
				return nil, errors.Errorf("key %q does not contain an integer value", key)
			}
		}
		if willOverflow(int64Val, inc) {
			return nil, errors.Errorf("key %s with value %d incremented by %d results in overflow", key, int64Val, inc)
		}
		int64Val += inc
		var updated roachpb.Value
		updated.SetInt(int64Val)
		updated.InitChecksum(key)
		return updated.RawBytes, nil
	}
	err := mvccPutUsingIter(ctx, engine, iter, ms, key, timestamp, noValue, txn, increment)
	return int64Val, err
}
// MVCCConditionalPut sets the value for a specified key only if the
// expected value matches. If not, it returns a ConditionFailedError
// containing the actual value.
//
// The match is a byte-wise comparison of the values' RawBytes. A nil expVal
// expects the key to have no value. A non-nil expVal fails if the key has
// no value.
//
// The condition check normally reads the key at the same timestamp the new
// value is written at. Outside a transaction, if a value already exists at
// or above timestamp, the check reads at (and the write happens at) the
// timestamp just past that value instead, and a WriteTooOldError is returned
// after the write.
func MVCCConditionalPut(
	ctx context.Context,
	engine ReadWriter,
	ms *enginepb.MVCCStats,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	value roachpb.Value,
	expVal *roachpb.Value,
	txn *roachpb.Transaction,
) error {
	iter := engine.NewIterator(true)
	defer iter.Close()
	return mvccConditionalPutUsingIter(ctx, engine, iter, ms, key, timestamp, value, expVal, txn)
}
// MVCCBlindConditionalPut is a fast path for MVCCConditionalPut with the same
// semantics (see MVCCConditionalPut). It never reads the key's existing
// metadata, so the caller must guarantee that no versions of key currently
// exist.
func MVCCBlindConditionalPut(
	ctx context.Context,
	engine Writer,
	ms *enginepb.MVCCStats,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	value roachpb.Value,
	expVal *roachpb.Value,
	txn *roachpb.Transaction,
) error {
	var noIter Iterator // blind: no existing metadata is read
	return mvccConditionalPutUsingIter(ctx, engine, noIter, ms, key, timestamp, value, expVal, txn)
}
// mvccConditionalPutUsingIter implements the conditional put on top of
// mvccPutUsingIter. The write proceeds only when expVal and the existing
// value are both absent, or both present with identical RawBytes. Any other
// combination fails with a ConditionFailedError carrying a shallow copy of
// the existing value (nil if there is none).
func mvccConditionalPutUsingIter(
	ctx context.Context,
	engine Writer,
	iter Iterator,
	ms *enginepb.MVCCStats,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	value roachpb.Value,
	expVal *roachpb.Value,
	txn *roachpb.Transaction,
) error {
	check := func(existVal *roachpb.Value) ([]byte, error) {
		switch {
		case expVal == nil && existVal == nil:
			// Expected absence, found absence.
		case expVal == nil || existVal == nil ||
			// Every type flows through here, so we can't use the typed getters.
			!bytes.Equal(expVal.RawBytes, existVal.RawBytes):
			return nil, &roachpb.ConditionFailedError{
				ActualValue: existVal.ShallowClone(),
			}
		}
		return value.RawBytes, nil
	}
	return mvccPutUsingIter(ctx, engine, iter, ms, key, timestamp, noValue, txn, check)
}
// errInitPutValueMatchesExisting is an internal sentinel. The valueFn used by
// the init-put path returns it when the key already holds exactly the
// supplied value, which stops the put before anything is written. The
// init-put path then translates it back into success (nil), so it never
// reaches callers.
var errInitPutValueMatchesExisting = errors.New("the value matched the existing value")
// MVCCInitPut writes value to key only when the key has no existing value.
//   - If the key already holds exactly value (byte-wise), nothing is written
//     and nil is returned.
//   - If it holds a different value, a ConditionFailedError carrying that
//     value is returned.
//   - Any other write failure is returned as-is.
func MVCCInitPut(
	ctx context.Context,
	engine ReadWriter,
	ms *enginepb.MVCCStats,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	value roachpb.Value,
	txn *roachpb.Transaction,
) error {
	it := engine.NewIterator(true)
	defer it.Close()
	return mvccInitPutUsingIter(ctx, engine, it, ms, key, timestamp, value, txn)
}
// MVCCBlindInitPut is a fast-path of MVCCInitPut. See the MVCCInitPut
// comments for details of the semantics. MVCCBlindInitPut skips
// retrieving the existing metadata for the key requiring the caller
// to gauarntee no version for the key currently exist.
func MVCCBlindInitPut(
ctx context.Context,
engine ReadWriter,
ms *enginepb.MVCCStats,
key roachpb.Key,
timestamp hlc.Timestamp,
value roachpb.Value,
txn *roachpb.Transaction,
) error {
return mvccInitPutUsingIter(ctx, engine, nil, ms, key, timestamp, value, txn)
}
func mvccInitPutUsingIter(
ctx context.Context,
engine ReadWriter,
iter Iterator,
ms *enginepb.MVCCStats,
key roachpb.Key,
timestamp hlc.Timestamp,
value roachpb.Value,
txn *roachpb.Transaction,
) error {
err := mvccPutUsingIter(ctx, engine, iter, ms, key, timestamp, noValue, txn,
func(existVal *roachpb.Value) ([]byte, error) {
if existVal != nil {
if !bytes.Equal(value.RawBytes, existVal.RawBytes) {
return nil, &roachpb.ConditionFailedError{
ActualValue: existVal.ShallowClone(),
}
}
// The existing value matches the supplied value; return an error
// to prevent rewriting the value.
return nil, errInitPutValueMatchesExisting
}
return value.RawBytes, nil
},
)
// Dummy error to prevent an unnecessary write.
if err == errInitPutValueMatchesExisting {
err = nil
}
return err
}
// MVCCMerge implements a merge operation. Merging adds integer values,
// concatenates undifferentiated byte-slice values, and efficiently combines
// time series observations when the roachpb.Value tag marks the bytes as
// TIMESERIES. The value is merged as an inline metadata record. A non-zero
// timestamp is recorded as the record's MergeTimestamp for some replay
// protection. Stats are updated only when the merge succeeds and ms is
// non-nil.
func MVCCMerge(
	ctx context.Context,
	engine ReadWriter,
	ms *enginepb.MVCCStats,
	key roachpb.Key,
	timestamp hlc.Timestamp,
	value roachpb.Value,
) error {
	if len(key) == 0 {
		return emptyKeyError()
	}
	metaKey := MakeMVCCMetadataKey(key)

	buf := newPutBuffer()
	// Every type flows through here, so we can't use the typed getters.
	rawBytes := value.RawBytes
	buf.meta = enginepb.MVCCMetadata{RawBytes: rawBytes}
	if timestamp != (hlc.Timestamp{}) {
		// buf.ts backs the pointer, avoiding a separate allocation.
		buf.ts = timestamp
		buf.meta.MergeTimestamp = &buf.ts
	}

	data, err := buf.marshalMeta(&buf.meta)
	if err == nil {
		err = engine.Merge(metaKey, data)
	}
	if err == nil && ms != nil {
		ms.Add(updateStatsOnMerge(
			key, int64(len(rawBytes))+mvccVersionTimestampSize, timestamp.WallTime))
	}
	// data aliases buf's scratch space, so buf is only released after Merge.
	buf.release()
	return err
}
// MVCCDeleteRange deletes the key/value pairs in [key, endKey) by writing a
// deletion tombstone at timestamp for each key that MVCCIterate yields a
// value for. It returns:
//   - the deleted keys (only when returnKeys is set);
//   - the span to resume from if the max limit was reached (nil otherwise);
//   - the number of keys deleted;
//   - any error.
//
// max bounds the number of deletions. A max of zero deletes nothing and
// returns the whole [key, endKey) span as the resume span. A negative max
// imposes no limit. Each deletion goes through mvccPutInternal, so the usual
// write-intent and write-too-old checks apply per key.
func MVCCDeleteRange(
	ctx context.Context,
	engine ReadWriter,
	ms *enginepb.MVCCStats,
	key,
	endKey roachpb.Key,
	max int64,
	timestamp hlc.Timestamp,
	txn *roachpb.Transaction,
	returnKeys bool,
) ([]roachpb.Key, *roachpb.Span, int64, error) {
	if max == 0 {
		return nil, &roachpb.Span{Key: key, EndKey: endKey}, 0, nil
	}
	var keys []roachpb.Key
	var resumeSpan *roachpb.Span
	var num int64
	buf := newPutBuffer()
	iter := engine.NewIterator(true)
	f := func(kv roachpb.KeyValue) (bool, error) {
		if num == max {
			// Another key was found beyond the max limit.
			resumeSpan = &roachpb.Span{Key: kv.Key, EndKey: endKey}
			return true, nil
		}
		if err := mvccPutInternal(ctx, engine, iter, ms, kv.Key, timestamp, nil, txn, buf, nil); err != nil {
			return true, err
		}
		if returnKeys {
			keys = append(keys, kv.Key)
		}
		num++
		return false, nil
	}
	// In order to detect the potential write intent by another
	// concurrent transaction with a newer timestamp, we need
	// to use the max timestamp for scan.
	_, err := MVCCIterate(ctx, engine, key, endKey, hlc.MaxTimestamp, true /* consistent */, txn, false /* reverse */, f)
	iter.Close()
	buf.release()
	return keys, resumeSpan, num, err
}
// getScanMeta fills meta with the metadata for the entry the iterator is on.
// An explicit metadata record is decoded directly. For a versioned value the
// metadata is implicit and is synthesized from that value. Once the
// iterator is at or past encEndKey it returns NilKey along with the
// iterator's error state, if any.
//
// The returned MVCCKey is unsafe: it is invalidated by the next
// Iterator.{Next,Prev,Seek,SeekReverse,Close}.
func getScanMeta(iter Iterator, encEndKey MVCCKey, meta *enginepb.MVCCMetadata) (MVCCKey, error) {
	metaKey := iter.UnsafeKey()
	switch {
	case !metaKey.Less(encEndKey):
		_, err := iter.Valid()
		return NilKey, err
	case !metaKey.IsValue():
		if err := iter.ValueProto(meta); err != nil {
			return NilKey, err
		}
		return metaKey, nil
	}

	// Implicit metadata. Version keys are always accounted for as
	// mvccVersionTimestampSize; the metadata key's size is tracked
	// separately.
	raw := iter.UnsafeValue()
	meta.Reset()
	meta.Timestamp = metaKey.Timestamp
	meta.KeyBytes = mvccVersionTimestampSize
	meta.ValBytes = int64(len(raw))
	meta.Deleted = len(raw) == 0
	return metaKey, nil
}
// getReverseScanMeta returns the MVCCMetadata the iterator is currently
// pointed at (reconstructing it if the metadata is implicit). Note that the
// returned MVCCKey is unsafe and will be invalidated by the next call to
// Iterator.{Next,Prev,Seek,SeekReverse,Close}.
//
// encEndKey is the lower bound of the reverse scan. Once the iterator is
// below it, the function returns NilKey with the iterator's error state (if
// any). When the iterator is on a versioned value, the function re-seeks to
// that key's metadata key, so it may leave the iterator at a different
// position than it found it.
func getReverseScanMeta(
iter Iterator, encEndKey MVCCKey, meta *enginepb.MVCCMetadata,
) (MVCCKey, error) {
metaKey := iter.UnsafeKey()
// The metaKey < encEndKey is exceeding the boundary.
if metaKey.Less(encEndKey) {
_, err := iter.Valid()
return NilKey, err
}
// If this isn't the meta key yet, scan again to get the meta key.
// TODO(tschottdorf): can we save any work here or leverage
// getScanMetaKey() above after doing the Seek() below?
// (The function above is named getScanMeta.)
if metaKey.IsValue() {
// Need a "safe" key because we're seeking the iterator.
metaKey = iter.Key()
// The row with oldest version will be got by seeking reversely. We use the
// key of this row to get the MVCC metadata key.
iter.Seek(MakeMVCCMetadataKey(metaKey.Key))
// An invalid iterator here returns (NilKey, err), which callers treat as
// the end of the scan (or an error when err != nil).
if ok, err := iter.Valid(); !ok {
return NilKey, err
}
meta.Reset()
metaKey = iter.UnsafeKey()
// The first entry at or after the metadata key is either an explicit
// metadata record (decoded by ValueProto below) or a versioned value
// (metadata implicit, synthesized here).
meta.Timestamp = metaKey.Timestamp
if metaKey.IsValue() {
// For values, the size of keys is always accounted for as
// mvccVersionTimestampSize. The size of the metadata key is accounted
// for separately.
meta.KeyBytes = mvccVersionTimestampSize
meta.ValBytes = int64(len(iter.UnsafeValue()))
meta.Deleted = len(iter.UnsafeValue()) == 0
return metaKey, nil
}
}
if err := iter.ValueProto(meta); err != nil {
return NilKey, err
}
return metaKey, nil
}
// mvccScanInternal collects up to max key/value pairs from [key, endKey) via
// MVCCIterate, in descending key order when reverse is set.
//   - A max of zero returns immediately, with the whole span as the resume
//     span.
//   - A negative max imposes no limit.
//   - When a key beyond the limit is seen, the resume span covers what was
//     not returned: [kv.Key, endKey) for forward scans and
//     [key, kv.Key.Next()) for reverse scans.
//   - On error, only the error is returned.
func mvccScanInternal(
	ctx context.Context,
	engine Reader,
	key,
	endKey roachpb.Key,
	max int64,
	timestamp hlc.Timestamp,
	consistent bool,
	txn *roachpb.Transaction,
	reverse bool,
) ([]roachpb.KeyValue, *roachpb.Span, []roachpb.Intent, error) {
	if max == 0 {
		return nil, &roachpb.Span{Key: key, EndKey: endKey}, nil, nil
	}

	var (
		res        []roachpb.KeyValue
		resumeSpan *roachpb.Span
	)
	collect := func(kv roachpb.KeyValue) (bool, error) {
		if int64(len(res)) != max {
			res = append(res, kv)
			return false, nil
		}
		// One more key exists beyond the limit: record where to pick up.
		if reverse {
			resumeSpan = &roachpb.Span{Key: key, EndKey: kv.Key.Next()}
		} else {
			resumeSpan = &roachpb.Span{Key: kv.Key, EndKey: endKey}
		}
		return true, nil
	}

	intents, err := MVCCIterate(ctx, engine, key, endKey, timestamp, consistent, txn, reverse, collect)
	if err != nil {
		return nil, nil, nil, err
	}
	return res, resumeSpan, intents, nil
}
// MVCCScan returns up to max key/value pairs from [key, endKey) in ascending
// key order, read at timestamp. If the limit is reached while keys remain,
// the returned span is where the next call should resume.
func MVCCScan(
	ctx context.Context,
	engine Reader,
	key,
	endKey roachpb.Key,
	max int64,
	timestamp hlc.Timestamp,
	consistent bool,
	txn *roachpb.Transaction,
) ([]roachpb.KeyValue, *roachpb.Span, []roachpb.Intent, error) {
	const reverse = false
	return mvccScanInternal(ctx, engine, key, endKey, max, timestamp, consistent, txn, reverse)
}
// MVCCReverseScan returns up to max key/value pairs from [key, endKey) in
// descending key order, read at timestamp. If the limit is reached while
// keys remain, the returned span is where the next call should resume.
func MVCCReverseScan(
	ctx context.Context,
	engine Reader,
	key,
	endKey roachpb.Key,
	max int64,
	timestamp hlc.Timestamp,
	consistent bool,
	txn *roachpb.Transaction,
) ([]roachpb.KeyValue, *roachpb.Span, []roachpb.Intent, error) {
	const reverse = true
	return mvccScanInternal(ctx, engine, key, endKey, max, timestamp, consistent, txn, reverse)
}
// MVCCIterate iterates over the key range [start,end). At each step of the
// iteration, f() is invoked with the current key/value pair. If f returns
// true (done) or an error, the iteration stops and the error is propagated.
// If the reverse flag is set, keys are visited in descending order.
//
// f is only called for keys where mvccGetInternal yields a non-nil value, so
// keys with nothing readable at timestamp (e.g. deletion tombstones) are
// skipped. Before f runs, the key and any unsafe Value.RawBytes are copied
// into a bufalloc.ByteAllocator, so they do not alias iterator memory.
//
// Error handling:
//   - Inconsistent reads inside a transaction are rejected.
//   - endKey must be non-empty.
//   - A WriteIntentError from an individual key does not stop the iteration.
//     All such intents are merged into one WriteIntentError, returned at the
//     end alongside the collected intents.
//   - Any other error aborts the iteration immediately.
//
// The intents returned in the first result are those mvccGetInternal
// reported as skipped, which is the inconsistent-read case.
func MVCCIterate(
ctx context.Context,
engine Reader,
startKey,
endKey roachpb.Key,
timestamp hlc.Timestamp,
consistent bool,
txn *roachpb.Transaction,
reverse bool,
f func(roachpb.KeyValue) (bool, error),
) ([]roachpb.Intent, error) {
if !consistent && txn != nil {
return nil, errors.Errorf("cannot allow inconsistent reads within a transaction")
}
if len(endKey) == 0 {
return nil, emptyKeyError()
}
buf := newGetBuffer()
defer buf.release()
// getMetaFunc is used to get the meta and the meta key of the current
// row. encEndKey is used to judge whether iterator exceeds the boundary or
// not.
type getMetaFunc func(iter Iterator, encEndKey MVCCKey, meta *enginepb.MVCCMetadata) (MVCCKey, error)
var getMeta getMetaFunc
// We store encEndKey and encKey in the same buffer to avoid memory
// allocations.
// NOTE(review): no shared buffer is visible here; each key is built by a
// separate MakeMVCCMetadataKey call, so the comment above may be stale.
// In reverse, encKey (the starting point) is endKey and encEndKey (the
// boundary) is startKey.
var encKey, encEndKey MVCCKey
if reverse {
encEndKey = MakeMVCCMetadataKey(startKey)
encKey = MakeMVCCMetadataKey(endKey)
getMeta = getReverseScanMeta
} else {
encEndKey = MakeMVCCMetadataKey(endKey)
encKey = MakeMVCCMetadataKey(startKey)
getMeta = getScanMeta
}
// Get a new iterator.
iter := engine.NewIterator(false)
defer iter.Close()
// Seeking for the first defined position.
if reverse {
iter.SeekReverse(encKey)
if ok, err := iter.Valid(); !ok {
return nil, err
}
// If the key doesn't exist, the iterator is at the next key that does
// exist in the database.
// endKey is exclusive, so step back if we landed at or after it.
metaKey := iter.Key()
if !metaKey.Less(encKey) {
iter.Prev()
}
} else {
iter.Seek(encKey)
}
if ok, err := iter.Valid(); !ok {
return nil, err
}
// A slice to gather all encountered intents we skipped, in case of
// inconsistent iteration.
var intents []roachpb.Intent
// Gathers up all the intents from WriteIntentErrors. We only get those if
// the scan is consistent.
var wiErr error
var alloc bufalloc.ByteAllocator
for {
// getMeta returns a nil Key once past the boundary (with err set if the
// iterator failed).
metaKey, err := getMeta(iter, encEndKey, &buf.meta)
if err != nil {
return nil, err
}
// Exceeding the boundary.
if metaKey.Key == nil {
break
}
// Copy the unsafe key. The extra capacity of 1 is presumably so that
// appending a byte (e.g. Key.Next()) does not reallocate; confirm.
alloc, metaKey.Key = alloc.Copy(metaKey.Key, 1)
// Indicate that we're fine with an unsafe Value.RawBytes being returned.
value, newIntents, valueSafety, err := mvccGetInternal(
ctx, iter, metaKey, timestamp, consistent, unsafeValue, txn, buf)
intents = append(intents, newIntents...)
if value != nil {
if valueSafety == unsafeValue {
// Copy the unsafe value into our allocation buffer.
alloc, value.RawBytes = alloc.Copy(value.RawBytes, 0)
}
done, err := f(roachpb.KeyValue{Key: metaKey.Key, Value: *value})
if err != nil {
return nil, err
}
// break leaves the for loop (it is not inside a switch here).
if done {
break
}
}
// err here is mvccGetInternal's error; f's error above is scoped to its
// own block.
if err != nil {
switch tErr := err.(type) {
case *roachpb.WriteIntentError:
// In the case of WriteIntentErrors, accumulate affected keys but continue scan.
if wiErr == nil {
wiErr = tErr
} else {
wiErr.(*roachpb.WriteIntentError).Intents = append(wiErr.(*roachpb.WriteIntentError).Intents, tErr.Intents...)
}
default:
return nil, err
}
}
// Advance to the next user key in the scan direction.
if reverse {
valid, err := iter.Valid()
if err != nil {
return nil, err
}
if buf.meta.IsInline() {
if valid {
// The current entry is an inline value. We can reach the previous
// entry using Prev() which is slightly faster than PrevKey().
//
// As usual, the iterator must be valid because an inline key should
// never result in a version scan that brings us to an invalid key.
iter.Prev()
}
} else {
// This is subtle: mvccGetInternal might already have advanced
// us to the next key in which case we have to reset our
// position. We also Seek when iter.Valid says that the iterator
// is invalid, because mvccGetInternal might have advanced us
// out of the valid range and we may even have reached KeyMax.
// In this case, we still want to continue scanning backwards.
if !valid || !iter.UnsafeKey().Key.Equal(metaKey.Key) {
iter.Seek(metaKey)
if ok, err := iter.Valid(); err != nil {
return nil, err
} else if ok {
iter.Prev()
}
} else {
iter.PrevKey()
}
}
} else {
if ok, err := iter.Valid(); err != nil {
return nil, err
} else if ok {
if buf.meta.IsInline() {
// The current entry is an inline value. We can reach the next entry
// using Next() which is slightly faster than NextKey().
iter.Next()
} else {
// This is subtle: mvccGetInternal might already have advanced us to
// the next key in which case we don't have to do anything. Only call
// NextKey() if the current key pointed to by the iterator is the same
// as the one at the top of the loop.
if iter.UnsafeKey().Key.Equal(metaKey.Key) {
iter.NextKey()
}
}
}
}
if ok, err := iter.Valid(); err != nil {
return nil, err
} else if !ok {
break
}
}
return intents, wiErr
}
// MVCCResolveWriteIntent resolves the write intent on intent.Key by either
// committing it or aborting (rolling it back), depending on the intent's
// status. Intents on the key that belong to other transactions are left
// alone.
//
// Transaction epochs deserve a bit of explanation. A transaction's epoch is
// incremented each time it retries, and a retry is not an abort. SSI
// transactions retry when the commit timestamp differs from the proposed
// transaction timestamp. Instead of starting an entirely new transaction,
// the epoch is bumped. Intents laid down by earlier runs therefore keep
// acting as locks. That prevents concurrent reads from pushing the timestamp
// cache further and makes further retries less likely.
//
// Successive retries may write to different keys. The epoch decides which
// intents are committed when the transaction succeeds (those whose epoch
// equals the commit epoch). It also decides which intents are aborted even
// though the transaction succeeded.
//
// TODO(tschottdorf): encountered a bug in which a Txn committed with
// its original timestamp after laying down intents at higher timestamps.
// Doesn't look like this code here caught that. Shouldn't resolve intents
// when they're not at the timestamp the Txn mandates them to be.
func MVCCResolveWriteIntent(
	ctx context.Context, engine ReadWriter, ms *enginepb.MVCCStats, intent roachpb.Intent,
) error {
	iter := engine.NewIterator(true)
	buf := newPutBuffer()
	err := mvccResolveWriteIntent(ctx, engine, iter, ms, intent, buf)
	// Cleaned up by hand instead of with defer, which is measurably slower.
	iter.Close()
	buf.release()
	return err
}
// MVCCResolveWriteIntentUsingIter behaves like MVCCResolveWriteIntent but
// borrows the iterator and put buffer carried by iterAndBuf instead of
// creating its own. That is handy when resolving many intents in a loop.
// It neither closes nor releases them; that stays with the caller.
func MVCCResolveWriteIntentUsingIter(
	ctx context.Context,
	engine ReadWriter,
	iterAndBuf IterAndBuf,
	ms *enginepb.MVCCStats,
	intent roachpb.Intent,
) error {
	iter, buf := iterAndBuf.iter, iterAndBuf.buf
	return mvccResolveWriteIntent(ctx, engine, iter, ms, intent, buf)
}
// mvccResolveWriteIntent resolves the single point intent at intent.Key. It is
// the shared implementation behind MVCCResolveWriteIntent,
// MVCCResolveWriteIntentUsingIter and MVCCResolveWriteIntentRangeUsingIter.
//
// iter is used to read the existing metadata (and, on abort, to look for the
// next older version of the key). buf provides scratch space. Neither is
// released here; that is the caller's job. If ms is non-nil, the stats delta
// of the resolution is added to it.
//
// Outcomes, in the order they are checked:
//   - empty intent.Key or non-empty intent.EndKey: an error is returned.
//   - no metadata, no intent, or an intent of a different txn: no-op.
//   - commit (COMMITTED, matching epochs, intent.Txn.Timestamp not below
//     meta.Timestamp) or push (PENDING, meta.Timestamp below
//     intent.Txn.Timestamp, meta.Txn.Epoch >= intent.Txn.Epoch): the
//     metadata is cleared (commit) or rewritten with the new txn (push), and
//     the provisional value is moved to intent.Txn.Timestamp if the
//     timestamp changed.
//   - PENDING with meta.Txn.Epoch >= intent.Txn.Epoch otherwise: no-op.
//   - anything else: abort. The provisional value at meta.Timestamp and the
//     metadata key are both cleared.
func mvccResolveWriteIntent(
	ctx context.Context,
	engine ReadWriter,
	iter Iterator,
	ms *enginepb.MVCCStats,
	intent roachpb.Intent,
	buf *putBuffer,
) error {
	if len(intent.Key) == 0 {
		return emptyKeyError()
	}
	if len(intent.EndKey) > 0 {
		return errors.Errorf("can't resolve range intent as point intent")
	}
	metaKey := MakeMVCCMetadataKey(intent.Key)
	// meta aliases the scratch buffer; it is overwritten by mvccGetMetadata.
	meta := &buf.meta
	ok, origMetaKeySize, origMetaValSize, err := mvccGetMetadata(iter, metaKey, meta)
	if err != nil {
		return err
	}
	// For cases where there's no write intent to resolve, or one exists
	// which we can't resolve, this is a noop.
	if !ok || meta.Txn == nil || !roachpb.TxnIDEqual(intent.Txn.ID, meta.Txn.ID) {
		return nil
	}
	// A commit in an older epoch or timestamp is prevented by the
	// sequence cache under normal operation. Replays of EndTransaction
	// commands which occur after the transaction record has been erased
	// make this a possibility; we treat such intents as uncommitted.
	//
	// A commit with a newer epoch effectively means that we wrote this
	// intent before an earlier retry, but didn't write it again
	// after. A commit with an older timestamp than the intent should
	// not happen even on replays because BeginTransaction has replay
	// protection. The BeginTransaction replay protection guarantees a
	// restart in EndTransaction, so the replay won't resolve intents.
	epochsMatch := meta.Txn.Epoch == intent.Txn.Epoch
	timestampsValid := !intent.Txn.Timestamp.Less(meta.Timestamp)
	commit := intent.Status == roachpb.COMMITTED && epochsMatch && timestampsValid
	// Note the small difference to commit epoch handling here: We allow a push
	// from a previous epoch to move a newer intent. That's not necessary, but
	// useful. Consider the following, where B reads at a timestamp that's
	// higher than any write by A in the following diagram:
	//
	// | client A@epo | B (pusher) |
	// =============================
	// | write@1 | |
	// | | read |
	// | | push |
	// | restart | |
	// | write@2 | |
	// | | resolve@1 |
	// ============================
	// In this case, if we required the epochs to match, we would not push the
	// intent forward, and client B would upon retrying after its successful
	// push and apparent resolution run into the new version of an intent again
	// (which is at a higher timestamp due to the restart, but not out of the
	// way of A). It would then actually succeed on the second iteration (since
	// the new Epoch propagates to the Push and via that, to the Pushee txn
	// used for resolving), but that costs latency.
	// TODO(tschottdorf): various epoch-related scenarios here deserve more
	// testing.
	pushed := intent.Status == roachpb.PENDING &&
		meta.Timestamp.Less(intent.Txn.Timestamp) &&
		meta.Txn.Epoch >= intent.Txn.Epoch
	// If we're committing, or if the commit timestamp of the intent has
	// been moved forward, and if the proposed epoch matches the existing
	// epoch: update the meta.Txn. For commit, it's set to nil;
	// otherwise, we update its value. We may have to update the actual
	// version value (remove old and create new with proper
	// timestamp-encoded key) if timestamp changed.
	if commit || pushed {
		buf.newMeta = *meta
		// Set the timestamp for upcoming write (or at least the stats update).
		buf.newMeta.Timestamp = intent.Txn.Timestamp
		var metaKeySize, metaValSize int64
		var err error
		if pushed {
			// Keep intent if we're pushing timestamp.
			buf.newTxn = intent.Txn
			buf.newMeta.Txn = &buf.newTxn
			metaKeySize, metaValSize, err = buf.putMeta(engine, metaKey, &buf.newMeta)
		} else {
			// Commit: the metadata key is removed outright, so only its key
			// size is reported to the stats; metaValSize stays zero.
			metaKeySize = int64(metaKey.EncodedSize())
			err = engine.Clear(metaKey)
		}
		if err != nil {
			return err
		}
		// Update stat counters related to resolving the intent.
		if ms != nil {
			ms.Add(updateStatsOnResolve(intent.Key, origMetaKeySize, origMetaValSize,
				metaKeySize, metaValSize, *meta, buf.newMeta, commit))
		}
		// If timestamp of value changed, need to rewrite versioned value.
		// The version's timestamp is part of its key, so moving it means a
		// read, a clear of the old key, and a put under the new key.
		if meta.Timestamp != intent.Txn.Timestamp {
			origKey := MVCCKey{Key: intent.Key, Timestamp: meta.Timestamp}
			newKey := MVCCKey{Key: intent.Key, Timestamp: intent.Txn.Timestamp}
			valBytes, err := engine.Get(origKey)
			if err != nil {
				return err
			}
			if err = engine.Clear(origKey); err != nil {
				return err
			}
			if err = engine.Put(newKey, valBytes); err != nil {
				return err
			}
		}
		return nil
	}
	// This method shouldn't be called in this instance, but there's
	// nothing to do if meta's epoch is greater than or equal txn's
	// epoch and the state is still PENDING.
	if intent.Status == roachpb.PENDING && meta.Txn.Epoch >= intent.Txn.Epoch {
		return nil
	}
	// Otherwise, we're deleting the intent. We must find the next
	// versioned value and reset the metadata's latest timestamp. If
	// there are no other versioned values, we delete the metadata
	// key.
	//
	// Note that the somewhat unintuitive case of an ABORT with
	// intent.Txn.Epoch < meta.Txn.Epoch is possible:
	// - writer1 writes key0 at epoch 0
	// - writer2 with higher priority encounters intent at key0 (epoch 0)
	// - writer1 restarts, now at epoch one (txn record not updated)
	// - writer1 writes key0 at epoch 1
	// - writer2 dispatches ResolveIntent to key0 (with epoch 0)
	// - ResolveIntent with epoch 0 aborts intent from epoch 1.
	// First clear the intent value.
	latestKey := MVCCKey{Key: intent.Key, Timestamp: meta.Timestamp}
	if err := engine.Clear(latestKey); err != nil {
		return err
	}
	// Compute the next possible mvcc value for this key.
	nextKey := latestKey.Next()
	iter.Seek(nextKey)
	// If there is no other version, we should just clean up the key entirely.
	if ok, err := iter.Valid(); err != nil {
		return err
	} else if !ok || !iter.UnsafeKey().Key.Equal(intent.Key) {
		if err = engine.Clear(metaKey); err != nil {
			return err
		}
		// Clear stat counters attributable to the intent we're aborting.
		if ms != nil {
			ms.Add(updateStatsOnAbort(intent.Key, origMetaKeySize, origMetaValSize, 0, 0, meta, nil, 0, intent.Txn.Timestamp.WallTime))
		}
		return nil
	}
	// An older version of the same key exists; it must be a versioned
	// value, not another metadata key.
	unsafeIterKey := iter.UnsafeKey()
	if !unsafeIterKey.IsValue() {
		return errors.Errorf("expected an MVCC value key: %s", unsafeIterKey)
	}
	// Get the bytes for the next version so we have size for stat counts.
	valueSize := int64(len(iter.UnsafeValue()))
	// Update the keyMetadata with the next version.
	// buf.newMeta describes that older version but is only used for the
	// stats update below; the metadata key itself is cleared (presumably the
	// older version then serves as implicit metadata; confirm in
	// mvccGetMetadata).
	buf.newMeta = enginepb.MVCCMetadata{
		Deleted:  valueSize == 0,
		KeyBytes: mvccVersionTimestampSize,
		ValBytes: valueSize,
	}
	if err := engine.Clear(metaKey); err != nil {
		return err
	}
	metaKeySize := int64(metaKey.EncodedSize())
	metaValSize := int64(0)
	// Update stat counters with older version.
	if ms != nil {
		ms.Add(updateStatsOnAbort(intent.Key, origMetaKeySize, origMetaValSize,
			metaKeySize, metaValSize, meta, &buf.newMeta, unsafeIterKey.Timestamp.WallTime,
			intent.Txn.Timestamp.WallTime))
	}
	return nil
}
// IterAndBuf bundles an Iterator with a putBuffer so that both can be passed
// between MVCC* calls (the *UsingIter variants) and reused across them,
// without callers needing to know the particulars. Obtain one with
// GetIterAndBuf and release it with Cleanup.
type IterAndBuf struct {
	// buf is scratch space handed to the MVCC helpers (e.g.
	// mvccResolveWriteIntent stages metadata in it).
	buf *putBuffer
	// iter is the reusable engine iterator.
	iter Iterator
}
// GetIterAndBuf allocates a fresh put buffer and a new iterator over engine
// (created with NewIterator(false)) and returns them bundled as an IterAndBuf
// for use with the MVCC*UsingIter functions. Call Cleanup on the result once
// it is no longer needed.
func GetIterAndBuf(engine Reader) IterAndBuf {
	var ib IterAndBuf
	ib.buf = newPutBuffer()
	ib.iter = engine.NewIterator(false)
	return ib
}
// Cleanup releases the put buffer and closes the iterator held by b. It must
// be called to release these resources when done; b should not be used
// afterwards.
func (b IterAndBuf) Cleanup() {
	scratch, it := b.buf, b.iter
	scratch.release()
	it.Close()
}
// MVCCResolveWriteIntentRange commits or aborts (rolls back) the write
// intents of intent.Txn in the span [intent.Key, intent.EndKey), skipping
// intents that belong to other transactions. It allocates its own iterator
// and buffer for the duration of the call and otherwise defers to
// MVCCResolveWriteIntentRangeUsingIter, whose count and error it returns.
func MVCCResolveWriteIntentRange(
	ctx context.Context, engine ReadWriter, ms *enginepb.MVCCStats, intent roachpb.Intent, max int64,
) (int64, error) {
	ib := GetIterAndBuf(engine)
	defer ib.Cleanup()
	resolved, err := MVCCResolveWriteIntentRangeUsingIter(ctx, engine, ib, ms, intent, max)
	return resolved, err
}
// MVCCResolveWriteIntentRangeUsingIter commits or aborts (rolls back) the
// write intents of intent.Txn found in [intent.Key, intent.EndKey), using the
// caller-supplied iterator and buffer. Intents of other transactions are
// skipped. Each distinct key in the span is visited once. The returned count
// includes every visited key whose resolution did not fail, including keys
// that carried no intent. The scan stops once that count reaches max.
// Per-key resolution errors are logged and do not stop the scan; only
// iterator errors are returned.
func MVCCResolveWriteIntentRangeUsingIter(
	ctx context.Context,
	engine ReadWriter,
	iterAndBuf IterAndBuf,
	ms *enginepb.MVCCStats,
	intent roachpb.Intent,
	max int64,
) (int64, error) {
	iter := iterAndBuf.iter
	seekKey := MakeMVCCMetadataKey(intent.Key)
	encEndKey := MakeMVCCMetadataKey(intent.EndKey)
	// Every key in the span is resolved as a point intent.
	intent.EndKey = nil

	var keyBuf []byte
	var num int64
	for num < max {
		iter.Seek(seekKey)
		ok, err := iter.Valid()
		if err != nil {
			return 0, err
		}
		if !ok || !iter.UnsafeKey().Less(encEndKey) {
			// Nothing left inside the requested span.
			break
		}

		// The iterator owns the unsafe key's bytes, so copy them into keyBuf,
		// which is reused from one iteration to the next.
		curKey := iter.UnsafeKey()
		keyBuf = append(keyBuf[:0], curKey.Key...)
		curKey.Key = keyBuf

		// Only a metadata key can hold an intent. A bare versioned value is
		// counted without doing any work.
		var resolveErr error
		if !curKey.IsValue() {
			intent.Key = curKey.Key
			resolveErr = mvccResolveWriteIntent(ctx, engine, iter, ms, intent, iterAndBuf.buf)
		}
		if resolveErr == nil {
			num++
		} else {
			log.Warningf(ctx, "failed to resolve intent for key %q: %v", curKey.Key, resolveErr)
		}

		// Jump past all versions of this key; seekKey stays a metadata key.
		seekKey.Key = curKey.Key.Next()
	}
	return num, nil
}
// MVCCGarbageCollect removes old MVCC versions of the listed keys. It creates
// a single iterator on the engine and, for each entry in keys in turn, seeks
// it to that key and clears all values with timestamps <= gcKey.Timestamp.
// If that includes the latest version, the key's metadata is cleared as well.
// This is only allowed when the latest value is a deletion tombstone (or the
// value is inline) and there is no intent; otherwise an error is returned.
// The timestamp parameter is used to compute the intent age on GC: it is
// passed to the stats updates applied to ms when ms is non-nil.
// Garbage collection stops after clearing maxClears keys. Explicit metadata
// keys and versions both count. This limits the size of the WriteBatch
// produced. When the limit is hit, nil is returned and the remaining entries
// of keys are not processed.
func MVCCGarbageCollect(
	ctx context.Context,
	engine ReadWriter,
	ms *enginepb.MVCCStats,
	keys []roachpb.GCRequest_GCKey,
	timestamp hlc.Timestamp,
	maxClears int64,
) error {
	iter := engine.NewIterator(false)
	defer iter.Close()
	// Iterate through specified GC keys.
	var count int64
	meta := &enginepb.MVCCMetadata{}
	for _, gcKey := range keys {
		encKey := MakeMVCCMetadataKey(gcKey.Key)
		ok, metaKeySize, metaValSize, err := mvccGetMetadata(iter, encKey, meta)
		if err != nil {
			return err
		}
		if !ok {
			continue
		}
		inlinedValue := meta.IsInline()
		// implicitMeta is true when the iterator sits on a versioned value
		// rather than on a metadata key. In that case there is no explicit
		// metadata key to clear (presumably mvccGetMetadata synthesized meta
		// from that version; confirm).
		implicitMeta := iter.UnsafeKey().IsValue()
		// First, check whether all values of the key are being deleted.
		if !gcKey.Timestamp.Less(meta.Timestamp) {
			// For version keys, don't allow GC'ing the meta key if it's
			// not marked deleted. However, for inline values we allow it;
			// they are internal and GCing them directly saves the extra
			// deletion step.
			if !meta.Deleted && !inlinedValue {
				return errors.Errorf("request to GC non-deleted, latest value of %q", gcKey.Key)
			}
			if meta.Txn != nil {
				return errors.Errorf("request to GC intent at %q", gcKey.Key)
			}
			if ms != nil {
				if inlinedValue {
					updateStatsForInline(ms, gcKey.Key, metaKeySize, metaValSize, 0, 0)
					ms.AgeTo(timestamp.WallTime)
				} else {
					ms.Add(updateStatsOnGC(gcKey.Key, metaKeySize, metaValSize,
						meta, meta.Timestamp.WallTime, timestamp.WallTime))
				}
			}
			// Only an explicit metadata key exists on disk to be cleared.
			if !implicitMeta {
				if err := engine.Clear(iter.UnsafeKey()); err != nil {
					return err
				}
				count++
				if count >= maxClears {
					return nil
				}
			}
		}
		if !implicitMeta {
			// The iter is pointing at an MVCCMetadata, advance to the next entry.
			iter.Next()
		}
		// Now, iterate through all values, GC'ing ones which have expired.
		for ; ; iter.Next() {
			if ok, err := iter.Valid(); err != nil {
				return err
			} else if !ok {
				break
			}
			unsafeIterKey := iter.UnsafeKey()
			// Stepped past the last version of this key.
			if !unsafeIterKey.Key.Equal(encKey.Key) {
				break
			}
			// A non-versioned entry for the same key ends the scan.
			if !unsafeIterKey.IsValue() {
				break
			}
			// Only versions at or below the GC threshold are removed.
			if !gcKey.Timestamp.Less(unsafeIterKey.Timestamp) {
				if ms != nil {
					ms.Add(updateStatsOnGC(gcKey.Key, mvccVersionTimestampSize,
						int64(len(iter.UnsafeValue())), nil, unsafeIterKey.Timestamp.WallTime,
						timestamp.WallTime))
				}
				if err := engine.Clear(unsafeIterKey); err != nil {
					return err
				}
				count++
				if count >= maxClears {
					return nil
				}
			}
		}
	}
	return nil
}
// IsValidSplitKey reports whether key may be used as a split key. A key equal
// to keys.Meta2KeyMax (\x03\xff\xff) is rejected. So is any key that falls
// inside one of keys.NoSplitSpans, treating each span as half-open
// [span.Key, span.EndKey). Those spans cover ranges that cannot be split,
// such as the meta1 span and the system DB span.
func IsValidSplitKey(key roachpb.Key) bool {
	// TODO(peter): What is this restriction about? Document.
	if bytes.Equal(keys.Meta2KeyMax, key) {
		return false
	}
	for _, span := range keys.NoSplitSpans {
		if bytes.Compare(key, span.Key) < 0 {
			// key sorts before this span.
			continue
		}
		if bytes.Compare(key, span.EndKey) < 0 {
			// key lies within [span.Key, span.EndKey).
			return false
		}
	}
	return true
}
// MVCCFindSplitKey suggests a split key within the user-space key range
// [key, endKey) of the range identified by rangeID.
//
// It scans the range in key order and keeps an approximate byte count of raw
// key and value byte strings. Additional versions of an already-counted key
// are charged mvccVersionTimestampSize for their key. The chosen split key is
// the one in front of which that count is closest to targetSize. To roughly
// cut the range in half, pass half of the range's size as targetSize.
// Specify a snapshot engine to safely invoke this method in a goroutine.
//
// Keys below keys.LocalMax are never scanned: the start of the search is
// clamped to keys.LocalMax. The first entry scanned is never chosen, and
// neither is any key rejected by IsValidSplitKey (e.g. keys inside
// keys.NoSplitSpans). Once a candidate exists and the current distance to
// targetSize exceeds the best one seen, the iteration callback reports done
// (presumably ending engine.Iterate early).
//
// The result is the user key of the chosen entry, without its timestamp, so
// a split never falls between versions of the same key. If no candidate
// other than the start key was found, (nil, nil) is returned.
//
// rangeID is used to load the range's stats. An error doing so is returned,
// although the stats only feed diagnostic logging. That logging, including
// each improved candidate, is emitted at verbosity level 2.
func MVCCFindSplitKey(
	ctx context.Context,
	engine Reader,
	rangeID roachpb.RangeID,
	key,
	endKey roachpb.RKey,
	targetSize int64,
) (roachpb.Key, error) {
	if key.Less(roachpb.RKey(keys.LocalMax)) {
		key = roachpb.RKey(keys.LocalMax)
	}
	encStartKey := MakeMVCCMetadataKey(key.AsRawKey())
	encEndKey := MakeMVCCMetadataKey(endKey.AsRawKey())
	if log.V(2) {
		log.Infof(ctx, "searching split key for %d [%s, %s)", rangeID, key, endKey)
	}
	// Get range size from stats.
	ms, err := MVCCGetRangeStats(ctx, engine, rangeID)
	if err != nil {
		return nil, err
	}
	rangeSize := ms.KeyBytes + ms.ValBytes
	if log.V(2) {
		log.Infof(ctx, "range size: %s, targetSize %s",
			humanize.IBytes(uint64(rangeSize)), humanize.IBytes(uint64(targetSize)))
	}
	sizeSoFar := int64(0)
	// encStartKey doubles as the "no candidate yet" sentinel.
	bestSplitKey := encStartKey
	bestSplitDiff := int64(math.MaxInt64)
	var lastKey roachpb.Key
	var n int
	if err := engine.Iterate(encStartKey, encEndKey, func(kv MVCCKeyValue) (bool, error) {
		n++
		// Is key within a legal key range? Note that we never choose the first key
		// as the split key.
		valid := n > 1 && IsValidSplitKey(kv.Key.Key)
		// Determine if this key would make a better split than last "best" key.
		// sizeSoFar does not yet include kv, so it is exactly the size that
		// would land left of a split in front of kv.
		diff := targetSize - sizeSoFar
		if diff < 0 {
			diff = -diff
		}
		if valid && diff < bestSplitDiff {
			if log.V(2) {
				log.Infof(ctx, "better split: diff %d at %s", diff, kv.Key)
			}
			bestSplitKey = kv.Key
			bestSplitDiff = diff
		}
		// Determine whether we've found best key and can exit iteration.
		done := !bestSplitKey.Key.Equal(encStartKey.Key) && diff > bestSplitDiff
		if done && log.V(2) {
			log.Infof(ctx, "target size reached")
		}
		// Add this key/value to the size scanned so far.
		if kv.Key.IsValue() && bytes.Equal(kv.Key.Key, lastKey) {
			sizeSoFar += mvccVersionTimestampSize + int64(len(kv.Value))
		} else {
			sizeSoFar += int64(kv.Key.EncodedSize() + len(kv.Value))
		}
		lastKey = kv.Key.Key
		return done, nil
	}); err != nil {
		return nil, err
	}
	if bestSplitKey.Key.Equal(encStartKey.Key) {
		return nil, nil
	}
	// The key is an MVCC (versioned) key, so to avoid corrupting MVCC we only
	// return the base portion, which is fine to split in front of.
	return bestSplitKey.Key, nil
}
// willOverflow reports whether computing a+b in int64 arithmetic would
// overflow above math.MaxInt64 or underflow below math.MinInt64. Sums that
// land exactly on either bound are representable and report false.
func willOverflow(a, b int64) bool {
	// Morally we check !(MinInt64 <= a+b <= MaxInt64), but rearranged so that
	// no intermediate result can wrap. First make sure that a <= b; the sign
	// of the larger operand then decides which bound the sum could cross.
	if a > b {
		a, b = b, a
	}
	if b > 0 {
		// MaxInt64-b cannot overflow for b > 0, and a+b > MaxInt64 is
		// equivalent to a > MaxInt64-b.
		return a > math.MaxInt64-b
	}
	// Here a <= b <= 0, so only underflow is possible. MinInt64-b lies in
	// [MinInt64, 0] and cannot wrap, and a+b < MinInt64 is equivalent to
	// a < MinInt64-b.
	return math.MinInt64-b > a
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors_cockroachdb/cockroach.git
git@gitee.com:mirrors_cockroachdb/cockroach.git
mirrors_cockroachdb
cockroach
cockroach
v1.0.6

搜索帮助