90 Star 485 Fork 145

平凯星辰(北京)科技有限公司 / tidb

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
join.go 20.88 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"sync"
"sync/atomic"
"github.com/juju/errors"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/mvmap"
"github.com/pingcap/tidb/util/types"
)
var (
_ joinExec = &NestedLoopJoinExec{}
_ Executor = &HashJoinExec{}
_ joinExec = &HashSemiJoinExec{}
_ Executor = &ApplyJoinExec{}
)
// HashJoinExec implements the hash join algorithm.
type HashJoinExec struct {
hashTable *mvmap.MVMap
smallHashKey []*expression.Column
bigHashKey []*expression.Column
smallExec Executor
bigExec Executor
prepared bool
ctx context.Context
smallFilter expression.CNFExprs
bigFilter expression.CNFExprs
otherFilter expression.CNFExprs
schema *expression.Schema
outer bool
leftSmall bool
cursor int
defaultValues []types.Datum
finished atomic.Value
// wg is for sync multiple join workers.
wg sync.WaitGroup
// closeCh add a lock for closing executor.
closeCh chan struct{}
rows []Row
// concurrency is number of concurrent channels.
concurrency int
bigTableResultCh []chan *execResult
hashJoinContexts []*hashJoinCtx
// Channels for output.
resultCh chan *execResult
}
// hashJoinCtx holds the variables needed to do a hash join in one of many concurrent goroutines.
type hashJoinCtx struct {
bigFilter expression.CNFExprs
otherFilter expression.CNFExprs
// datumBuffer is used for encode hash keys.
datumBuffer []types.Datum
hashKeyBuffer []byte
}
// Close implements the Executor Close interface.
func (e *HashJoinExec) Close() error {
e.finished.Store(true)
if e.prepared {
for range e.resultCh {
}
<-e.closeCh
}
e.rows = nil
return nil
}
// Open implements the Executor Open interface.
func (e *HashJoinExec) Open() error {
e.closeCh = make(chan struct{})
e.finished.Store(false)
e.bigTableResultCh = make([]chan *execResult, e.concurrency)
e.wg = sync.WaitGroup{}
for i := 0; i < e.concurrency; i++ {
e.bigTableResultCh[i] = make(chan *execResult, e.concurrency)
}
e.prepared = false
e.cursor = 0
err := e.smallExec.Open()
if err != nil {
return errors.Trace(err)
}
return errors.Trace(e.bigExec.Open())
}
// makeJoinRow simply creates a new row that appends row b to row a.
func makeJoinRow(a Row, b Row) Row {
ret := make([]types.Datum, 0, len(a)+len(b))
ret = append(ret, a...)
ret = append(ret, b...)
return ret
}
// getJoinKey gets the hash key when given a row and hash columns.
// It will return a boolean value representing if the hash key has null, a byte slice representing the result hash code.
func getJoinKey(cols []*expression.Column, row Row, vals []types.Datum, bytes []byte) (bool, []byte, error) {
var err error
for i, col := range cols {
vals[i], err = col.Eval(row)
if err != nil {
return false, nil, errors.Trace(err)
}
if vals[i].IsNull() {
return true, nil, nil
}
}
if len(vals) == 0 {
return false, nil, nil
}
bytes, err = codec.HashValues(bytes, vals...)
return false, bytes, errors.Trace(err)
}
// Schema implements the Executor Schema interface.
func (e *HashJoinExec) Schema() *expression.Schema {
return e.schema
}
var batchSize = 128
// fetchBigExec fetches rows from the big table in a background goroutine
// and sends the rows to multiple channels which will be read by multiple join workers.
func (e *HashJoinExec) fetchBigExec() {
cnt := 0
defer func() {
for _, cn := range e.bigTableResultCh {
close(cn)
}
terror.Log(errors.Trace(e.bigExec.Close()))
e.wg.Done()
}()
curBatchSize := 1
result := &execResult{rows: make([]Row, 0, curBatchSize)}
txnCtx := e.ctx.GoCtx()
for {
done := false
idx := cnt % e.concurrency
for i := 0; i < curBatchSize; i++ {
if e.finished.Load().(bool) {
return
}
row, err := e.bigExec.Next()
if err != nil {
result.err = errors.Trace(err)
e.bigTableResultCh[idx] <- result
done = true
break
}
if row == nil {
done = true
break
}
result.rows = append(result.rows, row)
if len(result.rows) >= curBatchSize {
select {
case <-txnCtx.Done():
return
case e.bigTableResultCh[idx] <- result:
result = &execResult{rows: make([]Row, 0, curBatchSize)}
}
}
}
cnt++
if done {
if len(result.rows) > 0 {
select {
case <-txnCtx.Done():
return
case e.bigTableResultCh[idx] <- result:
}
}
break
}
if curBatchSize < batchSize {
curBatchSize *= 2
}
}
}
// prepare runs the first time when 'Next' is called, it starts one worker goroutine to fetch rows from the big table,
// and reads all data from the small table to build a hash table, then starts multiple join worker goroutines.
func (e *HashJoinExec) prepare() error {
// Start a worker to fetch big table rows.
e.wg.Add(1)
go e.fetchBigExec()
e.hashTable = mvmap.NewMVMap()
e.cursor = 0
var buffer []byte
for {
row, err := e.smallExec.Next()
if err != nil {
return errors.Trace(err)
}
if row == nil {
terror.Log(errors.Trace(e.smallExec.Close()))
break
}
matched, err := expression.EvalBool(e.smallFilter, row, e.ctx)
if err != nil {
return errors.Trace(err)
}
if !matched {
continue
}
hasNull, joinKey, err := getJoinKey(e.smallHashKey, row, e.hashJoinContexts[0].datumBuffer, nil)
if err != nil {
return errors.Trace(err)
}
if hasNull {
continue
}
buffer = buffer[:0]
buffer, err = e.encodeRow(buffer, row)
if err != nil {
return errors.Trace(err)
}
e.hashTable.Put(joinKey, buffer)
}
e.resultCh = make(chan *execResult, e.concurrency)
for i := 0; i < e.concurrency; i++ {
e.wg.Add(1)
go e.runJoinWorker(i)
}
go e.waitJoinWorkersAndCloseResultChan()
e.prepared = true
return nil
}
func (e *HashJoinExec) encodeRow(b []byte, row Row) ([]byte, error) {
loc := e.ctx.GetSessionVars().GetTimeZone()
for _, datum := range row {
tmp, err := tablecodec.EncodeValue(datum, loc)
if err != nil {
return nil, errors.Trace(err)
}
b = append(b, tmp...)
}
return b, nil
}
func (e *HashJoinExec) decodeRow(data []byte) (Row, error) {
values := make([]types.Datum, e.smallExec.Schema().Len())
err := codec.SetRawValues(data, values)
if err != nil {
return nil, errors.Trace(err)
}
err = decodeRawValues(values, e.smallExec.Schema(), e.ctx.GetSessionVars().GetTimeZone())
if err != nil {
return nil, errors.Trace(err)
}
return values, nil
}
func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan() {
e.wg.Wait()
close(e.resultCh)
e.hashTable = nil
close(e.closeCh)
}
// runJoinWorker does join job in one goroutine.
func (e *HashJoinExec) runJoinWorker(idx int) {
maxRowsCnt := 1000
result := &execResult{rows: make([]Row, 0, maxRowsCnt)}
txnCtx := e.ctx.GoCtx()
for {
var bigTableResult *execResult
var exit bool
select {
case <-txnCtx.Done():
exit = true
case tmp, ok := <-e.bigTableResultCh[idx]:
if !ok {
exit = true
}
bigTableResult = tmp
}
if exit || e.finished.Load().(bool) {
break
}
if bigTableResult.err != nil {
e.resultCh <- &execResult{err: errors.Trace(bigTableResult.err)}
break
}
for _, bigRow := range bigTableResult.rows {
succ := e.joinOneBigRow(e.hashJoinContexts[idx], bigRow, result)
if !succ {
break
}
if len(result.rows) >= maxRowsCnt {
e.resultCh <- result
result = &execResult{rows: make([]Row, 0, maxRowsCnt)}
}
}
}
if len(result.rows) != 0 || result.err != nil {
e.resultCh <- result
}
e.wg.Done()
}
// joinOneBigRow creates result rows from a row in a big table and sends them to resultRows channel.
// Every matching row generates a result row.
// If there are no matching rows and it is outer join, a null filled result row is created.
func (e *HashJoinExec) joinOneBigRow(ctx *hashJoinCtx, bigRow Row, result *execResult) bool {
var (
matchedRows []Row
err error
)
bigMatched := true
bigMatched, err = expression.EvalBool(ctx.bigFilter, bigRow, e.ctx)
if err != nil {
result.err = errors.Trace(err)
return false
}
if bigMatched {
matchedRows, err = e.constructMatchedRows(ctx, bigRow)
if err != nil {
result.err = errors.Trace(err)
return false
}
}
result.rows = append(result.rows, matchedRows...)
if len(matchedRows) == 0 && e.outer {
r := e.fillRowWithDefaultValues(bigRow)
result.rows = append(result.rows, r)
}
return true
}
// constructMatchedRows creates matching result rows from a row in the big table.
func (e *HashJoinExec) constructMatchedRows(ctx *hashJoinCtx, bigRow Row) (matchedRows []Row, err error) {
hasNull, joinKey, err := getJoinKey(e.bigHashKey, bigRow, ctx.datumBuffer, ctx.hashKeyBuffer[0:0:cap(ctx.hashKeyBuffer)])
if err != nil {
return nil, errors.Trace(err)
}
if hasNull {
return
}
values := e.hashTable.Get(joinKey)
if len(values) == 0 {
return
}
// match eq condition
for _, value := range values {
var smallRow Row
smallRow, err = e.decodeRow(value)
if err != nil {
return nil, errors.Trace(err)
}
var matchedRow Row
if e.leftSmall {
matchedRow = makeJoinRow(smallRow, bigRow)
} else {
matchedRow = makeJoinRow(bigRow, smallRow)
}
otherMatched, err := expression.EvalBool(ctx.otherFilter, matchedRow, e.ctx)
if err != nil {
return nil, errors.Trace(err)
}
if otherMatched {
matchedRows = append(matchedRows, matchedRow)
}
}
return matchedRows, nil
}
// fillRowWithDefaultValues creates a result row filled with default values from a row in the big table.
// It is used for outer join, when a row from outer table doesn't have any matching rows.
func (e *HashJoinExec) fillRowWithDefaultValues(bigRow Row) (returnRow Row) {
smallRow := make([]types.Datum, e.smallExec.Schema().Len())
copy(smallRow, e.defaultValues)
if e.leftSmall {
returnRow = makeJoinRow(smallRow, bigRow)
} else {
returnRow = makeJoinRow(bigRow, smallRow)
}
return returnRow
}
// Next implements the Executor Next interface.
func (e *HashJoinExec) Next() (Row, error) {
if !e.prepared {
if err := e.prepare(); err != nil {
return nil, errors.Trace(err)
}
}
txnCtx := e.ctx.GoCtx()
if e.cursor >= len(e.rows) {
var result *execResult
select {
case tmp, ok := <-e.resultCh:
if !ok {
return nil, nil
}
result = tmp
if result.err != nil {
e.finished.Store(true)
return nil, errors.Trace(result.err)
}
case <-txnCtx.Done():
return nil, nil
}
if len(result.rows) == 0 {
return nil, nil
}
e.rows = result.rows
e.cursor = 0
}
row := e.rows[e.cursor]
e.cursor++
return row, nil
}
// joinExec is the common interface of join algorithm except for hash join.
type joinExec interface {
Executor
// fetchBigRow fetches a valid row from big Exec and returns a bool value that means if it is matched.
fetchBigRow() (Row, bool, error)
// prepare reads all records from small Exec and stores them.
prepare() error
// doJoin fetches a row from big exec and a bool value that means if it's matched with big filter,
// then get all the rows matches the on condition.
doJoin(Row, bool) ([]Row, error)
}
// NestedLoopJoinExec implements nested-loop algorithm for join.
type NestedLoopJoinExec struct {
innerRows []Row
cursor int
resultRows []Row
SmallExec Executor
BigExec Executor
leftSmall bool
prepared bool
Ctx context.Context
SmallFilter expression.CNFExprs
BigFilter expression.CNFExprs
OtherFilter expression.CNFExprs
schema *expression.Schema
outer bool
defaultValues []types.Datum
}
// Schema implements Executor interface.
func (e *NestedLoopJoinExec) Schema() *expression.Schema {
return e.schema
}
// Close implements Executor interface.
func (e *NestedLoopJoinExec) Close() error {
e.resultRows = nil
e.innerRows = nil
return e.BigExec.Close()
}
// Open implements Executor Open interface.
func (e *NestedLoopJoinExec) Open() error {
e.cursor = 0
e.prepared = false
e.resultRows = e.resultRows[:0]
e.innerRows = e.innerRows[:0]
return errors.Trace(e.BigExec.Open())
}
func (e *NestedLoopJoinExec) fetchBigRow() (Row, bool, error) {
for {
bigRow, err := e.BigExec.Next()
if err != nil {
return nil, false, errors.Trace(err)
}
if bigRow == nil {
return nil, false, e.BigExec.Close()
}
matched, err := expression.EvalBool(e.BigFilter, bigRow, e.Ctx)
if err != nil {
return nil, false, errors.Trace(err)
}
if matched {
return bigRow, true, nil
} else if e.outer {
return bigRow, false, nil
}
}
}
// prepare runs the first time when 'Next' is called and it reads all data from the small table and stores
// them in a slice.
func (e *NestedLoopJoinExec) prepare() error {
err := e.SmallExec.Open()
if err != nil {
return errors.Trace(err)
}
defer terror.Call(e.SmallExec.Close)
e.innerRows = e.innerRows[:0]
e.prepared = true
for {
row, err := e.SmallExec.Next()
if err != nil {
return errors.Trace(err)
}
if row == nil {
return nil
}
matched, err := expression.EvalBool(e.SmallFilter, row, e.Ctx)
if err != nil {
return errors.Trace(err)
}
if matched {
e.innerRows = append(e.innerRows, row)
}
}
}
func (e *NestedLoopJoinExec) fillRowWithDefaultValue(bigRow Row) (returnRow Row) {
smallRow := make([]types.Datum, e.SmallExec.Schema().Len())
copy(smallRow, e.defaultValues)
if e.leftSmall {
returnRow = makeJoinRow(smallRow, bigRow)
} else {
returnRow = makeJoinRow(bigRow, smallRow)
}
return returnRow
}
func (e *NestedLoopJoinExec) doJoin(bigRow Row, match bool) ([]Row, error) {
e.resultRows = e.resultRows[0:0]
if !match && e.outer {
row := e.fillRowWithDefaultValue(bigRow)
e.resultRows = append(e.resultRows, row)
return e.resultRows, nil
}
for _, row := range e.innerRows {
var mergedRow Row
if e.leftSmall {
mergedRow = makeJoinRow(row, bigRow)
} else {
mergedRow = makeJoinRow(bigRow, row)
}
matched, err := expression.EvalBool(e.OtherFilter, mergedRow, e.Ctx)
if err != nil {
return nil, errors.Trace(err)
}
if !matched {
continue
}
e.resultRows = append(e.resultRows, mergedRow)
}
if len(e.resultRows) == 0 && e.outer {
e.resultRows = append(e.resultRows, e.fillRowWithDefaultValue(bigRow))
}
return e.resultRows, nil
}
// Next implements the Executor interface.
func (e *NestedLoopJoinExec) Next() (Row, error) {
if !e.prepared {
if err := e.prepare(); err != nil {
return nil, errors.Trace(err)
}
}
for {
if e.cursor < len(e.resultRows) {
retRow := e.resultRows[e.cursor]
e.cursor++
return retRow, nil
}
bigRow, match, err := e.fetchBigRow()
if bigRow == nil || err != nil {
return bigRow, errors.Trace(err)
}
e.resultRows, err = e.doJoin(bigRow, match)
if err != nil {
return nil, errors.Trace(err)
}
e.cursor = 0
}
}
// HashSemiJoinExec implements the hash join algorithm for semi join.
type HashSemiJoinExec struct {
hashTable map[string][]Row
smallHashKey []*expression.Column
bigHashKey []*expression.Column
smallExec Executor
bigExec Executor
prepared bool
ctx context.Context
smallFilter expression.CNFExprs
bigFilter expression.CNFExprs
otherFilter expression.CNFExprs
schema *expression.Schema
resultRows []Row
// auxMode is a mode that the result row always returns with an extra column which stores a boolean
// or NULL value to indicate if this row is matched.
auxMode bool
smallTableHasNull bool
// anti is true, semi join only output the unmatched row.
anti bool
}
// Close implements the Executor Close interface.
func (e *HashSemiJoinExec) Close() error {
e.hashTable = nil
e.resultRows = nil
return e.bigExec.Close()
}
// Open implements the Executor Open interface.
func (e *HashSemiJoinExec) Open() error {
e.prepared = false
e.smallTableHasNull = false
e.hashTable = make(map[string][]Row)
e.resultRows = make([]Row, 1)
return errors.Trace(e.bigExec.Open())
}
// Schema implements the Executor Schema interface.
func (e *HashSemiJoinExec) Schema() *expression.Schema {
return e.schema
}
// prepare runs the first time when 'Next' is called and it reads all data from the small table and stores
// them in a hash table.
func (e *HashSemiJoinExec) prepare() error {
err := e.smallExec.Open()
if err != nil {
return errors.Trace(err)
}
defer terror.Call(e.smallExec.Close)
e.hashTable = make(map[string][]Row)
e.resultRows = make([]Row, 1)
e.prepared = true
for {
row, err := e.smallExec.Next()
if err != nil {
return errors.Trace(err)
}
if row == nil {
return nil
}
matched, err := expression.EvalBool(e.smallFilter, row, e.ctx)
if err != nil {
return errors.Trace(err)
}
if !matched {
continue
}
hasNull, hashcode, err := getJoinKey(e.smallHashKey, row, make([]types.Datum, len(e.smallHashKey)), nil)
if err != nil {
return errors.Trace(err)
}
if hasNull {
e.smallTableHasNull = true
continue
}
if rows, ok := e.hashTable[string(hashcode)]; !ok {
e.hashTable[string(hashcode)] = []Row{row}
} else {
e.hashTable[string(hashcode)] = append(rows, row)
}
}
}
func (e *HashSemiJoinExec) rowIsMatched(bigRow Row) (matched bool, hasNull bool, err error) {
hasNull, hashcode, err := getJoinKey(e.bigHashKey, bigRow, make([]types.Datum, len(e.smallHashKey)), nil)
if err != nil {
return false, false, errors.Trace(err)
}
if hasNull {
return false, true, nil
}
rows, ok := e.hashTable[string(hashcode)]
if !ok {
return
}
// match eq condition
for _, smallRow := range rows {
matchedRow := makeJoinRow(bigRow, smallRow)
matched, err = expression.EvalBool(e.otherFilter, matchedRow, e.ctx)
if err != nil {
return false, false, errors.Trace(err)
}
if matched {
return
}
}
return
}
func (e *HashSemiJoinExec) fetchBigRow() (Row, bool, error) {
for {
bigRow, err := e.bigExec.Next()
if err != nil {
return nil, false, errors.Trace(err)
}
if bigRow == nil {
return nil, false, errors.Trace(e.bigExec.Close())
}
matched, err := expression.EvalBool(e.bigFilter, bigRow, e.ctx)
if err != nil {
return nil, false, errors.Trace(err)
}
if matched {
return bigRow, true, nil
} else if e.auxMode {
return bigRow, false, nil
}
}
}
func (e *HashSemiJoinExec) doJoin(bigRow Row, match bool) ([]Row, error) {
if e.auxMode && !match {
bigRow = append(bigRow, types.NewDatum(false))
e.resultRows[0] = bigRow
return e.resultRows, nil
}
matched, isNull, err := e.rowIsMatched(bigRow)
if err != nil {
return nil, errors.Trace(err)
}
if !matched && e.smallTableHasNull {
isNull = true
}
if e.anti && !isNull {
matched = !matched
}
// For the auxMode subquery, we return the row with a Datum indicating if it's a match,
// For the non-auxMode subquery, we return the matching row only.
if e.auxMode {
if isNull {
bigRow = append(bigRow, types.NewDatum(nil))
} else {
bigRow = append(bigRow, types.NewDatum(matched))
}
matched = true
}
if matched {
e.resultRows[0] = bigRow
return e.resultRows, nil
}
return nil, nil
}
// Next implements the Executor Next interface.
func (e *HashSemiJoinExec) Next() (Row, error) {
if !e.prepared {
if err := e.prepare(); err != nil {
return nil, errors.Trace(err)
}
}
for {
bigRow, match, err := e.fetchBigRow()
if bigRow == nil || err != nil {
return bigRow, errors.Trace(err)
}
resultRows, err := e.doJoin(bigRow, match)
if err != nil {
return nil, errors.Trace(err)
}
if len(resultRows) > 0 {
return resultRows[0], nil
}
}
}
// ApplyJoinExec is the new logic of apply.
type ApplyJoinExec struct {
join joinExec
outerSchema []*expression.CorrelatedColumn
cursor int
resultRows []Row
schema *expression.Schema
}
// Schema implements the Executor interface.
func (e *ApplyJoinExec) Schema() *expression.Schema {
return e.schema
}
// Close implements the Executor interface.
func (e *ApplyJoinExec) Close() error {
return nil
}
// Open implements the Executor interface.
func (e *ApplyJoinExec) Open() error {
e.cursor = 0
e.resultRows = nil
return errors.Trace(e.join.Open())
}
// Next implements the Executor interface.
func (e *ApplyJoinExec) Next() (Row, error) {
for {
if e.cursor < len(e.resultRows) {
row := e.resultRows[e.cursor]
e.cursor++
return row, nil
}
bigRow, match, err := e.join.fetchBigRow()
if bigRow == nil || err != nil {
return nil, errors.Trace(err)
}
for _, col := range e.outerSchema {
*col.Data = bigRow[col.Index]
}
err = e.join.prepare()
if err != nil {
return nil, errors.Trace(err)
}
e.resultRows, err = e.join.doJoin(bigRow, match)
if err != nil {
return nil, errors.Trace(err)
}
e.cursor = 0
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/pingcap/tidb.git
git@gitee.com:pingcap/tidb.git
pingcap
tidb
tidb
v1.1.0-alpha

搜索帮助

344bd9b3 5694891 D2dac590 5694891