90 Star 486 Fork 146

平凯星辰(北京)科技有限公司 / tidb

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
request_builder.go 10.74 KB
一键复制 编辑 原始数据 按行查看 历史
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package distsql
import (
"math"
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/ranger"
tipb "github.com/pingcap/tipb/go-tipb"
)
// RequestBuilder is used to build a "kv.Request".
// It is called before we issue a kv request by "Select".
type RequestBuilder struct {
kv.Request
err error
}
// Build builds a "kv.Request".
func (builder *RequestBuilder) Build() (*kv.Request, error) {
return &builder.Request, errors.Trace(builder.err)
}
// SetTableRanges sets "KeyRanges" for "kv.Request" by converting "tableRanges"
// to "KeyRanges" firstly.
func (builder *RequestBuilder) SetTableRanges(tid int64, tableRanges []*ranger.NewRange, fb *statistics.QueryFeedback) *RequestBuilder {
if builder.err != nil {
return builder
}
builder.Request.KeyRanges = TableRangesToKVRanges(tid, tableRanges, fb)
return builder
}
// SetIndexRanges sets "KeyRanges" for "kv.Request" by converting index range
// "ranges" to "KeyRanges" firstly.
func (builder *RequestBuilder) SetIndexRanges(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.NewRange) *RequestBuilder {
if builder.err != nil {
return builder
}
builder.Request.KeyRanges, builder.err = IndexRangesToKVRanges(sc, tid, idxID, ranges, nil)
return builder
}
// SetTableHandles sets "KeyRanges" for "kv.Request" by converting table handles
// "handles" to "KeyRanges" firstly.
func (builder *RequestBuilder) SetTableHandles(tid int64, handles []int64) *RequestBuilder {
builder.Request.KeyRanges = TableHandlesToKVRanges(tid, handles)
return builder
}
// SetDAGRequest sets the request type to "ReqTypeDAG" and cosntruct request data.
func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuilder {
if builder.err != nil {
return builder
}
builder.Request.Tp = kv.ReqTypeDAG
builder.Request.StartTs = dag.StartTs
builder.Request.Data, builder.err = dag.Marshal()
return builder
}
// SetAnalyzeRequest sets the request type to "ReqTypeAnalyze" and cosntruct request data.
func (builder *RequestBuilder) SetAnalyzeRequest(ana *tipb.AnalyzeReq) *RequestBuilder {
if builder.err != nil {
return builder
}
builder.Request.Tp = kv.ReqTypeAnalyze
builder.Request.StartTs = ana.StartTs
builder.Request.Data, builder.err = ana.Marshal()
builder.Request.NotFillCache = true
builder.Request.IsolationLevel = kv.RC
builder.Request.Priority = kv.PriorityLow
return builder
}
// SetChecksumRequest sets the request type to "ReqTypeChecksum" and construct request data.
func (builder *RequestBuilder) SetChecksumRequest(checksum *tipb.ChecksumRequest) *RequestBuilder {
if builder.err != nil {
return builder
}
builder.Request.Tp = kv.ReqTypeChecksum
builder.Request.StartTs = checksum.StartTs
builder.Request.Data, builder.err = checksum.Marshal()
builder.Request.NotFillCache = true
return builder
}
// SetKeyRanges sets "KeyRanges" for "kv.Request".
func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBuilder {
builder.Request.KeyRanges = keyRanges
return builder
}
// SetDesc sets "Desc" for "kv.Request".
func (builder *RequestBuilder) SetDesc(desc bool) *RequestBuilder {
builder.Request.Desc = desc
return builder
}
// SetKeepOrder sets "KeepOrder" for "kv.Request".
func (builder *RequestBuilder) SetKeepOrder(order bool) *RequestBuilder {
builder.Request.KeepOrder = order
return builder
}
func (builder *RequestBuilder) getIsolationLevel() kv.IsoLevel {
switch builder.Tp {
case kv.ReqTypeAnalyze:
return kv.RC
}
return kv.SI
}
// SetFromSessionVars sets the following fields for "kv.Request" from session variables:
// "Concurrency", "IsolationLevel", "NotFillCache".
func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *RequestBuilder {
builder.Request.Concurrency = sv.DistSQLScanConcurrency
builder.Request.IsolationLevel = builder.getIsolationLevel()
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
return builder
}
// SetPriority sets "Priority" for "kv.Request".
func (builder *RequestBuilder) SetPriority(priority int) *RequestBuilder {
builder.Request.Priority = priority
return builder
}
// SetStreaming sets "Streaming" flag for "kv.Request".
func (builder *RequestBuilder) SetStreaming(streaming bool) *RequestBuilder {
builder.Request.Streaming = streaming
return builder
}
// SetConcurrency sets "Concurrency" for "kv.Request".
func (builder *RequestBuilder) SetConcurrency(concurrency int) *RequestBuilder {
builder.Request.Concurrency = concurrency
return builder
}
// TableRangesToKVRanges converts table ranges to "KeyRange".
func TableRangesToKVRanges(tid int64, ranges []*ranger.NewRange, fb *statistics.QueryFeedback) []kv.KeyRange {
if fb == nil || fb.Hist() == nil {
return tableRangesToKVRangesWithoutSplit(tid, ranges)
}
ranges = fb.Hist().SplitRange(ranges)
krs := make([]kv.KeyRange, 0, len(ranges))
feedbackRanges := make([]*ranger.NewRange, 0, len(ranges))
for _, ran := range ranges {
low := codec.EncodeInt(nil, ran.LowVal[0].GetInt64())
high := codec.EncodeInt(nil, ran.HighVal[0].GetInt64())
if ran.LowExclude {
low = []byte(kv.Key(low).PrefixNext())
}
// If this range is split by histogram, then the high val will equal to one bucket's upper bound,
// since we need to guarantee each range falls inside the exactly one bucket, `PerfixNext` will make the
// high value greater than upper bound, so we store the range here.
r := &ranger.NewRange{LowVal: []types.Datum{types.NewBytesDatum(low)},
HighVal: []types.Datum{types.NewBytesDatum(high)}}
feedbackRanges = append(feedbackRanges, r)
if !ran.HighExclude {
high = []byte(kv.Key(high).PrefixNext())
}
startKey := tablecodec.EncodeRowKey(tid, low)
endKey := tablecodec.EncodeRowKey(tid, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
}
fb.StoreRanges(feedbackRanges)
return krs
}
func tableRangesToKVRangesWithoutSplit(tid int64, ranges []*ranger.NewRange) []kv.KeyRange {
krs := make([]kv.KeyRange, 0, len(ranges))
for _, ran := range ranges {
low, high := encodeHandleKey(ran)
startKey := tablecodec.EncodeRowKey(tid, low)
endKey := tablecodec.EncodeRowKey(tid, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
}
return krs
}
func encodeHandleKey(ran *ranger.NewRange) ([]byte, []byte) {
low := codec.EncodeInt(nil, ran.LowVal[0].GetInt64())
high := codec.EncodeInt(nil, ran.HighVal[0].GetInt64())
if ran.LowExclude {
low = []byte(kv.Key(low).PrefixNext())
}
if !ran.HighExclude {
high = []byte(kv.Key(high).PrefixNext())
}
return low, high
}
// TableHandlesToKVRanges converts sorted handle to kv ranges.
// For continuous handles, we should merge them to a single key range.
func TableHandlesToKVRanges(tid int64, handles []int64) []kv.KeyRange {
krs := make([]kv.KeyRange, 0, len(handles))
i := 0
for i < len(handles) {
j := i + 1
for ; j < len(handles) && handles[j-1] != math.MaxInt64; j++ {
if handles[j] != handles[j-1]+1 {
break
}
}
low := codec.EncodeInt(nil, handles[i])
high := codec.EncodeInt(nil, handles[j-1])
high = []byte(kv.Key(high).PrefixNext())
startKey := tablecodec.EncodeRowKey(tid, low)
endKey := tablecodec.EncodeRowKey(tid, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
i = j
}
return krs
}
// IndexRangesToKVRanges converts index ranges to "KeyRange".
func IndexRangesToKVRanges(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.NewRange, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) {
if fb == nil || fb.Hist() == nil {
return indexRangesToKVWithoutSplit(sc, tid, idxID, ranges)
}
feedbackRanges := make([]*ranger.NewRange, 0, len(ranges))
for _, ran := range ranges {
low, high, err := encodeIndexKey(sc, ran)
if err != nil {
return nil, errors.Trace(err)
}
feedbackRanges = append(feedbackRanges, &ranger.NewRange{LowVal: []types.Datum{types.NewBytesDatum(low)},
HighVal: []types.Datum{types.NewBytesDatum(high)}, LowExclude: false, HighExclude: true})
}
feedbackRanges = fb.Hist().SplitRange(feedbackRanges)
krs := make([]kv.KeyRange, 0, len(feedbackRanges))
for _, ran := range feedbackRanges {
low, high := ran.LowVal[0].GetBytes(), ran.HighVal[0].GetBytes()
if ran.LowExclude {
low = kv.Key(low).PrefixNext()
}
ran.LowVal[0].SetBytes(low)
// If this range is split by histogram, then the high val will equal to one bucket's upper bound,
// since we need to guarantee each range falls inside the exactly one bucket, `PerfixNext` will make the
// high value greater than upper bound, so we store the high value here.
ran.HighVal[0].SetBytes(high)
if !ran.HighExclude {
high = kv.Key(high).PrefixNext()
}
startKey := tablecodec.EncodeIndexSeekKey(tid, idxID, low)
endKey := tablecodec.EncodeIndexSeekKey(tid, idxID, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
}
fb.StoreRanges(feedbackRanges)
return krs, nil
}
func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.NewRange) ([]kv.KeyRange, error) {
krs := make([]kv.KeyRange, 0, len(ranges))
for _, ran := range ranges {
low, high, err := encodeIndexKey(sc, ran)
if err != nil {
return nil, errors.Trace(err)
}
startKey := tablecodec.EncodeIndexSeekKey(tid, idxID, low)
endKey := tablecodec.EncodeIndexSeekKey(tid, idxID, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
}
return krs, nil
}
func encodeIndexKey(sc *stmtctx.StatementContext, ran *ranger.NewRange) ([]byte, []byte, error) {
low, err := codec.EncodeKey(sc, nil, ran.LowVal...)
if err != nil {
return nil, nil, errors.Trace(err)
}
if ran.LowExclude {
low = []byte(kv.Key(low).PrefixNext())
}
high, err := codec.EncodeKey(sc, nil, ran.HighVal...)
if err != nil {
return nil, nil, errors.Trace(err)
}
if !ran.HighExclude {
high = []byte(kv.Key(high).PrefixNext())
}
var hasNull bool
for _, highVal := range ran.HighVal {
if highVal.IsNull() {
hasNull = true
break
}
}
if hasNull {
// Append 0 to make unique-key range [null, null] to be a scan rather than point-get.
high = []byte(kv.Key(high).Next())
}
return low, high, nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/pingcap/tidb.git
git@gitee.com:pingcap/tidb.git
pingcap
tidb
tidb
v2.0.10

搜索帮助

344bd9b3 5694891 D2dac590 5694891