1 Star 0 Fork 0

zhoujin826/tidb

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
executor.go 14.65 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package mocktikv
import (
"bytes"
"encoding/binary"
"sort"
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
tipb "github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)
var (
_ executor = &tableScanExec{}
_ executor = &indexScanExec{}
_ executor = &selectionExec{}
_ executor = &limitExec{}
_ executor = &topNExec{}
)
type executor interface {
SetSrcExec(executor)
GetSrcExec() executor
ResetCounts()
Counts() []int64
Next(ctx context.Context) ([][]byte, error)
// Cursor returns the key gonna to be scanned by the Next() function.
Cursor() (key []byte, desc bool)
}
type tableScanExec struct {
*tipb.TableScan
colIDs map[int64]int
kvRanges []kv.KeyRange
startTS uint64
isolationLevel kvrpcpb.IsolationLevel
mvccStore MVCCStore
cursor int
seekKey []byte
start int
counts []int64
src executor
}
func (e *tableScanExec) SetSrcExec(exec executor) {
e.src = exec
}
func (e *tableScanExec) GetSrcExec() executor {
return e.src
}
func (e *tableScanExec) ResetCounts() {
if e.counts != nil {
e.start = e.cursor
e.counts[e.start] = 0
}
}
func (e *tableScanExec) Counts() []int64 {
if e.counts == nil {
return nil
}
if e.seekKey == nil {
return e.counts[e.start:e.cursor]
}
return e.counts[e.start : e.cursor+1]
}
func (e *tableScanExec) Cursor() ([]byte, bool) {
if len(e.seekKey) > 0 {
return e.seekKey, e.Desc
}
if e.cursor < len(e.kvRanges) {
ran := e.kvRanges[e.cursor]
if ran.IsPoint() {
return ran.StartKey, e.Desc
}
if e.Desc {
return ran.EndKey, e.Desc
}
return ran.StartKey, e.Desc
}
if e.Desc {
return e.kvRanges[len(e.kvRanges)-1].StartKey, e.Desc
}
return e.kvRanges[len(e.kvRanges)-1].EndKey, e.Desc
}
func (e *tableScanExec) Next(ctx context.Context) (value [][]byte, err error) {
for e.cursor < len(e.kvRanges) {
ran := e.kvRanges[e.cursor]
if ran.IsPoint() {
value, err = e.getRowFromPoint(ran)
if err != nil {
return nil, errors.Trace(err)
}
e.cursor++
if value == nil {
continue
}
if e.counts != nil {
e.counts[e.cursor-1]++
}
return value, nil
}
value, err = e.getRowFromRange(ran)
if err != nil {
return nil, errors.Trace(err)
}
if value == nil {
e.seekKey = nil
e.cursor++
continue
}
if e.counts != nil {
e.counts[e.cursor]++
}
return value, nil
}
return nil, nil
}
func (e *tableScanExec) getRowFromPoint(ran kv.KeyRange) ([][]byte, error) {
val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel)
if err != nil {
return nil, errors.Trace(err)
}
if len(val) == 0 {
return nil, nil
}
handle, err := tablecodec.DecodeRowKey(ran.StartKey)
if err != nil {
return nil, errors.Trace(err)
}
row, err := getRowData(e.Columns, e.colIDs, handle, val)
if err != nil {
return nil, errors.Trace(err)
}
return row, nil
}
func (e *tableScanExec) getRowFromRange(ran kv.KeyRange) ([][]byte, error) {
if e.seekKey == nil {
if e.Desc {
e.seekKey = ran.EndKey
} else {
e.seekKey = ran.StartKey
}
}
var pairs []Pair
var pair Pair
if e.Desc {
pairs = e.mvccStore.ReverseScan(ran.StartKey, e.seekKey, 1, e.startTS, e.isolationLevel)
} else {
pairs = e.mvccStore.Scan(e.seekKey, ran.EndKey, 1, e.startTS, e.isolationLevel)
}
if len(pairs) > 0 {
pair = pairs[0]
}
if pair.Err != nil {
// TODO: Handle lock error.
return nil, errors.Trace(pair.Err)
}
if pair.Key == nil {
return nil, nil
}
if e.Desc {
if bytes.Compare(pair.Key, ran.StartKey) < 0 {
return nil, nil
}
e.seekKey = []byte(tablecodec.TruncateToRowKeyLen(kv.Key(pair.Key)))
} else {
if bytes.Compare(pair.Key, ran.EndKey) >= 0 {
return nil, nil
}
e.seekKey = []byte(kv.Key(pair.Key).PrefixNext())
}
handle, err := tablecodec.DecodeRowKey(pair.Key)
if err != nil {
return nil, errors.Trace(err)
}
row, err := getRowData(e.Columns, e.colIDs, handle, pair.Value)
if err != nil {
return nil, errors.Trace(err)
}
return row, nil
}
const (
pkColNotExists = iota
pkColIsSigned
pkColIsUnsigned
)
type indexScanExec struct {
*tipb.IndexScan
colsLen int
kvRanges []kv.KeyRange
startTS uint64
isolationLevel kvrpcpb.IsolationLevel
mvccStore MVCCStore
cursor int
seekKey []byte
pkStatus int
start int
counts []int64
src executor
}
func (e *indexScanExec) SetSrcExec(exec executor) {
e.src = exec
}
func (e *indexScanExec) GetSrcExec() executor {
return e.src
}
func (e *indexScanExec) ResetCounts() {
if e.counts != nil {
e.start = e.cursor
e.counts[e.start] = 0
}
}
func (e *indexScanExec) Counts() []int64 {
if e.counts == nil {
return nil
}
if e.seekKey == nil {
return e.counts[e.start:e.cursor]
}
return e.counts[e.start : e.cursor+1]
}
func (e *indexScanExec) isUnique() bool {
return e.Unique != nil && *e.Unique
}
func (e *indexScanExec) Cursor() ([]byte, bool) {
if len(e.seekKey) > 0 {
return e.seekKey, e.Desc
}
if e.cursor < len(e.kvRanges) {
ran := e.kvRanges[e.cursor]
if ran.IsPoint() && e.isUnique() {
return ran.StartKey, e.Desc
}
if e.Desc {
return ran.EndKey, e.Desc
}
return ran.StartKey, e.Desc
}
if e.Desc {
return e.kvRanges[len(e.kvRanges)-1].StartKey, e.Desc
}
return e.kvRanges[len(e.kvRanges)-1].EndKey, e.Desc
}
func (e *indexScanExec) Next(ctx context.Context) (value [][]byte, err error) {
for e.cursor < len(e.kvRanges) {
ran := e.kvRanges[e.cursor]
if ran.IsPoint() && e.isUnique() {
value, err = e.getRowFromPoint(ran)
if err != nil {
return nil, errors.Trace(err)
}
e.cursor++
if value == nil {
continue
}
if e.counts != nil {
e.counts[e.cursor-1]++
}
} else {
value, err = e.getRowFromRange(ran)
if err != nil {
return nil, errors.Trace(err)
}
if value == nil {
e.cursor++
e.seekKey = nil
continue
}
if e.counts != nil {
e.counts[e.cursor]++
}
}
return value, nil
}
return nil, nil
}
// getRowFromPoint is only used for unique key.
func (e *indexScanExec) getRowFromPoint(ran kv.KeyRange) ([][]byte, error) {
val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel)
if err != nil {
return nil, errors.Trace(err)
}
if len(val) == 0 {
return nil, nil
}
return e.decodeIndexKV(Pair{Key: ran.StartKey, Value: val})
}
func (e *indexScanExec) decodeIndexKV(pair Pair) ([][]byte, error) {
values, b, err := tablecodec.CutIndexKeyNew(pair.Key, e.colsLen)
if err != nil {
return nil, errors.Trace(err)
}
if len(b) > 0 {
if e.pkStatus != pkColNotExists {
values = append(values, b)
}
} else if e.pkStatus != pkColNotExists {
handle, err := decodeHandle(pair.Value)
if err != nil {
return nil, errors.Trace(err)
}
var handleDatum types.Datum
if e.pkStatus == pkColIsUnsigned {
handleDatum = types.NewUintDatum(uint64(handle))
} else {
handleDatum = types.NewIntDatum(handle)
}
handleBytes, err := codec.EncodeValue(nil, b, handleDatum)
if err != nil {
return nil, errors.Trace(err)
}
values = append(values, handleBytes)
}
return values, nil
}
func (e *indexScanExec) getRowFromRange(ran kv.KeyRange) ([][]byte, error) {
if e.seekKey == nil {
if e.Desc {
e.seekKey = ran.EndKey
} else {
e.seekKey = ran.StartKey
}
}
var pairs []Pair
var pair Pair
if e.Desc {
pairs = e.mvccStore.ReverseScan(ran.StartKey, e.seekKey, 1, e.startTS, e.isolationLevel)
} else {
pairs = e.mvccStore.Scan(e.seekKey, ran.EndKey, 1, e.startTS, e.isolationLevel)
}
if len(pairs) > 0 {
pair = pairs[0]
}
if pair.Err != nil {
// TODO: Handle lock error.
return nil, errors.Trace(pair.Err)
}
if pair.Key == nil {
return nil, nil
}
if e.Desc {
if bytes.Compare(pair.Key, ran.StartKey) < 0 {
return nil, nil
}
e.seekKey = pair.Key
} else {
if bytes.Compare(pair.Key, ran.EndKey) >= 0 {
return nil, nil
}
e.seekKey = []byte(kv.Key(pair.Key).PrefixNext())
}
return e.decodeIndexKV(pair)
}
type selectionExec struct {
conditions []expression.Expression
relatedColOffsets []int
row []types.Datum
evalCtx *evalContext
src executor
}
func (e *selectionExec) SetSrcExec(exec executor) {
e.src = exec
}
func (e *selectionExec) GetSrcExec() executor {
return e.src
}
func (e *selectionExec) ResetCounts() {
e.src.ResetCounts()
}
func (e *selectionExec) Counts() []int64 {
return e.src.Counts()
}
// evalBool evaluates expression to a boolean value.
func evalBool(exprs []expression.Expression, row types.DatumRow, ctx *stmtctx.StatementContext) (bool, error) {
for _, expr := range exprs {
data, err := expr.Eval(row)
if err != nil {
return false, errors.Trace(err)
}
if data.IsNull() {
return false, nil
}
isBool, err := data.ToBool(ctx)
if err != nil {
return false, errors.Trace(err)
}
if isBool == 0 {
return false, nil
}
}
return true, nil
}
func (e *selectionExec) Cursor() ([]byte, bool) {
return e.src.Cursor()
}
func (e *selectionExec) Next(ctx context.Context) (value [][]byte, err error) {
for {
value, err = e.src.Next(ctx)
if err != nil {
return nil, errors.Trace(err)
}
if value == nil {
return nil, nil
}
err = e.evalCtx.decodeRelatedColumnVals(e.relatedColOffsets, value, e.row)
if err != nil {
return nil, errors.Trace(err)
}
match, err := evalBool(e.conditions, e.row, e.evalCtx.sc)
if err != nil {
return nil, errors.Trace(err)
}
if match {
return value, nil
}
}
}
type topNExec struct {
heap *topNHeap
evalCtx *evalContext
relatedColOffsets []int
orderByExprs []expression.Expression
row types.DatumRow
cursor int
executed bool
src executor
}
func (e *topNExec) SetSrcExec(src executor) {
e.src = src
}
func (e *topNExec) GetSrcExec() executor {
return e.src
}
func (e *topNExec) ResetCounts() {
e.src.ResetCounts()
}
func (e *topNExec) Counts() []int64 {
return e.src.Counts()
}
func (e *topNExec) innerNext(ctx context.Context) (bool, error) {
value, err := e.src.Next(ctx)
if err != nil {
return false, errors.Trace(err)
}
if value == nil {
return false, nil
}
err = e.evalTopN(value)
if err != nil {
return false, errors.Trace(err)
}
return true, nil
}
func (e *topNExec) Cursor() ([]byte, bool) {
panic("don't not use coprocessor streaming API for topN!")
}
func (e *topNExec) Next(ctx context.Context) (value [][]byte, err error) {
if !e.executed {
for {
hasMore, err := e.innerNext(ctx)
if err != nil {
return nil, errors.Trace(err)
}
if !hasMore {
break
}
}
e.executed = true
}
if e.cursor >= len(e.heap.rows) {
return nil, nil
}
sort.Sort(&e.heap.topNSorter)
row := e.heap.rows[e.cursor]
e.cursor++
return row.data, nil
}
// evalTopN evaluates the top n elements from the data. The input receives a record including its handle and data.
// And this function will check if this record can replace one of the old records.
func (e *topNExec) evalTopN(value [][]byte) error {
newRow := &sortRow{
key: make([]types.Datum, len(value)),
}
err := e.evalCtx.decodeRelatedColumnVals(e.relatedColOffsets, value, e.row)
if err != nil {
return errors.Trace(err)
}
for i, expr := range e.orderByExprs {
newRow.key[i], err = expr.Eval(e.row)
if err != nil {
return errors.Trace(err)
}
}
if e.heap.tryToAddRow(newRow) {
for _, val := range value {
newRow.data = append(newRow.data, val)
}
}
return errors.Trace(e.heap.err)
}
type limitExec struct {
limit uint64
cursor uint64
src executor
}
func (e *limitExec) SetSrcExec(src executor) {
e.src = src
}
func (e *limitExec) GetSrcExec() executor {
return e.src
}
func (e *limitExec) ResetCounts() {
e.src.ResetCounts()
}
func (e *limitExec) Counts() []int64 {
return e.src.Counts()
}
func (e *limitExec) Cursor() ([]byte, bool) {
return e.src.Cursor()
}
func (e *limitExec) Next(ctx context.Context) (value [][]byte, err error) {
if e.cursor >= e.limit {
return nil, nil
}
value, err = e.src.Next(ctx)
if err != nil {
return nil, errors.Trace(err)
}
if value == nil {
return nil, nil
}
e.cursor++
return value, nil
}
func hasColVal(data [][]byte, colIDs map[int64]int, id int64) bool {
offset, ok := colIDs[id]
if ok && data[offset] != nil {
return true
}
return false
}
// getRowData decodes raw byte slice to row data.
func getRowData(columns []*tipb.ColumnInfo, colIDs map[int64]int, handle int64, value []byte) ([][]byte, error) {
values, err := tablecodec.CutRowNew(value, colIDs)
if err != nil {
return nil, errors.Trace(err)
}
if values == nil {
values = make([][]byte, len(colIDs))
}
// Fill the handle and null columns.
for _, col := range columns {
id := col.GetColumnId()
offset := colIDs[id]
if col.GetPkHandle() || id == model.ExtraHandleID {
var handleDatum types.Datum
if mysql.HasUnsignedFlag(uint(col.GetFlag())) {
// PK column is Unsigned.
handleDatum = types.NewUintDatum(uint64(handle))
} else {
handleDatum = types.NewIntDatum(handle)
}
handleData, err1 := codec.EncodeValue(nil, nil, handleDatum)
if err1 != nil {
return nil, errors.Trace(err1)
}
values[offset] = handleData
continue
}
if hasColVal(values, colIDs, id) {
continue
}
if len(col.DefaultVal) > 0 {
values[offset] = col.DefaultVal
continue
}
if mysql.HasNotNullFlag(uint(col.GetFlag())) {
return nil, errors.Errorf("Miss column %d", id)
}
values[offset] = []byte{codec.NilFlag}
}
return values, nil
}
func convertToExprs(sc *stmtctx.StatementContext, fieldTps []*types.FieldType, pbExprs []*tipb.Expr) ([]expression.Expression, error) {
exprs := make([]expression.Expression, 0, len(pbExprs))
for _, expr := range pbExprs {
e, err := expression.PBToExpr(expr, fieldTps, sc)
if err != nil {
return nil, errors.Trace(err)
}
exprs = append(exprs, e)
}
return exprs, nil
}
func decodeHandle(data []byte) (int64, error) {
var h int64
buf := bytes.NewBuffer(data)
err := binary.Read(buf, binary.BigEndian, &h)
return h, errors.Trace(err)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhoujin826/tidb.git
git@gitee.com:zhoujin826/tidb.git
zhoujin826
tidb
tidb
v2.0.6

搜索帮助