90 Star 491 Fork 149

平凯星辰(北京)科技有限公司/tidb

Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
Clone or Download
executor.go 30.81 KB
Copy Edit Raw Blame History
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/cznic/mathutil"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
// Compile-time assertions: every executor type listed here must satisfy the
// Executor interface, otherwise the package fails to build.
var (
_ Executor = &baseExecutor{}
_ Executor = &CheckTableExec{}
_ Executor = &HashAggExec{}
_ Executor = &LimitExec{}
_ Executor = &MaxOneRowExec{}
_ Executor = &ProjectionExec{}
_ Executor = &SelectionExec{}
_ Executor = &SelectLockExec{}
_ Executor = &ShowDDLExec{}
_ Executor = &ShowDDLJobsExec{}
_ Executor = &ShowDDLJobQueriesExec{}
_ Executor = &SortExec{}
_ Executor = &StreamAggExec{}
_ Executor = &TableDualExec{}
_ Executor = &TableScanExec{}
_ Executor = &TopNExec{}
_ Executor = &UnionExec{}
_ Executor = &CheckIndexExec{}
_ Executor = &HashJoinExec{}
_ Executor = &IndexLookUpExecutor{}
_ Executor = &MergeJoinExec{}
)
// baseExecutor holds the state shared by the executors in this file and
// supplies default Open/Close/Schema/Next implementations. Concrete executors
// embed it and override what they need.
type baseExecutor struct {
// ctx is the session context the executor runs in.
ctx sessionctx.Context
// id is the key used to look up this executor's runtime statistics.
id string
// schema describes the output columns; it may be nil.
schema *expression.Schema
// initCap and maxChunkSize are the capacity arguments handed to chunk.New
// when a result chunk is allocated.
initCap int
maxChunkSize int
// children are the input executors; they are opened and closed together
// with this executor.
children []Executor
// retFieldTypes holds the RetType of every schema column, in column order.
retFieldTypes []*types.FieldType
// runtimeStats stays nil unless the statement has a runtime stats collector.
runtimeStats *execdetails.RuntimeStats
}
// Open opens every child executor in order and stops at the first child that
// fails, returning its (traced) error.
func (e *baseExecutor) Open(ctx context.Context) error {
	for i := range e.children {
		if err := e.children[i].Open(ctx); err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}
// Close closes all child executors and releases their resources.
//
// Every child is closed even when an earlier one fails, so a single failing
// child cannot leak the resources held by its siblings. The first error
// encountered (if any) is returned.
func (e *baseExecutor) Close() error {
	var firstErr error
	for _, child := range e.children {
		if err := child.Close(); err != nil && firstErr == nil {
			firstErr = errors.Trace(err)
		}
	}
	return firstErr
}
// Schema returns the executor's schema. When none was set, a fresh empty
// schema is returned instead of nil; that schema is not stored on the executor.
func (e *baseExecutor) Schema() *expression.Schema {
	if e.schema != nil {
		return e.schema
	}
	return expression.NewSchema()
}
// newFirstChunk allocates a chunk shaped after this executor's output column
// types, sized with the executor's initial and maximum capacities.
func (e *baseExecutor) newFirstChunk() *chunk.Chunk {
	fieldTypes := e.retTypes()
	return chunk.New(fieldTypes, e.initCap, e.maxChunkSize)
}
// retTypes returns the field types of all output columns, in schema column
// order. The result is nil when the executor was built without a schema.
func (e *baseExecutor) retTypes() []*types.FieldType {
return e.retFieldTypes
}
// Next fills multiple rows into a chunk.
// The base implementation is a no-op that leaves chk untouched and returns
// nil; concrete executors override it.
func (e *baseExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
return nil
}
// newBaseExecutor builds the shared executor state.
//
// Both chunk capacities are taken from the session's MaxChunkSize. Runtime
// statistics are attached only when the statement context carries a collector,
// and the return field types are derived from schema when it is non-nil.
func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id string, children ...Executor) baseExecutor {
	sessVars := ctx.GetSessionVars()
	base := baseExecutor{
		ctx:          ctx,
		id:           id,
		schema:       schema,
		children:     children,
		initCap:      sessVars.MaxChunkSize,
		maxChunkSize: sessVars.MaxChunkSize,
	}
	if coll := sessVars.StmtCtx.RuntimeStatsColl; coll != nil {
		base.runtimeStats = coll.Get(id)
	}
	if schema == nil {
		return base
	}
	base.retFieldTypes = make([]*types.FieldType, len(schema.Columns))
	for i, col := range schema.Columns {
		base.retFieldTypes[i] = col.RetType
	}
	return base
}
// Executor is the physical implementation of an algebra operator.
//
// In TiDB, all algebra operators are implemented as iterators, i.e., they
// support a simple Open-Next-Close protocol. See this paper for more details:
//
// "Volcano-An Extensible and Parallel Query Evaluation System"
//
// Different from Volcano's execution model, a "Next" function call in TiDB will
// return a batch of rows, rather than a single row as in Volcano.
// NOTE: Executors must call "chk.Reset()" before appending their results to it.
type Executor interface {
Open(context.Context) error
Next(ctx context.Context, chk *chunk.Chunk) error
Close() error
Schema() *expression.Schema
retTypes() []*types.FieldType
newFirstChunk() *chunk.Chunk
}
// CancelDDLJobsExec represents a cancel DDL jobs executor.
// It reports, for each job ID, whether an error was recorded for it.
type CancelDDLJobsExec struct {
baseExecutor
// cursor is the index of the next job to report.
cursor int
// jobIDs are the IDs of the jobs being reported on.
jobIDs []int64
// errs is indexed in parallel with jobIDs; a nil entry is reported as
// "successful".
errs []error
}
// Next implements the Executor Next interface.
// Each output row carries the job ID (column 0) and either "successful" or the
// recorded error text (column 1). At most chk.Capacity() rows are produced per
// call.
func (e *CancelDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	if e.runtimeStats != nil {
		begin := time.Now()
		defer func() { e.runtimeStats.Record(time.Since(begin), chk.NumRows()) }()
	}
	chk.GrowAndReset(e.maxChunkSize)
	remaining := len(e.jobIDs) - e.cursor
	if remaining <= 0 {
		return nil
	}
	batchEnd := e.cursor + mathutil.Min(chk.Capacity(), remaining)
	for idx := e.cursor; idx < batchEnd; idx++ {
		chk.AppendString(0, fmt.Sprintf("%d", e.jobIDs[idx]))
		status := "successful"
		if jobErr := e.errs[idx]; jobErr != nil {
			status = fmt.Sprintf("error: %v", jobErr)
		}
		chk.AppendString(1, status)
	}
	e.cursor = batchEnd
	return nil
}
// ShowDDLExec represents a show DDL executor.
// It produces a single row summarising the DDL state.
type ShowDDLExec struct {
baseExecutor
// ddlOwnerID is emitted in column 1.
ddlOwnerID string
// selfID is emitted in column 3.
selfID string
// ddlInfo supplies the schema version (column 0) and the job list (column 2).
ddlInfo *admin.DDLInfo
// done is set once the single row has been returned.
done bool
}
// Next implements the Executor Next interface.
// The first call emits one row: schema version, DDL owner ID, the jobs'
// String() forms joined by newlines, and this server's ID. Later calls return
// an empty chunk.
func (e *ShowDDLExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	chk.Reset()
	if e.done {
		return nil
	}
	var jobsText string
	for i, job := range e.ddlInfo.Jobs {
		if i > 0 {
			jobsText += "\n"
		}
		jobsText += job.String()
	}
	chk.AppendInt64(0, e.ddlInfo.SchemaVer)
	chk.AppendString(1, e.ddlOwnerID)
	chk.AppendString(2, jobsText)
	chk.AppendString(3, e.selfID)
	e.done = true
	return nil
}
// ShowDDLJobsExec represents a show DDL jobs executor.
type ShowDDLJobsExec struct {
baseExecutor
// cursor is the index of the next job to emit.
cursor int
// jobs holds the jobs loaded by Open.
jobs []*model.Job
// jobNumber is how many history jobs to load; zero means
// admin.DefNumHistoryJobs.
jobNumber int64
// is resolves schema and table IDs to names for the output rows.
is infoschema.InfoSchema
}
// ShowDDLJobQueriesExec represents a show DDL job queries executor.
// The job IDs given by the 'admin show ddl job queries' statement are only
// searched for among the jobs returned by admin.GetDDLJobs and the latest
// admin.DefNumHistoryJobs history jobs.
type ShowDDLJobQueriesExec struct {
baseExecutor
// cursor is the index of the next loaded job to examine.
cursor int
// jobs holds the jobs loaded by Open.
jobs []*model.Job
// jobIDs are the job IDs whose queries should be returned.
jobIDs []int64
}
// Open implements the Executor Open interface.
// After opening the children it appends to e.jobs the jobs returned by
// admin.GetDDLJobs, followed by the latest admin.DefNumHistoryJobs history jobs.
func (e *ShowDDLJobQueriesExec) Open(ctx context.Context) error {
	err := e.baseExecutor.Open(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	currentJobs, err := admin.GetDDLJobs(e.ctx.Txn())
	if err != nil {
		return errors.Trace(err)
	}
	pastJobs, err := admin.GetHistoryDDLJobs(e.ctx.Txn(), admin.DefNumHistoryJobs)
	if err != nil {
		return errors.Trace(err)
	}
	e.jobs = append(append(e.jobs, currentJobs...), pastJobs...)
	return nil
}
// Next implements the Executor Next interface.
// It scans the next window of loaded jobs and emits the query text of every
// job whose ID is among the requested IDs. Nothing is emitted when the number
// of requested IDs is not smaller than the number of loaded jobs.
func (e *ShowDDLJobQueriesExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	chk.GrowAndReset(e.maxChunkSize)
	if e.cursor >= len(e.jobs) || len(e.jobIDs) >= len(e.jobs) {
		return nil
	}
	batchSize := mathutil.Min(chk.Capacity(), len(e.jobs)-e.cursor)
	window := e.jobs[e.cursor : e.cursor+batchSize]
	for _, wantedID := range e.jobIDs {
		for _, job := range window {
			if job.ID == wantedID {
				chk.AppendString(0, job.Query)
			}
		}
	}
	e.cursor += batchSize
	return nil
}
// Open implements the Executor Open interface.
// It appends to e.jobs the jobs returned by admin.GetDDLJobs, followed by the
// latest e.jobNumber history jobs (admin.DefNumHistoryJobs when unset), and
// rewinds the cursor.
func (e *ShowDDLJobsExec) Open(ctx context.Context) error {
	err := e.baseExecutor.Open(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	currentJobs, err := admin.GetDDLJobs(e.ctx.Txn())
	if err != nil {
		return errors.Trace(err)
	}
	if e.jobNumber == 0 {
		e.jobNumber = admin.DefNumHistoryJobs
	}
	pastJobs, err := admin.GetHistoryDDLJobs(e.ctx.Txn(), int(e.jobNumber))
	if err != nil {
		return errors.Trace(err)
	}
	e.jobs = append(append(e.jobs, currentJobs...), pastJobs...)
	e.cursor = 0
	return nil
}
// Next implements the Executor Next interface.
// It emits one ten-column row per loaded job, at most chk.Capacity() rows per
// call.
func (e *ShowDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	chk.GrowAndReset(e.maxChunkSize)
	remaining := len(e.jobs) - e.cursor
	if remaining <= 0 {
		return nil
	}
	batchSize := mathutil.Min(chk.Capacity(), remaining)
	for _, job := range e.jobs[e.cursor : e.cursor+batchSize] {
		chk.AppendInt64(0, job.ID)
		chk.AppendString(1, getSchemaName(e.is, job.SchemaID))
		chk.AppendString(2, getTableName(e.is, job.TableID))
		chk.AppendString(3, job.Type.String())
		chk.AppendString(4, job.SchemaState.String())
		chk.AppendInt64(5, job.SchemaID)
		chk.AppendInt64(6, job.TableID)
		chk.AppendInt64(7, job.RowCount)
		chk.AppendString(8, model.TSConvert2Time(job.StartTS).String())
		chk.AppendString(9, job.State.String())
	}
	e.cursor += batchSize
	return nil
}
// getSchemaName returns the original-case name of the schema with the given
// ID, or the empty string when the info schema does not know that ID.
func getSchemaName(is infoschema.InfoSchema, id int64) string {
	if dbInfo, ok := is.SchemaByID(id); ok {
		return dbInfo.Name.O
	}
	return ""
}
// getTableName returns the original-case name of the table with the given ID,
// or the empty string when the info schema does not know that ID.
func getTableName(is infoschema.InfoSchema, id int64) string {
	if tbl, ok := is.TableByID(id); ok {
		return tbl.Meta().Name.O
	}
	return ""
}
// CheckTableExec represents a check table executor.
// It is built from the "admin check table" statement, and it checks if the
// index matches the records in the table.
type CheckTableExec struct {
baseExecutor
// tables lists the tables to check.
tables []*ast.TableName
// done is set after the first Next call so the check runs only once.
done bool
// is is used to look the tables up by name.
is infoschema.InfoSchema
// genExprs is passed through to admin.CompareIndexData.
genExprs map[string]expression.Expression
}
// Open implements the Executor Open interface.
// It opens the children and re-arms the executor so the next Next call runs
// the check.
func (e *CheckTableExec) Open(ctx context.Context) error {
	err := e.baseExecutor.Open(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	e.done = false
	return nil
}
// Next implements the Executor Next interface.
// On the first call it checks each table in turn (partition by partition for
// partitioned tables) and stops at the first failure. A data inconsistency is
// reported as ErrAdminCheckTable; any other failure as a plain error. Later
// calls are no-ops.
func (e *CheckTableExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	if e.done {
		return nil
	}
	defer func() { e.done = true }()
	for _, tblName := range e.tables {
		tbl, err := e.is.TableByName(tblName.DBInfo.Name, tblName.Name)
		if err != nil {
			return errors.Trace(err)
		}
		if tbl.Meta().GetPartitionInfo() == nil {
			err = e.doCheckTable(tbl)
		} else {
			err = e.doCheckPartitionedTable(tbl.(table.PartitionedTable))
		}
		if err == nil {
			continue
		}
		log.Warnf("%v error:%v", tblName.Name, errors.ErrorStack(err))
		if admin.ErrDataInConsistent.Equal(err) {
			return ErrAdminCheckTable.GenWithStack("%v err:%v", tblName.Name, err)
		}
		return errors.Errorf("%v err:%v", tblName.Name, err)
	}
	return nil
}
// doCheckPartitionedTable runs doCheckTable on every partition defined for
// tbl, stopping at the first partition that fails.
func (e *CheckTableExec) doCheckPartitionedTable(tbl table.PartitionedTable) error {
	defs := tbl.Meta().GetPartitionInfo().Definitions
	for i := range defs {
		part := tbl.GetPartition(defs[i].ID)
		if err := e.doCheckTable(part); err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}
// doCheckTable compares each index of tbl against the table data via
// admin.CompareIndexData and returns the first error reported.
func (e *CheckTableExec) doCheckTable(tbl table.Table) error {
	for _, index := range tbl.Indices() {
		if err := admin.CompareIndexData(e.ctx, e.ctx.Txn(), tbl, index, e.genExprs); err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}
// CheckIndexExec represents the executor of checking an index.
// It is built from the "admin check index" statement, and it checks
// the consistency of the index data with the records of the table.
type CheckIndexExec struct {
baseExecutor
// dbName, tableName and idxName identify the index handed to
// admin.CheckIndicesCount.
dbName string
tableName string
idxName string
// src is the index lookup executor that Next reads to exhaustion.
src *IndexLookUpExecutor
// done is set after the first Next call so the check runs only once.
done bool
is infoschema.InfoSchema
}
// Open implements the Executor Open interface.
// It opens the children first, then the index lookup source, and re-arms the
// executor for the next check.
func (e *CheckIndexExec) Open(ctx context.Context) error {
	err := e.baseExecutor.Open(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	err = e.src.Open(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	e.done = false
	return nil
}
// Close implements the Executor Close interface.
// It closes the index lookup source and returns its (traced) error.
func (e *CheckIndexExec) Close() error {
	err := e.src.Close()
	return errors.Trace(err)
}
// Next implements the Executor Next interface.
// On the first call it runs admin.CheckIndicesCount for the index and then
// reads the source executor until it is exhausted; the rows read go into a
// private chunk, not into the caller's chk. Later calls are no-ops.
func (e *CheckIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	if e.done {
		return nil
	}
	defer func() { e.done = true }()
	if err := admin.CheckIndicesCount(e.ctx, e.dbName, e.tableName, []string{e.idxName}); err != nil {
		return errors.Trace(err)
	}
	srcChk := e.src.newFirstChunk()
	for {
		if err := e.src.Next(ctx, srcChk); err != nil {
			return errors.Trace(err)
		}
		if srcChk.NumRows() == 0 {
			return nil
		}
	}
}
// ShowSlowExec represents the executor of showing the slow queries.
// It is built from the "admin show slow" statement:
// admin show slow top [internal | all] N
// admin show slow recent N
type ShowSlowExec struct {
baseExecutor
// ShowSlow is the parsed statement passed to the domain's ShowSlowQuery.
ShowSlow *ast.ShowSlow
// result holds the slow query records fetched by Open.
result []*domain.SlowQueryInfo
// cursor is the index of the next record to emit.
cursor int
}
// Open implements the Executor Open interface.
// It opens the children and loads the slow query records from the session's
// domain.
func (e *ShowSlowExec) Open(ctx context.Context) error {
	err := e.baseExecutor.Open(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	e.result = domain.GetDomain(e.ctx).ShowSlowQuery(e.ShowSlow)
	return nil
}
// Next implements the Executor Next interface.
// It emits one twelve-column row per slow query record, at most
// e.maxChunkSize rows per call.
//
// Boolean fields are encoded as 1 for true and 0 for false. The Internal flag
// was previously written inverted (true -> 0), unlike Succ in the same row.
// NOTE(review): column 11 is presumed to be the "internal" flag column;
// confirm against the schema built for "admin show slow".
func (e *ShowSlowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	chk.Reset()
	if e.cursor >= len(e.result) {
		return nil
	}
	for e.cursor < len(e.result) && chk.NumRows() < e.maxChunkSize {
		slow := e.result[e.cursor]
		chk.AppendString(0, slow.SQL)
		chk.AppendTime(1, types.Time{
			Time: types.FromGoTime(slow.Start),
			Type: mysql.TypeTimestamp,
			Fsp:  types.MaxFsp,
		})
		chk.AppendDuration(2, types.Duration{Duration: slow.Duration, Fsp: types.MaxFsp})
		chk.AppendString(3, slow.Detail.String())
		if slow.Succ {
			chk.AppendInt64(4, 1)
		} else {
			chk.AppendInt64(4, 0)
		}
		chk.AppendUint64(5, slow.ConnID)
		chk.AppendUint64(6, slow.TxnTS)
		chk.AppendString(7, slow.User)
		chk.AppendString(8, slow.DB)
		chk.AppendString(9, slow.TableIDs)
		chk.AppendString(10, slow.IndexIDs)
		if slow.Internal {
			chk.AppendInt64(11, 1)
		} else {
			chk.AppendInt64(11, 0)
		}
		e.cursor++
	}
	return nil
}
// SelectLockExec represents a select lock executor.
// It is built from the "SELECT .. FOR UPDATE" or the "SELECT .. LOCK IN SHARE MODE" statement.
// For "SELECT .. FOR UPDATE" statement, it locks every row key from source Executor.
// After the execution, the keys are buffered in transaction, and will be sent to KV
// when doing commit. If there is any key already locked by another transaction,
// the transaction will rollback and retry.
type SelectLockExec struct {
baseExecutor
// Lock is the lock type of the statement; Next only locks keys for
// ast.SelectLockForUpdate.
Lock ast.SelectLockType
}
// Open implements the Executor Open interface.
// It flags the transaction context as ForUpdate and records an empty delta for
// every table that has a handle column in the schema.
func (e *SelectLockExec) Open(ctx context.Context) error {
	err := e.baseExecutor.Open(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	txnCtx := e.ctx.GetSessionVars().TxnCtx
	txnCtx.ForUpdate = true
	for tblID := range e.Schema().TblID2Handle {
		// The zero delta exists only so the schema validator checks this table.
		txnCtx.UpdateDeltaForTable(tblID, 0, 0, map[int64]int64{})
	}
	return nil
}
// Next implements the Executor Next interface.
// It forwards the child's batch unchanged. For "SELECT .. FOR UPDATE" with
// handle columns in the schema, it also locks the row key of every returned
// row, once per handle column.
func (e *SelectLockExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	chk.GrowAndReset(e.maxChunkSize)
	if err := e.children[0].Next(ctx, chk); err != nil {
		return errors.Trace(err)
	}
	handleCols := e.Schema().TblID2Handle
	// Nothing to lock unless this is FOR UPDATE and handles are available.
	if e.Lock != ast.SelectLockForUpdate || len(handleCols) == 0 {
		return nil
	}
	txn := e.ctx.Txn()
	lockKeys := make([]kv.Key, 0, chk.NumRows())
	rowIter := chunk.NewIterator4Chunk(chk)
	for tblID, cols := range handleCols {
		for _, col := range cols {
			// Reuse the backing array for each handle column.
			lockKeys = lockKeys[:0]
			for r := rowIter.Begin(); r != rowIter.End(); r = rowIter.Next() {
				handle := r.GetInt64(col.Index)
				lockKeys = append(lockKeys, tablecodec.EncodeRowKeyWithHandle(tblID, handle))
			}
			if err := txn.LockKeys(lockKeys...); err != nil {
				return errors.Trace(err)
			}
		}
	}
	return nil
}
// LimitExec represents a limit executor.
// It ignores 'Offset' rows from src, then returns 'Count' rows at maximum.
type LimitExec struct {
baseExecutor
// begin and end delimit the half-open range [begin, end) of child row
// positions that are returned.
begin uint64
end uint64
// cursor counts the child rows accounted for so far; it never exceeds end.
cursor uint64
// meetFirstBatch represents whether we have met the first valid Chunk from child.
meetFirstBatch bool
// childResult buffers child batches while the leading rows are skipped.
childResult *chunk.Chunk
}
// Next implements the Executor Next interface.
// It returns the child rows whose positions fall in [e.begin, e.end). While
// skipping, child batches are read into e.childResult and dropped until the
// batch that reaches e.begin arrives. After that the child writes straight
// into chk and the last batch is truncated at e.end.
func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
// The limit has already been satisfied.
if e.cursor >= e.end {
return nil
}
// Skip phase: consume child batches until one reaches position e.begin.
for !e.meetFirstBatch {
err := e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
}
batchSize := uint64(e.childResult.NumRows())
// no more data.
if batchSize == 0 {
return nil
}
if newCursor := e.cursor + batchSize; newCursor >= e.begin {
e.meetFirstBatch = true
// begin and end are row offsets inside this batch.
begin, end := e.begin-e.cursor, batchSize
if newCursor > e.end {
end = e.end - e.cursor
}
e.cursor += end
// No row of this batch is selected; fall through and read the next
// batch directly into chk.
if begin == end {
break
}
chk.Append(e.childResult, int(begin), int(end))
return nil
}
e.cursor += batchSize
}
// Pass-through phase: the child fills chk directly.
err := e.children[0].Next(ctx, chk)
if err != nil {
return errors.Trace(err)
}
batchSize := uint64(chk.NumRows())
// no more data.
if batchSize == 0 {
return nil
}
// Cut the batch so that no row at or beyond position e.end is returned.
if e.cursor+batchSize > e.end {
chk.TruncateTo(int(e.end - e.cursor))
batchSize = e.end - e.cursor
}
e.cursor += batchSize
return nil
}
// Open implements the Executor Open interface.
// It allocates the skip buffer from the child's shape and resets the cursor.
// When there is no offset, the skip phase is marked as already finished.
func (e *LimitExec) Open(ctx context.Context) error {
	err := e.baseExecutor.Open(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	e.childResult = e.children[0].newFirstChunk()
	e.cursor, e.meetFirstBatch = 0, e.begin == 0
	return nil
}
// Close implements the Executor Close interface.
// It drops the skip buffer and then closes the children.
func (e *LimitExec) Close() error {
	e.childResult = nil
	err := e.baseExecutor.Close()
	return errors.Trace(err)
}
// init installs the subquery evaluator used by the planner.
func init() {
	// While doing optimization in the plan package, we need to execute uncorrelated subquery,
	// but the plan package cannot import the executor package because of the dependency cycle.
	// So we assign a function implemented in the executor package to the plan package to avoid the dependency cycle.
	//
	// The installed function builds an executor for p, runs it to completion
	// and returns every produced row as a datum slice.
	plannercore.EvalSubquery = func(p plannercore.PhysicalPlan, is infoschema.InfoSchema, sctx sessionctx.Context) (rows [][]types.Datum, err error) {
		err = sctx.ActivePendingTxn()
		if err != nil {
			return rows, errors.Trace(err)
		}
		e := &executorBuilder{is: is, ctx: sctx}
		exec := e.build(p)
		if e.err != nil {
			// Report the builder's own error. `err` is nil at this point, so
			// tracing it would hide the failure and let execution continue
			// with an unusable executor.
			return rows, errors.Trace(e.err)
		}
		ctx := context.TODO()
		err = exec.Open(ctx)
		// Close is deferred before the error check so that a partially
		// opened executor is still closed.
		defer terror.Call(exec.Close)
		if err != nil {
			return rows, errors.Trace(err)
		}
		chk := exec.newFirstChunk()
		for {
			err = exec.Next(ctx, chk)
			if err != nil {
				return rows, errors.Trace(err)
			}
			// An empty chunk signals that the executor is exhausted.
			if chk.NumRows() == 0 {
				return rows, nil
			}
			iter := chunk.NewIterator4Chunk(chk)
			for r := iter.Begin(); r != iter.End(); r = iter.Next() {
				row := r.GetDatumRow(exec.retTypes())
				rows = append(rows, row)
			}
			chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize)
		}
	}
}
// TableDualExec represents a dual table executor.
type TableDualExec struct {
baseExecutor
// numDualRows can only be 0 or 1.
numDualRows int
// numReturned is the number of rows already returned since the last Open.
numReturned int
}
// Open implements the Executor Open interface.
// It only rewinds the returned-row counter; baseExecutor.Open is not called.
func (e *TableDualExec) Open(ctx context.Context) error {
e.numReturned = 0
return nil
}
// Next implements the Executor Next interface.
// While rows remain to be returned it emits one row: a virtual (column-less)
// row when the schema is empty, otherwise a row of NULLs. Afterwards it
// returns an empty chunk.
func (e *TableDualExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	if e.runtimeStats != nil {
		begin := time.Now()
		defer func() { e.runtimeStats.Record(time.Since(begin), chk.NumRows()) }()
	}
	chk.Reset()
	if e.numReturned >= e.numDualRows {
		return nil
	}
	if e.Schema().Len() != 0 {
		for colIdx := range e.Schema().Columns {
			chk.AppendNull(colIdx)
		}
	} else {
		chk.SetNumVirtualRows(1)
	}
	e.numReturned = e.numDualRows
	return nil
}
// SelectionExec represents a filter executor.
type SelectionExec struct {
baseExecutor
// batched reports whether the filters can be evaluated in vectorized form.
batched bool
// filters are the conditions a row must satisfy to be returned.
filters []expression.Expression
// selected holds the vectorized filter result for each row of childResult.
selected []bool
// inputIter iterates over childResult; inputRow is the current position,
// kept across Next calls.
inputIter *chunk.Iterator4Chunk
inputRow chunk.Row
// childResult buffers the most recent batch read from the child.
childResult *chunk.Chunk
}
// Open implements the Executor Open interface.
// It allocates the child buffer, decides between vectorized and row-by-row
// filtering, and positions the input iterator at the end so the first Next
// call fetches a batch from the child.
func (e *SelectionExec) Open(ctx context.Context) error {
	err := e.baseExecutor.Open(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	e.childResult = e.children[0].newFirstChunk()
	if e.batched = expression.Vectorizable(e.filters); e.batched {
		e.selected = make([]bool, 0, chunk.InitialCapacity)
	}
	e.inputIter = chunk.NewIterator4Chunk(e.childResult)
	e.inputRow = e.inputIter.End()
	return nil
}
// Close implements the Executor Close interface.
// It drops the child buffer and the selection vector, then closes the children.
func (e *SelectionExec) Close() error {
	e.childResult, e.selected = nil, nil
	err := e.baseExecutor.Close()
	return errors.Trace(err)
}
// Next implements the Executor Next interface.
// In batched mode it copies the selected rows of the buffered child batch into
// chk, fetching and filtering further child batches as needed, until chk is
// full or the child is exhausted. Otherwise it delegates to unBatchedNext.
func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if !e.batched {
return errors.Trace(e.unBatchedNext(ctx, chk))
}
for {
// Resume from wherever the previous call stopped in the buffered batch.
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
if !e.selected[e.inputRow.Idx()] {
continue
}
// chk is full: return without advancing, so this row is emitted by the
// next call.
if chk.NumRows() >= chk.Capacity() {
return nil
}
chk.AppendRow(e.inputRow)
}
// The buffered batch is used up; fetch the next one.
err := e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
}
// no more data.
if e.childResult.NumRows() == 0 {
return nil
}
e.selected, err = expression.VectorizedFilter(e.ctx, e.filters, e.inputIter, e.selected)
if err != nil {
return errors.Trace(err)
}
e.inputRow = e.inputIter.Begin()
}
}
// unBatchedNext filters input rows one by one and returns once an input row is selected.
// For sql with "SETVAR" in filter and "GETVAR" in projection, for example: "SELECT @a FROM t WHERE (@a := 2) > 0",
// we have to set batch size to 1 to do the evaluation of filter and projection.
// Each call therefore appends at most one row to chk.
func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) error {
for {
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
selected, err := expression.EvalBool(e.ctx, e.filters, e.inputRow)
if err != nil {
return errors.Trace(err)
}
if selected {
chk.AppendRow(e.inputRow)
// Step past the emitted row so the next call resumes after it.
e.inputRow = e.inputIter.Next()
return nil
}
}
// The buffered batch is used up; fetch the next one.
err := e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
}
e.inputRow = e.inputIter.Begin()
// no more data.
if e.childResult.NumRows() == 0 {
return nil
}
}
}
// TableScanExec is a table scan executor without result fields.
type TableScanExec struct {
baseExecutor
// t is the table being scanned.
t table.Table
// seekHandle is the handle from which the next row is sought.
seekHandle int64
iter kv.Iterator
// columns are the columns to read for each row.
columns []*model.ColumnInfo
// isVirtualTable selects the scan path that materialises all records into
// virtualTableChunkList and serves them chunk by chunk.
isVirtualTable bool
// virtualTableChunkList holds the materialised records; nil until first use.
virtualTableChunkList *chunk.List
// virtualTableChunkIdx is the index of the next chunk to hand out.
virtualTableChunkIdx int
}
// Next implements the Executor Next interface.
// Virtual tables are served from the materialised chunk list. For other
// tables, rows are read one handle at a time until chk is full or no further
// handle is found.
//
// The next handle is sought on every iteration. Seeking only once before the
// loop appended the same row repeatedly until the chunk was full.
func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	if e.runtimeStats != nil {
		start := time.Now()
		defer func() { e.runtimeStats.Record(time.Since(start), chk.NumRows()) }()
	}
	chk.GrowAndReset(e.maxChunkSize)
	if e.isVirtualTable {
		return errors.Trace(e.nextChunk4InfoSchema(ctx, chk))
	}
	mutableRow := chunk.MutRowFromTypes(e.retTypes())
	for chk.NumRows() < chk.Capacity() {
		handle, found, err := e.nextHandle()
		if err != nil || !found {
			// err is nil when the scan simply ran out of rows; the rows
			// gathered so far are returned as the final batch.
			return errors.Trace(err)
		}
		row, err := e.getRow(handle)
		if err != nil {
			return errors.Trace(err)
		}
		// Resume the scan just after the row that was read.
		e.seekHandle = handle + 1
		mutableRow.SetDatums(row...)
		chk.AppendRow(mutableRow.ToRow())
	}
	return nil
}
// nextChunk4InfoSchema serves a virtual table. On first use it iterates all
// records of the table into virtualTableChunkList. Each call then hands out
// the next chunk of that list by swapping its columns into chk; chk is left
// empty once every chunk has been served.
func (e *TableScanExec) nextChunk4InfoSchema(ctx context.Context, chk *chunk.Chunk) error {
	chk.GrowAndReset(e.maxChunkSize)
	if e.virtualTableChunkList == nil {
		rowTypes := e.retTypes()
		e.virtualTableChunkList = chunk.NewList(rowTypes, e.initCap, e.maxChunkSize)
		cols := make([]*table.Column, e.schema.Len())
		for idx := range e.columns {
			cols[idx] = table.ToColumn(e.columns[idx])
		}
		scratch := chunk.MutRowFromTypes(rowTypes)
		collect := func(h int64, rec []types.Datum, _ []*table.Column) (bool, error) {
			scratch.SetDatums(rec...)
			e.virtualTableChunkList.AppendRow(scratch.ToRow())
			return true, nil
		}
		if err := e.t.IterRecords(e.ctx, nil, cols, collect); err != nil {
			return errors.Trace(err)
		}
	}
	// Every chunk has already been handed out.
	if e.virtualTableChunkIdx >= e.virtualTableChunkList.NumChunks() {
		return nil
	}
	nextChunk := e.virtualTableChunkList.GetChunk(e.virtualTableChunkIdx)
	e.virtualTableChunkIdx++
	chk.SwapColumns(nextChunk)
	return nil
}
// nextHandle seeks the table from e.seekHandle and returns the handle found.
// found is false, with a zero handle, when the seek fails or finds nothing.
func (e *TableScanExec) nextHandle() (handle int64, found bool, err error) {
	handle, found, err = e.t.Seek(e.ctx, e.seekHandle)
	if err == nil && found {
		return handle, true, nil
	}
	return 0, false, errors.Trace(err)
}
// getRow reads the executor's columns of the row identified by handle.
func (e *TableScanExec) getRow(handle int64) ([]types.Datum, error) {
	cols := make([]*table.Column, e.schema.Len())
	for idx := range e.columns {
		cols[idx] = table.ToColumn(e.columns[idx])
	}
	datums, err := e.t.RowWithCols(e.ctx, handle, cols)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return datums, nil
}
// Open implements the Executor Open interface.
// It discards the iterator and any materialised virtual-table chunks.
//
// The chunk index is rewound together with the list. Otherwise a re-opened
// executor would rebuild the list but start past its end and return no rows.
func (e *TableScanExec) Open(ctx context.Context) error {
	e.iter = nil
	e.virtualTableChunkList = nil
	e.virtualTableChunkIdx = 0
	return nil
}
// MaxOneRowExec checks if the number of rows that a query returns is at maximum one.
// It's built from subquery expression.
type MaxOneRowExec struct {
baseExecutor
// evaluated is set once the child has been checked; later Next calls return
// an empty chunk.
evaluated bool
}
// Open implements the Executor Open interface.
// It opens the children and re-arms the one-row check.
func (e *MaxOneRowExec) Open(ctx context.Context) error {
	err := e.baseExecutor.Open(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	e.evaluated = false
	return nil
}
// Next implements the Executor Next interface.
// On the first call it returns the child's single row, or a row of NULLs when
// the child yields nothing. It fails with "subquery returns more than 1 row"
// when the child produces more than one row, whether in its first batch or in
// a following one. Later calls return an empty chunk.
func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	if e.runtimeStats != nil {
		start := time.Now()
		defer func() { e.runtimeStats.Record(time.Since(start), chk.NumRows()) }()
	}
	chk.Reset()
	if e.evaluated {
		return nil
	}
	e.evaluated = true
	err := e.children[0].Next(ctx, chk)
	if err != nil {
		return errors.Trace(err)
	}
	if num := chk.NumRows(); num == 0 {
		for i := range e.schema.Columns {
			chk.AppendNull(i)
		}
		return nil
	} else if num != 1 {
		return errors.New("subquery returns more than 1 row")
	}
	// Exactly one row so far: probe the child once more to make sure nothing
	// follows it.
	childChunk := e.children[0].newFirstChunk()
	err = e.children[0].Next(ctx, childChunk)
	if err != nil {
		// The probe's error used to be dropped, which could report success on
		// a failed read.
		return errors.Trace(err)
	}
	if childChunk.NumRows() != 0 {
		return errors.New("subquery returns more than 1 row")
	}
	return nil
}
// UnionExec pulls all its children's result and returns to its parent directly.
// A "resultPuller" is started for every child to pull result from that child and push it to the "resultPool", the used
// "Chunk" is obtained from the corresponding "resourcePool". All resultPullers are running concurrently.
//                             +----------------+
//   +---> resourcePool 1 ---> | resultPuller 1 |-----+
//   |                         +----------------+     |
//   |                                                |
//   |                         +----------------+     v
//   +---> resourcePool 2 ---> | resultPuller 2 |-----> resultPool ---+
//   |                         +----------------+     ^               |
//   |                               ......           |               |
//   |                         +----------------+     |               |
//   +---> resourcePool n ---> | resultPuller n |-----+               |
//   |                         +----------------+                     |
//   |                                                                |
//   |                          +-------------+                       |
//   |--------------------------| main thread | <---------------------+
//                              +-------------+
type UnionExec struct {
baseExecutor
// stopFetchData stores a bool; once true, resultPullers stop before their
// next fetch.
stopFetchData atomic.Value
// wg tracks the running resultPullers.
wg sync.WaitGroup
// finished is closed by Close to tell resultPullers to exit.
finished chan struct{}
// resourcePools holds, per child, the channel that hands the reusable chunk
// to that child's resultPuller.
resourcePools []chan *chunk.Chunk
// resultPool carries results from the resultPullers to Next; it is closed
// once all resultPullers have exited.
resultPool chan *unionWorkerResult
// initialized reports whether the resultPullers have been started.
initialized bool
// childrenResults holds one chunk per child, used to seed resourcePools.
childrenResults []*chunk.Chunk
}
// unionWorkerResult stores the result for a union worker.
// A "resultPuller" is started for every child to pull result from that child, unionWorkerResult is used to store that pulled result.
// "src" is used for Chunk reuse: after pulling result from "resultPool", main-thread must push a valid unused Chunk to "src" to
// enable the corresponding "resultPuller" continue to work.
type unionWorkerResult struct {
// chk holds the rows pulled from the child.
chk *chunk.Chunk
// err is the error of the pull, or of a recovered panic in the puller.
err error
// src is the resource pool the chunk must be handed back to.
src chan<- *chunk.Chunk
}
// waitAllFinished blocks until every resultPuller has exited and then closes
// the result pool, so that receivers see the end of the stream.
func (e *UnionExec) waitAllFinished() {
e.wg.Wait()
close(e.resultPool)
}
// Open implements the Executor Open interface.
// It opens the children, allocates one result chunk per child and resets the
// control state. The resultPullers themselves are started lazily by Next.
func (e *UnionExec) Open(ctx context.Context) error {
	err := e.baseExecutor.Open(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	for i := range e.children {
		e.childrenResults = append(e.childrenResults, e.children[i].newFirstChunk())
	}
	e.stopFetchData.Store(false)
	e.initialized = false
	e.finished = make(chan struct{})
	return nil
}
// initialize creates the result pool and one single-slot resource pool per
// child, seeds each resource pool with that child's chunk, starts a
// resultPuller per child and finally starts the goroutine that closes the
// result pool once all pullers are done.
func (e *UnionExec) initialize(ctx context.Context) {
	childCnt := len(e.children)
	e.resultPool = make(chan *unionWorkerResult, childCnt)
	e.resourcePools = make([]chan *chunk.Chunk, childCnt)
	for childID := 0; childID < childCnt; childID++ {
		pool := make(chan *chunk.Chunk, 1)
		e.resourcePools[childID] = pool
		pool <- e.childrenResults[childID]
		e.wg.Add(1)
		go e.resultPuller(ctx, childID)
	}
	go e.waitAllFinished()
}
// resultPuller is the worker for child childID. It repeatedly takes the
// child's reusable chunk from its resource pool, fills it through the child's
// Next and publishes the outcome on the result pool. It exits when the child
// is exhausted, when an error occurs, when the stop flag is set, or when
// e.finished is closed.
func (e *UnionExec) resultPuller(ctx context.Context, childID int) {
// A single result value is reused for every publication of this worker.
result := &unionWorkerResult{
err: nil,
chk: nil,
src: e.resourcePools[childID],
}
defer func() {
// Turn a panic into an error result and tell the other pullers to stop.
if r := recover(); r != nil {
buf := make([]byte, 4096)
stackSize := runtime.Stack(buf, false)
buf = buf[:stackSize]
log.Errorf("resultPuller panic stack is:\n%s", buf)
result.err = errors.Errorf("%v", r)
e.resultPool <- result
e.stopFetchData.Store(true)
}
e.wg.Done()
}()
for {
if e.stopFetchData.Load().(bool) {
return
}
// Wait until the chunk is handed back, or the executor is closed.
select {
case <-e.finished:
return
case result.chk = <-e.resourcePools[childID]:
}
result.err = errors.Trace(e.children[childID].Next(ctx, result.chk))
// The child is exhausted: exit without publishing anything.
if result.err == nil && result.chk.NumRows() == 0 {
return
}
// NOTE(review): this send is not guarded by e.finished; it appears to rely
// on the pool's buffer of len(children) and on each puller having at most
// one result in flight — confirm.
e.resultPool <- result
if result.err != nil {
e.stopFetchData.Store(true)
return
}
}
}
// Next implements the Executor Next interface.
// It starts the resultPullers on first use, then takes one result from the
// result pool, moves its rows into chk and hands the emptied chunk back to the
// puller it came from. A closed result pool means all children are exhausted.
func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	if e.runtimeStats != nil {
		begin := time.Now()
		defer func() { e.runtimeStats.Record(time.Since(begin), chk.NumRows()) }()
	}
	chk.GrowAndReset(e.maxChunkSize)
	if !e.initialized {
		e.initialize(ctx)
		e.initialized = true
	}
	res, open := <-e.resultPool
	switch {
	case !open:
		return nil
	case res.err != nil:
		return errors.Trace(res.err)
	}
	chk.SwapColumns(res.chk)
	res.src <- res.chk
	return nil
}
// Close implements the Executor Close interface.
// It signals the resultPullers to stop, drains the result pool until it is
// closed (that is, until every puller has exited), releases the chunk pools
// and closes the children.
//
// e.finished is nil when Open failed before creating it, and closing a nil or
// already-closed channel panics. The channel is therefore closed only when
// present and cleared afterwards, which also makes repeated Close calls safe.
func (e *UnionExec) Close() error {
	if e.finished != nil {
		close(e.finished)
	}
	e.childrenResults = nil
	if e.resultPool != nil {
		// Drain so that no puller stays blocked on a send; the range ends
		// when waitAllFinished closes the pool.
		for range e.resultPool {
		}
	}
	// All pullers have exited by now, so nothing reads e.finished any more.
	e.finished = nil
	e.resourcePools = nil
	return errors.Trace(e.baseExecutor.Close())
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/pingcap/tidb.git
git@gitee.com:pingcap/tidb.git
pingcap
tidb
tidb
v2.1.0-rc.4

Search