90 Star 492 Fork 151

平凯星辰(北京)科技有限公司/tidb

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
local_region.go 20.11 KB
一键复制 编辑 原始数据 按行查看 历史
coocood 提交于 2017-10-09 12:46 . *: check errors part 2 (#4704)

package localstore
import (
"bytes"
"container/heap"
"encoding/binary"
"sort"
"time"
"github.com/golang/protobuf/proto"
"github.com/juju/errors"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/distsql/xeval"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tipb/go-tipb"
)
// local region server.
type localRegion struct {
id int
store *dbStore
startKey []byte
endKey []byte
}
type regionRequest struct {
Tp int64
data []byte
startKey []byte
endKey []byte
ranges []kv.KeyRange
}
type regionResponse struct {
req *regionRequest
err error
data []byte
// If region missed some request key range, newStartKey and newEndKey is returned.
newStartKey []byte
newEndKey []byte
}
const chunkSize = 64
type sortRow struct {
key []types.Datum
meta tipb.RowMeta
data []byte
}
// topnSorter implements sort.Interface. When all rows have been processed, the topnSorter will sort the whole data in heap.
type topnSorter struct {
orderByItems []*tipb.ByItem
rows []*sortRow
err error
ctx *selectContext
}
func (t *topnSorter) Len() int {
return len(t.rows)
}
func (t *topnSorter) Swap(i, j int) {
t.rows[i], t.rows[j] = t.rows[j], t.rows[i]
}
func (t *topnSorter) Less(i, j int) bool {
for index, by := range t.orderByItems {
v1 := t.rows[i].key[index]
v2 := t.rows[j].key[index]
ret, err := v1.CompareDatum(t.ctx.sc, &v2)
if err != nil {
t.err = errors.Trace(err)
return true
}
if by.Desc {
ret = -ret
}
if ret < 0 {
return true
} else if ret > 0 {
return false
}
}
return false
}
// topnHeap holds the top n elements using heap structure. It implements heap.Interface.
// When we insert a row, topnHeap will check if the row can become one of the top n element or not.
type topnHeap struct {
topnSorter
// totalCount is equal to the limit count, which means the max size of heap.
totalCount int
// heapSize means the current size of this heap.
heapSize int
}
func (t *topnHeap) Len() int {
return t.heapSize
}
func (t *topnHeap) Push(x interface{}) {
t.rows = append(t.rows, x.(*sortRow))
t.heapSize++
}
func (t *topnHeap) Pop() interface{} {
return nil
}
func (t *topnHeap) Less(i, j int) bool {
for index, by := range t.orderByItems {
v1 := t.rows[i].key[index]
v2 := t.rows[j].key[index]
ret, err := v1.CompareDatum(t.ctx.sc, &v2)
if err != nil {
t.err = errors.Trace(err)
return true
}
if by.Desc {
ret = -ret
}
if ret > 0 {
return true
} else if ret < 0 {
return false
}
}
return false
}
// tryToAddRow tries to add a row to heap.
// When this row is not less than any rows in heap, it will never become the top n element.
// Then this function returns false.
func (t *topnHeap) tryToAddRow(row *sortRow) bool {
success := false
if t.heapSize == t.totalCount {
t.rows = append(t.rows, row)
// When this row is less than the top element, it will replace it and adjust the heap structure.
if t.Less(0, t.heapSize) {
t.Swap(0, t.heapSize)
heap.Fix(t, 0)
success = true
}
t.rows = t.rows[:t.heapSize]
} else {
heap.Push(t, row)
success = true
}
return success
}
type selectContext struct {
sel *tipb.SelectRequest
txn kv.Transaction
eval *xeval.Evaluator
whereColumns map[int64]*tipb.ColumnInfo
aggColumns map[int64]*tipb.ColumnInfo
topnColumns map[int64]*tipb.ColumnInfo
groups map[string]bool
groupKeys [][]byte
aggregates []*aggregateFuncExpr
topnHeap *topnHeap
keyRanges []kv.KeyRange
// TODO: Only one of these three flags can be true at the same time. We should set this as an enum var.
aggregate bool
descScan bool
topN bool
// Use for DecodeRow.
colTps map[int64]*types.FieldType
chunks []tipb.Chunk
sc *variable.StatementContext
}
func (rs *localRegion) Handle(req *regionRequest) (*regionResponse, error) {
resp := &regionResponse{
req: req,
}
if req.Tp == kv.ReqTypeSelect || req.Tp == kv.ReqTypeIndex {
sel := new(tipb.SelectRequest)
err := proto.Unmarshal(req.data, sel)
if err != nil {
return nil, errors.Trace(err)
}
txn := newTxn(rs.store, kv.Version{Ver: sel.StartTs})
ctx := &selectContext{
sel: sel,
txn: txn,
keyRanges: req.ranges,
sc: xeval.FlagsToStatementContext(sel.Flags),
}
loc := time.FixedZone("UTC", int(sel.TimeZoneOffset))
ctx.eval = xeval.NewEvaluator(ctx.sc, loc)
if sel.Where != nil {
ctx.whereColumns = make(map[int64]*tipb.ColumnInfo)
err = collectColumnsInExpr(sel.Where, ctx, ctx.whereColumns)
if err != nil {
return nil, errors.Trace(err)
}
}
if len(sel.OrderBy) > 0 {
if sel.OrderBy[0].Expr == nil {
ctx.descScan = sel.OrderBy[0].Desc
} else {
if sel.Limit == nil {
return nil, errors.New("we don't support pushing down Sort without Limit")
}
ctx.topN = true
ctx.topnHeap = &topnHeap{
totalCount: int(*sel.Limit),
topnSorter: topnSorter{
orderByItems: sel.OrderBy,
ctx: ctx,
},
}
ctx.topnColumns = make(map[int64]*tipb.ColumnInfo)
for _, item := range sel.OrderBy {
err = collectColumnsInExpr(item.Expr, ctx, ctx.topnColumns)
if err != nil {
return nil, errors.Trace(err)
}
}
for k := range ctx.whereColumns {
// It will be handled in where.
delete(ctx.topnColumns, k)
}
}
}
ctx.aggregate = len(sel.Aggregates) > 0 || len(sel.GetGroupBy()) > 0
if ctx.aggregate {
// compose aggregateFuncExpr
ctx.aggregates = make([]*aggregateFuncExpr, 0, len(sel.Aggregates))
ctx.aggColumns = make(map[int64]*tipb.ColumnInfo)
for _, agg := range sel.Aggregates {
aggExpr := &aggregateFuncExpr{expr: agg}
ctx.aggregates = append(ctx.aggregates, aggExpr)
err = collectColumnsInExpr(agg, ctx, ctx.aggColumns)
if err != nil {
return nil, errors.Trace(err)
}
}
ctx.groups = make(map[string]bool)
ctx.groupKeys = make([][]byte, 0)
for _, item := range ctx.sel.GetGroupBy() {
err = collectColumnsInExpr(item.Expr, ctx, ctx.aggColumns)
if err != nil {
return nil, errors.Trace(err)
}
}
for k := range ctx.whereColumns {
// It will be handled in where.
delete(ctx.aggColumns, k)
}
}
if req.Tp == kv.ReqTypeSelect {
err = rs.getRowsFromSelectReq(ctx)
} else {
// The PKHandle column info has been collected in ctx, so we can remove it in IndexInfo.
length := len(sel.IndexInfo.Columns)
if sel.IndexInfo.Columns[length-1].GetPkHandle() {
sel.IndexInfo.Columns = sel.IndexInfo.Columns[:length-1]
}
err = rs.getRowsFromIndexReq(ctx)
}
if ctx.topN {
rs.setTopNDataForCtx(ctx)
}
selResp := new(tipb.SelectResponse)
selResp.Error = toPBError(err)
selResp.Chunks = ctx.chunks
resp.err = err
data, err := proto.Marshal(selResp)
if err != nil {
return nil, errors.Trace(err)
}
resp.data = data
}
if bytes.Compare(rs.startKey, req.startKey) < 0 || bytes.Compare(rs.endKey, req.endKey) > 0 {
resp.newStartKey = rs.startKey
resp.newEndKey = rs.endKey
}
return resp, nil
}
func (rs *localRegion) setTopNDataForCtx(ctx *selectContext) {
sort.Sort(&ctx.topnHeap.topnSorter)
for _, row := range ctx.topnHeap.rows {
chunk := rs.getChunk(ctx)
chunk.RowsData = append(chunk.RowsData, row.data...)
chunk.RowsMeta = append(chunk.RowsMeta, row.meta)
}
}
func collectColumnsInExpr(expr *tipb.Expr, ctx *selectContext, collector map[int64]*tipb.ColumnInfo) error {
if expr == nil {
return nil
}
if expr.GetTp() == tipb.ExprType_ColumnRef {
_, i, err := codec.DecodeInt(expr.Val)
if err != nil {
return errors.Trace(err)
}
var columns []*tipb.ColumnInfo
if ctx.sel.TableInfo != nil {
columns = ctx.sel.TableInfo.Columns
} else {
columns = ctx.sel.IndexInfo.Columns
}
for _, c := range columns {
if c.GetColumnId() == i {
collector[i] = c
return nil
}
}
return xeval.ErrInvalid.Gen("column %d not found", i)
}
for _, child := range expr.Children {
err := collectColumnsInExpr(child, ctx, collector)
if err != nil {
return errors.Trace(err)
}
}
return nil
}
func (rs *localRegion) getRowsFromSelectReq(ctx *selectContext) error {
// Init ctx.colTps and use it to decode all the rows.
columns := ctx.sel.TableInfo.Columns
ctx.colTps = make(map[int64]*types.FieldType, len(columns))
for _, col := range columns {
if col.GetPkHandle() {
continue
}
ctx.colTps[col.GetColumnId()] = distsql.FieldTypeFromPBColumn(col)
}
kvRanges := rs.extractKVRanges(ctx)
limit := int64(-1)
if ctx.sel.Limit != nil {
limit = ctx.sel.GetLimit()
}
for _, ran := range kvRanges {
if limit == 0 {
break
}
count, err := rs.getRowsFromRange(ctx, ran, limit, ctx.descScan)
if err != nil {
return errors.Trace(err)
}
limit -= count
}
if ctx.aggregate {
return rs.getRowsFromAgg(ctx)
}
return nil
}
/*
* Convert aggregate partial result to rows.
* Data layout example:
* SQL: select count(c1), sum(c2), avg(c3) from t;
* Aggs: count(c1), sum(c2), avg(c3)
* Rows: groupKey1, count1, value2, count3, value3
* groupKey2, count1, value2, count3, value3
*/
func (rs *localRegion) getRowsFromAgg(ctx *selectContext) error {
for _, gk := range ctx.groupKeys {
chunk := rs.getChunk(ctx)
// Each aggregate partial result will be converted to one or two datums.
rowData := make([]types.Datum, 0, 1+2*len(ctx.aggregates))
// The first column is group key.
rowData = append(rowData, types.NewBytesDatum(gk))
for _, agg := range ctx.aggregates {
agg.currentGroup = gk
ds, err := agg.toDatums(ctx)
if err != nil {
return errors.Trace(err)
}
rowData = append(rowData, ds...)
}
var err error
beforeLen := len(chunk.RowsData)
chunk.RowsData, err = codec.EncodeValue(chunk.RowsData, rowData...)
if err != nil {
return errors.Trace(err)
}
var rowMeta tipb.RowMeta
rowMeta.Length = int64(len(chunk.RowsData) - beforeLen)
chunk.RowsMeta = append(chunk.RowsMeta, rowMeta)
}
return nil
}
// extractKVRanges extracts kv.KeyRanges slice from ctx.keyRanges, and also returns if it is in descending order.
func (rs *localRegion) extractKVRanges(ctx *selectContext) (kvRanges []kv.KeyRange) {
for _, kran := range ctx.keyRanges {
upperKey := kran.EndKey
if bytes.Compare(upperKey, rs.startKey) <= 0 {
continue
}
lowerKey := kran.StartKey
if bytes.Compare(lowerKey, rs.endKey) >= 0 {
break
}
var kvr kv.KeyRange
if bytes.Compare(lowerKey, rs.startKey) <= 0 {
kvr.StartKey = rs.startKey
} else {
kvr.StartKey = lowerKey
}
if bytes.Compare(upperKey, rs.endKey) <= 0 {
kvr.EndKey = upperKey
} else {
kvr.EndKey = rs.endKey
}
kvRanges = append(kvRanges, kvr)
}
if ctx.descScan {
reverseKVRanges(kvRanges)
}
return
}
func (rs *localRegion) getRowsFromRange(ctx *selectContext, ran kv.KeyRange, limit int64, desc bool) (count int64, err error) {
if limit == 0 {
return 0, nil
}
if ran.IsPoint() {
var value []byte
value, err = ctx.txn.Get(ran.StartKey)
if kv.ErrNotExist.Equal(err) {
return 0, nil
} else if err != nil {
return 0, errors.Trace(err)
}
var h int64
h, err = tablecodec.DecodeRowKey(ran.StartKey)
if err != nil {
return 0, errors.Trace(err)
}
gotRow, err1 := rs.handleRowData(ctx, h, value)
if err1 != nil {
return 0, errors.Trace(err1)
}
if gotRow {
count++
}
return
}
var seekKey kv.Key
if desc {
seekKey = ran.EndKey
} else {
seekKey = ran.StartKey
}
for {
if limit == 0 {
break
}
var (
it kv.Iterator
err error
)
if desc {
it, err = ctx.txn.SeekReverse(seekKey)
} else {
it, err = ctx.txn.Seek(seekKey)
}
if err != nil {
return 0, errors.Trace(err)
}
if !it.Valid() {
break
}
if desc {
if it.Key().Cmp(ran.StartKey) < 0 {
break
}
seekKey = tablecodec.TruncateToRowKeyLen(it.Key())
} else {
if it.Key().Cmp(ran.EndKey) >= 0 {
break
}
seekKey = it.Key().PrefixNext()
}
h, err := tablecodec.DecodeRowKey(it.Key())
if err != nil {
return 0, errors.Trace(err)
}
gotRow, err := rs.handleRowData(ctx, h, it.Value())
if err != nil {
return 0, errors.Trace(err)
}
if gotRow {
limit--
count++
}
}
return count, nil
}
// handleRowData deals with raw row data:
// 1. Decodes row from raw byte slice.
// 2. Checks if it fit where condition.
// 3. Update aggregate functions.
// returns true if got a row.
func (rs *localRegion) handleRowData(ctx *selectContext, handle int64, value []byte) (bool, error) {
columns := ctx.sel.TableInfo.Columns
values, err := rs.getRowData(value, ctx.colTps)
if err != nil {
return false, errors.Trace(err)
}
// Fill handle and null columns.
for _, col := range columns {
if col.GetPkHandle() {
var handleDatum types.Datum
if mysql.HasUnsignedFlag(uint(col.Flag)) {
// PK column is Unsigned
handleDatum = types.NewUintDatum(uint64(handle))
} else {
handleDatum = types.NewIntDatum(handle)
}
handleData, err1 := codec.EncodeValue(nil, handleDatum)
if err1 != nil {
return false, errors.Trace(err1)
}
values[col.GetColumnId()] = handleData
} else {
_, ok := values[col.GetColumnId()]
if ok {
continue
}
if len(col.DefaultVal) > 0 {
values[col.GetColumnId()] = col.DefaultVal
continue
}
if mysql.HasNotNullFlag(uint(col.Flag)) {
return false, errors.New("Miss column")
}
values[col.GetColumnId()] = []byte{codec.NilFlag}
}
}
return rs.valuesToRow(ctx, handle, values)
}
// evalTopN evaluates the top n elements from the data. The input receives a record including its handle and data.
// And this function will check if this record can replace one of the old records.
func (rs *localRegion) evalTopN(ctx *selectContext, handle int64, values map[int64][]byte, columns []*tipb.ColumnInfo) error {
err := rs.setColumnValueToCtx(ctx, handle, values, ctx.topnColumns)
if err != nil {
return errors.Trace(err)
}
newRow := &sortRow{
meta: tipb.RowMeta{Handle: handle},
}
for _, item := range ctx.topnHeap.orderByItems {
result, err := ctx.eval.Eval(item.Expr)
if err != nil {
return errors.Trace(err)
}
newRow.key = append(newRow.key, result)
}
if ctx.topnHeap.tryToAddRow(newRow) {
for _, col := range columns {
val := values[col.GetColumnId()]
newRow.data = append(newRow.data, val...)
newRow.meta.Length += int64(len(val))
}
}
return errors.Trace(ctx.topnHeap.err)
}
func (rs *localRegion) valuesToRow(ctx *selectContext, handle int64, values map[int64][]byte) (bool, error) {
var columns []*tipb.ColumnInfo
if ctx.sel.TableInfo != nil {
columns = ctx.sel.TableInfo.Columns
} else {
columns = ctx.sel.IndexInfo.Columns
}
// Evaluate where
match, err := rs.evalWhereForRow(ctx, handle, values)
if err != nil {
return false, errors.Trace(err)
}
if !match {
return false, nil
}
if ctx.topN {
return false, errors.Trace(rs.evalTopN(ctx, handle, values, columns))
}
if ctx.aggregate {
// Update aggregate functions.
err = rs.aggregate(ctx, handle, values)
if err != nil {
return false, errors.Trace(err)
}
return false, nil
}
chunk := rs.getChunk(ctx)
var rowMeta tipb.RowMeta
rowMeta.Handle = handle
// If without aggregate functions, just return raw row data.
for _, col := range columns {
val := values[col.GetColumnId()]
rowMeta.Length += int64(len(val))
chunk.RowsData = append(chunk.RowsData, val...)
}
chunk.RowsMeta = append(chunk.RowsMeta, rowMeta)
return true, nil
}
func (rs *localRegion) getChunk(ctx *selectContext) *tipb.Chunk {
chunkLen := len(ctx.chunks)
if chunkLen == 0 || len(ctx.chunks[chunkLen-1].RowsMeta) >= chunkSize {
newChunk := tipb.Chunk{}
ctx.chunks = append(ctx.chunks, newChunk)
}
return &ctx.chunks[len(ctx.chunks)-1]
}
func (rs *localRegion) getRowData(value []byte, colTps map[int64]*types.FieldType) (map[int64][]byte, error) {
res, err := tablecodec.CutRow(value, colTps)
if err != nil {
return nil, errors.Trace(err)
}
if res == nil {
res = make(map[int64][]byte, len(colTps))
}
return res, nil
}
// Put column values into ctx, the values will be used for expr evaluation.
func (rs *localRegion) setColumnValueToCtx(ctx *selectContext, h int64, row map[int64][]byte, cols map[int64]*tipb.ColumnInfo) error {
for colID, col := range cols {
if col.GetPkHandle() {
if mysql.HasUnsignedFlag(uint(col.GetFlag())) {
ctx.eval.Row[colID] = types.NewUintDatum(uint64(h))
} else {
ctx.eval.Row[colID] = types.NewIntDatum(h)
}
} else {
data := row[colID]
ft := distsql.FieldTypeFromPBColumn(col)
datum, err := tablecodec.DecodeColumnValue(data, ft, ctx.eval.TimeZone)
if err != nil {
return errors.Trace(err)
}
ctx.eval.Row[colID] = datum
}
}
return nil
}
func (rs *localRegion) evalWhereForRow(ctx *selectContext, h int64, row map[int64][]byte) (bool, error) {
if ctx.sel.Where == nil {
return true, nil
}
err := rs.setColumnValueToCtx(ctx, h, row, ctx.whereColumns)
if err != nil {
return false, errors.Trace(err)
}
result, err := ctx.eval.Eval(ctx.sel.Where)
if err != nil {
return false, errors.Trace(err)
}
if result.IsNull() {
return false, nil
}
boolResult, err := result.ToBool(ctx.sc)
if err != nil {
return false, errors.Trace(err)
}
return boolResult == 1, nil
}
func toPBError(err error) *tipb.Error {
if err == nil {
return nil
}
perr := new(tipb.Error)
code := int32(1)
perr.Code = code
errStr := err.Error()
perr.Msg = errStr
return perr
}
func (rs *localRegion) getRowsFromIndexReq(ctx *selectContext) error {
kvRanges := rs.extractKVRanges(ctx)
limit := int64(-1)
if ctx.sel.Limit != nil {
limit = ctx.sel.GetLimit()
}
for _, ran := range kvRanges {
if limit == 0 {
break
}
count, err := rs.getIndexRowFromRange(ctx, ran, ctx.descScan, limit)
if err != nil {
return errors.Trace(err)
}
limit -= count
}
if ctx.aggregate {
return rs.getRowsFromAgg(ctx)
}
return nil
}
func reverseKVRanges(kvRanges []kv.KeyRange) {
for i := 0; i < len(kvRanges)/2; i++ {
j := len(kvRanges) - i - 1
kvRanges[i], kvRanges[j] = kvRanges[j], kvRanges[i]
}
}
func (rs *localRegion) getIndexRowFromRange(ctx *selectContext, ran kv.KeyRange, desc bool, limit int64) (count int64, err error) {
idxInfo := ctx.sel.IndexInfo
txn := ctx.txn
var seekKey kv.Key
if desc {
seekKey = ran.EndKey
} else {
seekKey = ran.StartKey
}
ids := make([]int64, len(idxInfo.Columns))
for i, col := range idxInfo.Columns {
ids[i] = col.GetColumnId()
}
for {
if limit == 0 {
break
}
var it kv.Iterator
if desc {
it, err = txn.SeekReverse(seekKey)
if err != nil {
return 0, errors.Trace(err)
}
seekKey = it.Key()
} else {
it, err = txn.Seek(seekKey)
if err != nil {
return 0, errors.Trace(err)
}
seekKey = it.Key().PrefixNext()
}
if !it.Valid() {
break
}
if desc {
if it.Key().Cmp(ran.StartKey) < 0 {
break
}
} else {
if it.Key().Cmp(ran.EndKey) >= 0 {
break
}
}
values, b, err1 := tablecodec.CutIndexKey(it.Key(), ids)
if err1 != nil {
return 0, errors.Trace(err1)
}
var handle int64
if len(b) > 0 {
var handleDatum types.Datum
_, handleDatum, err = codec.DecodeOne(b)
if err != nil {
return 0, errors.Trace(err)
}
handle = handleDatum.GetInt64()
} else {
handle, err = decodeHandle(it.Value())
if err != nil {
return 0, errors.Trace(err)
}
}
gotRow, err := rs.valuesToRow(ctx, handle, values)
if err != nil {
return 0, errors.Trace(err)
}
if gotRow {
limit--
count++
}
}
return count, nil
}
func decodeHandle(data []byte) (int64, error) {
var h int64
buf := bytes.NewBuffer(data)
err := binary.Read(buf, binary.BigEndian, &h)
return h, errors.Trace(err)
}
func buildLocalRegionServers(store *dbStore) []*localRegion {
return []*localRegion{
{
id: 1,
store: store,
startKey: []byte(""),
endKey: []byte("t"),
},
{
id: 2,
store: store,
startKey: []byte("t"),
endKey: []byte("u"),
},
{
id: 3,
store: store,
startKey: []byte("u"),
endKey: []byte("z"),
},
}
}
func isDefaultNull(err error, col *tipb.ColumnInfo) bool {
return kv.ErrNotExist.Equal(err) && !mysql.HasNotNullFlag(uint(col.GetFlag()))
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/pingcap/tidb.git
git@gitee.com:pingcap/tidb.git
pingcap
tidb
tidb
v1.0.7

搜索帮助