1 Star 0 Fork 0

zhoujin826/tidb

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
admin.go 16.53 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package admin
import (
"fmt"
"io"
"reflect"
"time"
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/sqlexec"
log "github.com/sirupsen/logrus"
)
// DDLInfo is for DDL information.
type DDLInfo struct {
SchemaVer int64
ReorgHandle int64 // it's only used for DDL information.
Job *model.Job
}
// GetDDLInfo returns DDL information.
func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) {
var err error
info := &DDLInfo{}
t := meta.NewMeta(txn)
info.Job, err = t.GetDDLJob(0)
if err != nil {
return nil, errors.Trace(err)
}
info.SchemaVer, err = t.GetSchemaVersion()
if err != nil {
return nil, errors.Trace(err)
}
if info.Job == nil {
return info, nil
}
info.ReorgHandle, err = t.GetDDLReorgHandle(info.Job)
if err != nil {
return nil, errors.Trace(err)
}
return info, nil
}
// CancelJobs cancels the DDL jobs.
func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
if len(ids) == 0 {
return nil, nil
}
jobs, err := GetDDLJobs(txn)
if err != nil {
return nil, errors.Trace(err)
}
errs := make([]error, len(ids))
t := meta.NewMeta(txn)
for i, id := range ids {
found := false
for j, job := range jobs {
if id != job.ID {
log.Debugf("the job ID %d that needs to be canceled isn't equal to current job ID %d", id, job.ID)
continue
}
found = true
// These states can't be cancelled.
if job.IsDone() || job.IsSynced() {
errs[i] = errors.New("This job is finished, so can't be cancelled")
continue
}
// If the state is rolling back, it means the work is cleaning the data after cancelling the job.
if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {
continue
}
job.State = model.JobStateCancelling
// Make sure RawArgs isn't overwritten.
err := job.DecodeArgs(job.RawArgs)
if err != nil {
errs[i] = errors.Trace(err)
continue
}
err = t.UpdateDDLJob(int64(j), job, true)
if err != nil {
errs[i] = errors.Trace(err)
}
}
if !found {
errs[i] = errors.New("Can't find this job")
}
}
return errs, nil
}
// GetDDLJobs returns the DDL jobs and an error.
func GetDDLJobs(txn kv.Transaction) ([]*model.Job, error) {
t := meta.NewMeta(txn)
cnt, err := t.DDLJobQueueLen()
if err != nil {
return nil, errors.Trace(err)
}
jobs := make([]*model.Job, cnt)
for i := range jobs {
jobs[i], err = t.GetDDLJob(int64(i))
if err != nil {
return nil, errors.Trace(err)
}
}
return jobs, nil
}
// MaxHistoryJobs is exported for testing.
const MaxHistoryJobs = 10
// GetHistoryDDLJobs returns the DDL history jobs and an error.
// The maximum count of history jobs is MaxHistoryJobs.
func GetHistoryDDLJobs(txn kv.Transaction) ([]*model.Job, error) {
t := meta.NewMeta(txn)
jobs, err := t.GetAllHistoryDDLJobs()
if err != nil {
return nil, errors.Trace(err)
}
jobsLen := len(jobs)
if jobsLen > MaxHistoryJobs {
start := jobsLen - MaxHistoryJobs
jobs = jobs[start:]
}
jobsLen = len(jobs)
ret := make([]*model.Job, 0, jobsLen)
for i := jobsLen - 1; i >= 0; i-- {
ret = append(ret, jobs[i])
}
return ret, nil
}
func nextIndexVals(data []types.Datum) []types.Datum {
// Add 0x0 to the end of data.
return append(data, types.Datum{})
}
// RecordData is the record data composed of a handle and values.
type RecordData struct {
Handle int64
Values []types.Datum
}
func getCount(ctx sessionctx.Context, sql string) (int64, error) {
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql)
if err != nil {
return 0, errors.Trace(err)
}
if len(rows) != 1 {
return 0, errors.Errorf("can not get count, sql %s result rows %d", sql, len(rows))
}
return rows[0].GetInt64(0), nil
}
// CheckIndicesCount compares indices count with table count.
// It returns nil if the count from the index is equal to the count from the table columns,
// otherwise it returns an error with a different information.
func CheckIndicesCount(ctx sessionctx.Context, dbName, tableName string, indices []string) error {
// Add `` for some names like `table name`.
sql := fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s`", dbName, tableName)
tblCnt, err := getCount(ctx, sql)
if err != nil {
return errors.Trace(err)
}
for _, idx := range indices {
sql = fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s` USE INDEX(`%s`)", dbName, tableName, idx)
idxCnt, err := getCount(ctx, sql)
if err != nil {
return errors.Trace(err)
}
if tblCnt != idxCnt {
return errors.Errorf("table count %d != index(%s) count %d", tblCnt, idx, idxCnt)
}
}
return nil
}
// ScanIndexData scans the index handles and values in a limited number, according to the index information.
// It returns data and the next startVals until it doesn't have data, then returns data is nil and
// the next startVals is the values which can't get data. If startVals = nil and limit = -1,
// it returns the index data of the whole.
func ScanIndexData(sc *stmtctx.StatementContext, txn kv.Transaction, kvIndex table.Index, startVals []types.Datum, limit int64) (
[]*RecordData, []types.Datum, error) {
it, _, err := kvIndex.Seek(sc, txn, startVals)
if err != nil {
return nil, nil, errors.Trace(err)
}
defer it.Close()
var idxRows []*RecordData
var curVals []types.Datum
for limit != 0 {
val, h, err1 := it.Next()
if terror.ErrorEqual(err1, io.EOF) {
return idxRows, nextIndexVals(curVals), nil
} else if err1 != nil {
return nil, nil, errors.Trace(err1)
}
idxRows = append(idxRows, &RecordData{Handle: h, Values: val})
limit--
curVals = val
}
nextVals, _, err := it.Next()
if terror.ErrorEqual(err, io.EOF) {
return idxRows, nextIndexVals(curVals), nil
} else if err != nil {
return nil, nil, errors.Trace(err)
}
return idxRows, nextVals, nil
}
// CompareIndexData compares index data one by one.
// It returns nil if the data from the index is equal to the data from the table columns,
// otherwise it returns an error with a different set of records.
func CompareIndexData(sessCtx sessionctx.Context, txn kv.Transaction, t table.Table, idx table.Index) error {
err := checkIndexAndRecord(sessCtx, txn, t, idx)
if err != nil {
return errors.Trace(err)
}
return CheckRecordAndIndex(sessCtx, txn, t, idx)
}
func getIndexFieldTypes(t table.Table, idx table.Index) ([]*types.FieldType, error) {
idxColumns := idx.Meta().Columns
tblColumns := t.Meta().Columns
fieldTypes := make([]*types.FieldType, 0, len(idxColumns))
for _, col := range idxColumns {
colInfo := model.FindColumnInfo(tblColumns, col.Name.L)
if colInfo == nil {
return nil, errors.Errorf("index col:%v not found in table:%v", col.Name.String(), t.Meta().Name.String())
}
fieldTypes = append(fieldTypes, &colInfo.FieldType)
}
return fieldTypes, nil
}
func checkIndexAndRecord(sessCtx sessionctx.Context, txn kv.Transaction, t table.Table, idx table.Index) error {
it, err := idx.SeekFirst(txn)
if err != nil {
return errors.Trace(err)
}
defer it.Close()
cols := make([]*table.Column, len(idx.Meta().Columns))
for i, col := range idx.Meta().Columns {
cols[i] = t.Cols()[col.Offset]
}
fieldTypes, err := getIndexFieldTypes(t, idx)
if err != nil {
return errors.Trace(err)
}
for {
vals1, h, err := it.Next()
if terror.ErrorEqual(err, io.EOF) {
break
} else if err != nil {
return errors.Trace(err)
}
vals1, err = tablecodec.UnflattenDatums(vals1, fieldTypes, sessCtx.GetSessionVars().GetTimeZone())
if err != nil {
return errors.Trace(err)
}
vals2, err := rowWithCols(sessCtx, txn, t, h, cols)
if kv.ErrNotExist.Equal(err) {
record := &RecordData{Handle: h, Values: vals1}
err = errDateNotEqual.Gen("index:%v != record:%v", record, nil)
}
if err != nil {
return errors.Trace(err)
}
if !reflect.DeepEqual(vals1, vals2) {
record1 := &RecordData{Handle: h, Values: vals1}
record2 := &RecordData{Handle: h, Values: vals2}
return errDateNotEqual.Gen("index:%v != record:%v", record1, record2)
}
}
return nil
}
// CheckRecordAndIndex is exported for testing.
func CheckRecordAndIndex(sessCtx sessionctx.Context, txn kv.Transaction, t table.Table, idx table.Index) error {
sc := sessCtx.GetSessionVars().StmtCtx
cols := make([]*table.Column, len(idx.Meta().Columns))
for i, col := range idx.Meta().Columns {
cols[i] = t.Cols()[col.Offset]
}
startKey := t.RecordKey(0)
filterFunc := func(h1 int64, vals1 []types.Datum, cols []*table.Column) (bool, error) {
for i, val := range vals1 {
col := cols[i]
if val.IsNull() {
if mysql.HasNotNullFlag(col.Flag) {
return false, errors.New("Miss")
}
// NULL value is regarded as its default value.
colDefVal, err := table.GetColOriginDefaultValue(sessCtx, col.ToInfo())
if err != nil {
return false, errors.Trace(err)
}
vals1[i] = colDefVal
}
}
isExist, h2, err := idx.Exist(sc, txn, vals1, h1)
if kv.ErrKeyExists.Equal(err) {
record1 := &RecordData{Handle: h1, Values: vals1}
record2 := &RecordData{Handle: h2, Values: vals1}
return false, errDateNotEqual.Gen("index:%v != record:%v", record2, record1)
}
if err != nil {
return false, errors.Trace(err)
}
if !isExist {
record := &RecordData{Handle: h1, Values: vals1}
return false, errDateNotEqual.Gen("index:%v != record:%v", nil, record)
}
return true, nil
}
err := iterRecords(txn, t, startKey, cols, filterFunc)
if err != nil {
return errors.Trace(err)
}
return nil
}
func scanTableData(retriever kv.Retriever, t table.Table, cols []*table.Column, startHandle, limit int64) (
[]*RecordData, int64, error) {
var records []*RecordData
startKey := t.RecordKey(startHandle)
filterFunc := func(h int64, d []types.Datum, cols []*table.Column) (bool, error) {
if limit != 0 {
r := &RecordData{
Handle: h,
Values: d,
}
records = append(records, r)
limit--
return true, nil
}
return false, nil
}
err := iterRecords(retriever, t, startKey, cols, filterFunc)
if err != nil {
return nil, 0, errors.Trace(err)
}
if len(records) == 0 {
return records, startHandle, nil
}
nextHandle := records[len(records)-1].Handle + 1
return records, nextHandle, nil
}
// ScanTableRecord scans table row handles and column values in a limited number.
// It returns data and the next startHandle until it doesn't have data, then returns data is nil and
// the next startHandle is the handle which can't get data. If startHandle = 0 and limit = -1,
// it returns the table data of the whole.
func ScanTableRecord(retriever kv.Retriever, t table.Table, startHandle, limit int64) (
[]*RecordData, int64, error) {
return scanTableData(retriever, t, t.Cols(), startHandle, limit)
}
// ScanSnapshotTableRecord scans the ver version of the table data in a limited number.
// It returns data and the next startHandle until it doesn't have data, then returns data is nil and
// the next startHandle is the handle which can't get data. If startHandle = 0 and limit = -1,
// it returns the table data of the whole.
func ScanSnapshotTableRecord(store kv.Storage, ver kv.Version, t table.Table, startHandle, limit int64) (
[]*RecordData, int64, error) {
snap, err := store.GetSnapshot(ver)
if err != nil {
return nil, 0, errors.Trace(err)
}
records, nextHandle, err := ScanTableRecord(snap, t, startHandle, limit)
return records, nextHandle, errors.Trace(err)
}
// CompareTableRecord compares data and the corresponding table data one by one.
// It returns nil if data is equal to the data that scans from table, otherwise
// it returns an error with a different set of records. If exact is false, only compares handle.
func CompareTableRecord(txn kv.Transaction, t table.Table, data []*RecordData, exact bool) error {
m := make(map[int64][]types.Datum, len(data))
for _, r := range data {
if _, ok := m[r.Handle]; ok {
return errRepeatHandle.Gen("handle:%d is repeated in data", r.Handle)
}
m[r.Handle] = r.Values
}
startKey := t.RecordKey(0)
filterFunc := func(h int64, vals []types.Datum, cols []*table.Column) (bool, error) {
vals2, ok := m[h]
if !ok {
record := &RecordData{Handle: h, Values: vals}
return false, errDateNotEqual.Gen("data:%v != record:%v", nil, record)
}
if !exact {
delete(m, h)
return true, nil
}
if !reflect.DeepEqual(vals, vals2) {
record1 := &RecordData{Handle: h, Values: vals2}
record2 := &RecordData{Handle: h, Values: vals}
return false, errDateNotEqual.Gen("data:%v != record:%v", record1, record2)
}
delete(m, h)
return true, nil
}
err := iterRecords(txn, t, startKey, t.Cols(), filterFunc)
if err != nil {
return errors.Trace(err)
}
for h, vals := range m {
record := &RecordData{Handle: h, Values: vals}
return errDateNotEqual.Gen("data:%v != record:%v", record, nil)
}
return nil
}
func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h int64, cols []*table.Column) ([]types.Datum, error) {
key := t.RecordKey(h)
value, err := txn.Get(key)
if err != nil {
return nil, errors.Trace(err)
}
v := make([]types.Datum, len(cols))
colTps := make(map[int64]*types.FieldType, len(cols))
for i, col := range cols {
if col == nil {
continue
}
if col.State != model.StatePublic {
return nil, errInvalidColumnState.Gen("Cannot use none public column - %v", cols)
}
if col.IsPKHandleColumn(t.Meta()) {
if mysql.HasUnsignedFlag(col.Flag) {
v[i].SetUint64(uint64(h))
} else {
v[i].SetInt64(h)
}
continue
}
colTps[col.ID] = &col.FieldType
}
row, err := tablecodec.DecodeRow(value, colTps, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
for i, col := range cols {
if col == nil {
continue
}
if col.State != model.StatePublic {
// TODO: check this
return nil, errInvalidColumnState.Gen("Cannot use none public column - %v", cols)
}
if col.IsPKHandleColumn(t.Meta()) {
continue
}
ri, ok := row[col.ID]
if !ok {
if mysql.HasNotNullFlag(col.Flag) {
return nil, errors.New("Miss")
}
// NULL value is regarded as its default value.
colDefVal, err := table.GetColOriginDefaultValue(sessCtx, col.ToInfo())
if err != nil {
return nil, errors.Trace(err)
}
v[i] = colDefVal
continue
}
v[i] = ri
}
return v, nil
}
func iterRecords(retriever kv.Retriever, t table.Table, startKey kv.Key, cols []*table.Column,
fn table.RecordIterFunc) error {
it, err := retriever.Seek(startKey)
if err != nil {
return errors.Trace(err)
}
defer it.Close()
if !it.Valid() {
return nil
}
log.Debugf("startKey:%q, key:%q, value:%q", startKey, it.Key(), it.Value())
colMap := make(map[int64]*types.FieldType, len(cols))
for _, col := range cols {
colMap[col.ID] = &col.FieldType
}
prefix := t.RecordPrefix()
for it.Valid() && it.Key().HasPrefix(prefix) {
// first kv pair is row lock information.
// TODO: check valid lock
// get row handle
handle, err := tablecodec.DecodeRowKey(it.Key())
if err != nil {
return errors.Trace(err)
}
rowMap, err := tablecodec.DecodeRow(it.Value(), colMap, time.UTC)
if err != nil {
return errors.Trace(err)
}
data := make([]types.Datum, 0, len(cols))
for _, col := range cols {
if col.IsPKHandleColumn(t.Meta()) {
data = append(data, types.NewIntDatum(handle))
} else {
data = append(data, rowMap[col.ID])
}
}
more, err := fn(handle, data, cols)
if !more || err != nil {
return errors.Trace(err)
}
rk := t.RecordKey(handle)
err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk))
if err != nil {
return errors.Trace(err)
}
}
return nil
}
// admin error codes.
const (
codeDataNotEqual terror.ErrCode = 1
codeRepeatHandle = 2
codeInvalidColumnState = 3
)
var (
errDateNotEqual = terror.ClassAdmin.New(codeDataNotEqual, "data isn't equal")
errRepeatHandle = terror.ClassAdmin.New(codeRepeatHandle, "handle is repeated")
errInvalidColumnState = terror.ClassAdmin.New(codeInvalidColumnState, "invalid column state")
)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhoujin826/tidb.git
git@gitee.com:zhoujin826/tidb.git
zhoujin826
tidb
tidb
v2.0.3

搜索帮助