1 Star 0 Fork 0

zhoujin826/tidb

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
coprocessor.go 13.75 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
import (
"bytes"
"fmt"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tipb/go-tipb"
goctx "golang.org/x/net/context"
)
// copIteratorGP is the package-level goroutine pool on which copIterator.run
// schedules its worker and dispatcher functions.
// NOTE(review): time.Minute is presumably the idle timeout of pooled
// goroutines — confirm against the goroutine_pool package.
var copIteratorGP = gp.New(time.Minute)
// CopClient is coprocessor client.
// Its Send method turns a kv.Request into per-region coprocessor tasks and
// returns an iterator over their responses.
type CopClient struct {
// store is the tikvStore whose region cache is used to build tasks and
// which is handed to every iterator created by Send.
store *tikvStore
}
// IsRequestTypeSupported checks whether reqType is supported.
//
// Analyze requests are always supported. DAG requests are supported when
// subType, read as a tipb.ExprType, passes supportExpr. Select and Index
// requests are supported for the GroupBy, Basic and TopN sub-types, and
// otherwise fall back to the same supportExpr check. Every other request
// type is rejected.
func (c *CopClient) IsRequestTypeSupported(reqType, subType int64) bool {
	if reqType == kv.ReqTypeAnalyze {
		return true
	}
	if reqType == kv.ReqTypeDAG {
		return c.supportExpr(tipb.ExprType(subType))
	}
	if reqType != kv.ReqTypeSelect && reqType != kv.ReqTypeIndex {
		return false
	}
	// Select / Index: a few sub-types are accepted outright.
	if subType == kv.ReqSubTypeGroupBy || subType == kv.ReqSubTypeBasic || subType == kv.ReqSubTypeTopN {
		return true
	}
	return c.supportExpr(tipb.ExprType(subType))
}
// supportExpr reports whether exprType is on the list of expression types
// this client accepts. Every listed case leaves the switch and reaches the
// single "return true" below; anything not listed returns false.
func (c *CopClient) supportExpr(exprType tipb.ExprType) bool {
	switch exprType {
	case tipb.ExprType_Null, tipb.ExprType_Int64, tipb.ExprType_Uint64, tipb.ExprType_String, tipb.ExprType_Bytes,
		tipb.ExprType_MysqlDuration, tipb.ExprType_MysqlTime, tipb.ExprType_MysqlDecimal,
		tipb.ExprType_Float32, tipb.ExprType_Float64, tipb.ExprType_ColumnRef:
		// Constant values and column references.
	case tipb.ExprType_And, tipb.ExprType_Or, tipb.ExprType_Not:
		// Logic operators.
	case tipb.ExprType_LT, tipb.ExprType_LE, tipb.ExprType_EQ, tipb.ExprType_NE,
		tipb.ExprType_GE, tipb.ExprType_GT, tipb.ExprType_NullEQ,
		tipb.ExprType_In, tipb.ExprType_ValueList, tipb.ExprType_IsNull,
		tipb.ExprType_Like:
		// Compare operators.
	case tipb.ExprType_Plus, tipb.ExprType_Div, tipb.ExprType_Minus, tipb.ExprType_Mul:
		// Arithmetic operators.
	case tipb.ExprType_Case, tipb.ExprType_If, tipb.ExprType_IfNull, tipb.ExprType_Coalesce:
		// Control functions.
	case tipb.ExprType_Count, tipb.ExprType_First, tipb.ExprType_Max, tipb.ExprType_Min, tipb.ExprType_Sum, tipb.ExprType_Avg:
		// Aggregate functions.
	case tipb.ExprType_JsonType, tipb.ExprType_JsonExtract, tipb.ExprType_JsonUnquote,
		tipb.ExprType_JsonObject, tipb.ExprType_JsonArray, tipb.ExprType_JsonMerge,
		tipb.ExprType_JsonSet, tipb.ExprType_JsonInsert, tipb.ExprType_JsonReplace, tipb.ExprType_JsonRemove:
		// JSON functions.
	case kv.ReqSubTypeDesc, kv.ReqSubTypeSignature:
		// Request sub-type values that are accepted as well.
	default:
		return false
	}
	return true
}
// Send builds the request and gets the coprocessor iterator response.
//
// The request's key ranges are split into per-region tasks. If that fails,
// the returned kv.Response yields the error from Next. Otherwise a
// copIterator is created and started before being returned. The number of
// workers is req.Concurrency clamped to [1, len(tasks)], with a minimum of
// one even when there are no tasks.
func (c *CopClient) Send(ctx goctx.Context, req *kv.Request) kv.Response {
	coprocessorCounter.WithLabelValues("send").Inc()

	bo := NewBackoffer(copBuildTaskMaxBackoff, ctx)
	tasks, err := buildCopTasks(bo, c.store.regionCache, &copRanges{mid: req.KeyRanges}, req.Desc)
	if err != nil {
		return copErrorResponse{err}
	}

	// Never start more workers than tasks, but always start at least one.
	workers := req.Concurrency
	if n := len(tasks); workers > n {
		workers = n
	}
	if workers < 1 {
		workers = 1
	}

	iter := &copIterator{
		store:       c.store,
		req:         req,
		concurrency: workers,
		finished:    make(chan struct{}),
		tasks:       tasks,
	}
	// Unordered results share one channel; ordered results use the
	// per-task channels instead.
	if !req.KeepOrder {
		iter.respChan = make(chan copResponse, workers)
	}
	iter.run(ctx)
	return iter
}
// copTask contains a related Region and KeyRange for a kv.Request.
type copTask struct {
// region identifies the region (id, confVer, ver) the request is sent to.
region RegionVerID
// ranges are the key ranges of the request that this task covers.
ranges *copRanges
// respChan receives this task's responses when the request keeps order;
// buildCopTasks creates it with a buffer of one.
respChan chan copResponse
// storeAddr is filled in by handleTask from the request sender after a
// successful response; it is only used when printing the task.
storeAddr string
}
// String renders the task as its region identity (id, confVer, ver), the
// number of key ranges it covers and the store address.
func (r *copTask) String() string {
	reg := r.region
	rangeCount := r.ranges.len()
	return fmt.Sprintf("region(%d %d %d) ranges(%d) store(%s)",
		reg.id, reg.confVer, reg.ver, rangeCount, r.storeAddr)
}
// copRanges is like []kv.KeyRange, but may have extra elements at head/tail.
// It avoids allocating a big slice while building copTasks: a range that has
// to be split at a region boundary is stored in first/last instead of being
// copied into a new slice.
type copRanges struct {
// first, when non-nil, is the logical element 0 and precedes mid.
first *kv.KeyRange
// mid holds the middle elements.
mid []kv.KeyRange
// last, when non-nil, is the final logical element and follows mid.
last *kv.KeyRange
}
// String formats every range in order as "[start, end]" with both keys
// quoted, concatenated without a separator.
//
// The text is accumulated in a bytes.Buffer so the cost stays linear in the
// number of ranges; appending to a string inside the callback would re-copy
// everything written so far on each range.
func (r *copRanges) String() string {
	var buf bytes.Buffer
	r.do(func(ran *kv.KeyRange) {
		// Writes to a bytes.Buffer never fail, so the result is ignored.
		fmt.Fprintf(&buf, "[%q, %q]", ran.StartKey, ran.EndKey)
	})
	return buf.String()
}
// len returns the number of logical ranges: the elements of mid plus one
// for each of first and last that is set.
func (r *copRanges) len() int {
	n := len(r.mid)
	if r.first != nil {
		n++
	}
	if r.last != nil {
		n++
	}
	return n
}
// at returns the i-th logical range, counting first (when set) as index 0,
// then the elements of mid, then last. The caller must pass an index below
// r.len(); no bounds check is performed beyond what the slice and pointer
// accesses do themselves.
func (r *copRanges) at(i int) kv.KeyRange {
	idx := i
	if r.first != nil {
		if idx == 0 {
			return *r.first
		}
		// Skip over the head element to index into mid.
		idx--
	}
	if idx >= len(r.mid) {
		return *r.last
	}
	return r.mid[idx]
}
// slice returns the logical sub-sequence [from, to) of r as a new copRanges.
// The result shares first, last and the backing array of mid with r; no
// key range is copied.
func (r *copRanges) slice(from, to int) *copRanges {
	sub := &copRanges{}

	if r.first != nil {
		// The head element is kept only when the window starts at 0 and is
		// non-empty.
		if from == 0 && to > 0 {
			sub.first = r.first
		}
		// Translate logical indexes into indexes of mid.
		if from > 0 {
			from--
		}
		if to > 0 {
			to--
		}
	}

	midLen := len(r.mid)
	if to <= midLen {
		// The window ends inside mid; the tail element is not included.
		sub.mid = r.mid[from:to]
		return sub
	}

	// The window reaches past mid, so it may include the tail element.
	if from <= midLen {
		sub.mid = r.mid[from:]
	}
	if from < to {
		sub.last = r.last
	}
	return sub
}
// do calls f once for every range in r, in logical order: first (if set),
// each element of mid, then last (if set).
//
// For elements of mid, f receives a pointer to the slice element itself,
// the same way first and last are passed. This avoids copying each
// kv.KeyRange into a loop variable and handing out the address of that
// variable, which before Go 1.22 is a single variable shared by all
// iterations. f should treat the range as read-only unless it means to
// modify r.
func (r *copRanges) do(f func(ran *kv.KeyRange)) {
	if r.first != nil {
		f(r.first)
	}
	for i := range r.mid {
		f(&r.mid[i])
	}
	if r.last != nil {
		f(r.last)
	}
}
// toPBRanges converts r into the protobuf form used in a coprocessor
// request: one *coprocessor.KeyRange per logical range, in order, carrying
// the same start and end keys.
func (r *copRanges) toPBRanges() []*coprocessor.KeyRange {
	total := r.len()
	pbRanges := make([]*coprocessor.KeyRange, total)
	for idx := 0; idx < total; idx++ {
		kr := r.at(idx)
		pbRanges[idx] = &coprocessor.KeyRange{
			Start: kr.StartKey,
			End:   kr.EndKey,
		}
	}
	return pbRanges
}
// buildCopTasks splits ranges into copTasks so that each task's ranges lie
// within one region, as located through cache. A range that crosses a
// region's end key is cut at that key: the left part closes the current task
// and the right part becomes the head of the remaining ranges.
//
// When desc is true the task list is reversed before it is returned. An empty
// ranges yields a nil task slice and a nil error. An error from
// cache.LocateKey is returned traced, with nil tasks.
//
// The function also counts the call, logs a warning when it runs for more
// than 500ms, and records the number of tasks in txnRegionsNumHistogram.
// NOTE(review): the loop appears to assume ranges are sorted and
// non-overlapping — confirm against callers.
func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, desc bool) ([]*copTask, error) {
coprocessorCounter.WithLabelValues("build_task").Inc()
start := time.Now()
// Remember the input size for the slow-path log; ranges is reassigned below.
rangesLen := ranges.len()
var tasks []*copTask
// appendTask records one task for region. Each task gets its own response
// channel with a buffer of one.
appendTask := func(region RegionVerID, ranges *copRanges) {
tasks = append(tasks, &copTask{
region: region,
ranges: ranges,
respChan: make(chan copResponse, 1),
})
}
for ranges.len() > 0 {
// Locate the region that holds the start of the first remaining range.
loc, err := cache.LocateKey(bo, ranges.at(0).StartKey)
if err != nil {
return nil, errors.Trace(err)
}
// Iterate to the first range that is not complete in the region.
// A range whose end key equals the region's end key still counts as
// complete.
var i int
for ; i < ranges.len(); i++ {
r := ranges.at(i)
if !(loc.Contains(r.EndKey) || bytes.Equal(loc.EndKey, r.EndKey)) {
break
}
}
// All remaining ranges belong to the same region.
if i == ranges.len() {
appendTask(loc.Region, ranges)
break
}
r := ranges.at(i)
if loc.Contains(r.StartKey) {
// Part of r is not in the region. We need to split it.
// The left part [r.StartKey, loc.EndKey) is attached as the tail of the
// current task.
taskRanges := ranges.slice(0, i)
taskRanges.last = &kv.KeyRange{
StartKey: r.StartKey,
EndKey: loc.EndKey,
}
appendTask(loc.Region, taskRanges)
// The right part [loc.EndKey, r.EndKey) becomes the head of what remains.
ranges = ranges.slice(i+1, ranges.len())
ranges.first = &kv.KeyRange{
StartKey: loc.EndKey,
EndKey: r.EndKey,
}
} else {
// ranges.at(i) does not start in the region: the task takes the ranges
// before it, and it stays at the front of what remains.
appendTask(loc.Region, ranges.slice(0, i))
ranges = ranges.slice(i, ranges.len())
}
}
if desc {
reverseTasks(tasks)
}
if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
log.Warnf("buildCopTasks takes too much time (%v), range len %v, task len %v", elapsed, rangesLen, len(tasks))
}
txnRegionsNumHistogram.WithLabelValues("coprocessor").Observe(float64(len(tasks)))
return tasks, nil
}
// reverseTasks reverses tasks in place by swapping elements from both ends
// towards the middle.
func reverseTasks(tasks []*copTask) {
	for lo, hi := 0, len(tasks)-1; lo < hi; lo, hi = lo+1, hi-1 {
		tasks[lo], tasks[hi] = tasks[hi], tasks[lo]
	}
}
// copIterator is the response iterator created by CopClient.Send. It runs the
// coprocessor tasks on worker goroutines and hands the results out via Next.
type copIterator struct {
// store provides the region cache, RPC client, lock resolver and
// visibility check used while handling tasks.
store *tikvStore
// req is the original request.
req *kv.Request
// concurrency is the number of worker goroutines started by run.
concurrency int
// finished is closed by Close to tell workers, the dispatcher and Next to stop.
finished chan struct{}
// If keepOrder, results are stored in copTask.respChan, read them out one by one.
tasks []*copTask
// curr is the index into tasks of the task Next is currently reading from.
curr int
// Otherwise, results are stored in respChan.
respChan chan copResponse
// wg tracks the worker goroutines; Close and the dispatcher wait on it.
wg sync.WaitGroup
}
// copResponse is one result produced for a copTask: handleTask fills in
// either the embedded coprocessor response or err, never both.
type copResponse struct {
*coprocessor.Response
err error
}
// minLogCopTaskTime is the handling time above which a worker logs a
// coprocessor task at info level.
const minLogCopTaskTime = 300 * time.Millisecond
// work is a worker function that gets copTasks from taskCh until the channel
// is closed, handles each one and sends the results back: to it.respChan when
// order does not matter, or to the task's own respChan when the request keeps
// order. In the ordered case the task's channel is closed once all of its
// responses have been sent.
//
// The worker returns early, without closing the current task's channel, as
// soon as ctx is done or it.finished is closed while it is trying to send.
// it.wg.Done is called on every exit path.
func (it *copIterator) work(ctx goctx.Context, taskCh <-chan *copTask) {
defer it.wg.Done()
for task := range taskCh {
// Each task gets a fresh backoffer.
bo := NewBackoffer(copNextMaxBackoff, ctx)
startTime := time.Now()
resps := it.handleTask(bo, task)
costTime := time.Since(startTime)
// Only slow tasks are logged.
if costTime > minLogCopTaskTime {
log.Infof("[TIME_COP_TASK] %s%s %s", costTime, bo, task)
}
coprocessorHistogram.Observe(costTime.Seconds())
if bo.totalSleep > 0 {
// NOTE(review): the division by 1000 presumably converts milliseconds
// to seconds — confirm the unit of totalSleep.
backoffHistogram.Observe(float64(bo.totalSleep) / 1000)
}
// Pick the destination channel according to the ordering requirement.
var ch chan copResponse
if !it.req.KeepOrder {
ch = it.respChan
} else {
ch = task.respChan
}
for _, resp := range resps {
select {
case ch <- resp:
case <-ctx.Done():
return
case <-it.finished:
return
}
}
// Closing the per-task channel tells Next that this task is exhausted.
if it.req.KeepOrder {
close(ch)
}
}
}
// run starts the iterator: it.concurrency worker functions plus one
// dispatcher function are scheduled on copIteratorGP, and run returns without
// waiting for them.
//
// The dispatcher feeds it.tasks to the workers through a channel, stops early
// when the iterator is finished or ctx is canceled, then closes the channel
// and waits for all workers to exit. When the request does not keep order it
// finally closes it.respChan.
func (it *copIterator) run(ctx goctx.Context) {
taskCh := make(chan *copTask, 1)
// Register every worker up front; each worker calls it.wg.Done when work
// returns.
it.wg.Add(it.concurrency)
// Start it.concurrency number of workers to handle cop requests.
for i := 0; i < it.concurrency; i++ {
copIteratorGP.Go(func() {
childCtx, cancel := goctx.WithCancel(ctx)
defer cancel()
it.work(childCtx, taskCh)
})
}
copIteratorGP.Go(func() {
// Send tasks to feed the worker goroutines.
childCtx, cancel := goctx.WithCancel(ctx)
defer cancel()
for _, t := range it.tasks {
finished, canceled := it.sendToTaskCh(childCtx, t, taskCh)
if finished || canceled {
break
}
}
// Closing taskCh ends the range loop in every worker.
close(taskCh)
// Wait for worker goroutines to exit.
it.wg.Wait()
// Only now is it certain that no worker will send on the shared channel.
if !it.req.KeepOrder {
close(it.respChan)
}
})
}
// recvFromRespCh waits for either a value from respCh or the finished
// signal. When respCh delivers, resp and ok are the result of the receive
// (ok is false if respCh is closed) and exit is false. When finished fires,
// exit is true and resp and ok are zero values.
func recvFromRespCh(respCh <-chan copResponse, finished <-chan struct{}) (resp copResponse, ok bool, exit bool) {
	select {
	case <-finished:
		return copResponse{}, false, true
	case got, open := <-respCh:
		return got, open, false
	}
}
// sendToTaskCh tries to hand t to the workers through taskCh. It blocks
// until the send succeeds (both results false), the iterator is finished
// (finished is true) or ctx is done (canceled is true).
func (it *copIterator) sendToTaskCh(ctx goctx.Context, t *copTask, taskCh chan<- *copTask) (finished bool, canceled bool) {
	select {
	case <-ctx.Done():
		return false, true
	case <-it.finished:
		return true, false
	case taskCh <- t:
		return false, false
	}
}
// Next returns next coprocessor result.
//
// A (nil, nil) result means the iterator is exhausted or has been closed.
// A response that carries no data is returned as an empty, non-nil slice so
// that it cannot be mistaken for the end of the stream. An error attached to
// a response, or a failed visibility check on the request's start timestamp,
// is returned traced.
func (it *copIterator) Next() ([]byte, error) {
	coprocessorCounter.WithLabelValues("next").Inc()

	var resp copResponse
	if it.req.KeepOrder {
		// Ordered mode: drain the per-task channels in task order.
		for {
			if it.curr >= len(it.tasks) {
				// Every task has been consumed.
				return nil, nil
			}
			got, ok, exit := recvFromRespCh(it.tasks[it.curr].respChan, it.finished)
			if exit {
				// Close() is already called, so Next() is invalid.
				return nil, nil
			}
			if ok {
				resp = got
				break
			}
			// The current task's channel is closed: drop the task and move
			// on to the next one.
			it.tasks[it.curr] = nil
			it.curr++
		}
	} else {
		// Unordered mode: all workers share one channel, which is closed
		// once they have all exited.
		got, ok := <-it.respChan
		if !ok {
			return nil, nil
		}
		resp = got
	}

	if resp.err != nil {
		return nil, errors.Trace(resp.err)
	}
	if err := it.store.CheckVisibility(it.req.StartTs); err != nil {
		return nil, errors.Trace(err)
	}
	if data := resp.Data; data != nil {
		return data, nil
	}
	return []byte{}, nil
}
// handleTask handles single copTask.
//
// It sends the task's coprocessor request to the task's region and retries in
// a loop for as long as the response reports a lock. The possible results are:
//   - nil, when the iterator is already finished at the start of an attempt;
//   - a single response holding an error, when sending, backing off or
//     resolving locks fails, or when the store reports an "other error";
//   - the responses of handleRegionErrorTask, when the store reports a region
//     error (the task is rebuilt after a backoff);
//   - a single response holding the coprocessor result on success.
func (it *copIterator) handleTask(bo *Backoffer, task *copTask) []copResponse {
coprocessorCounter.WithLabelValues("handle_task").Inc()
sender := NewRegionRequestSender(it.store.regionCache, it.store.client)
for {
// Non-blocking check: give up as soon as the iterator has been closed.
select {
case <-it.finished:
return nil
default:
}
// The request is rebuilt on every attempt.
req := &tikvrpc.Request{
Type: tikvrpc.CmdCop,
Cop: &coprocessor.Request{
Tp: it.req.Tp,
Data: it.req.Data,
Ranges: task.ranges.toPBRanges(),
},
Context: kvrpcpb.Context{
IsolationLevel: pbIsolationLevel(it.req.IsolationLevel),
Priority: kvPriorityToCommandPri(it.req.Priority),
NotFillCache: it.req.NotFillCache,
},
}
resp, err := sender.SendReq(bo, req, task.region, readTimeoutMedium)
if err != nil {
return []copResponse{{err: errors.Trace(err)}}
}
// Region error: back off, then rebuild the task's ranges into new tasks
// and handle those instead.
if regionErr := resp.Cop.GetRegionError(); regionErr != nil {
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
if err != nil {
return []copResponse{{err: errors.Trace(err)}}
}
return it.handleRegionErrorTask(bo, task)
}
// Lock encountered: try to resolve it and retry the same task. A backoff
// is only taken when the lock could not be resolved.
if e := resp.Cop.GetLocked(); e != nil {
log.Debugf("coprocessor encounters lock: %v", e)
ok, err1 := it.store.lockResolver.ResolveLocks(bo, []*Lock{newLock(e)})
if err1 != nil {
return []copResponse{{err: errors.Trace(err1)}}
}
if !ok {
err = bo.Backoff(boTxnLockFast, errors.New(e.String()))
if err != nil {
return []copResponse{{err: errors.Trace(err)}}
}
}
continue
}
// Any other error reported by the store is returned without retrying.
if e := resp.Cop.GetOtherError(); e != "" {
err = errors.Errorf("other error: %s", e)
log.Warnf("coprocessor err: %v", err)
return []copResponse{{err: errors.Trace(err)}}
}
// Success: remember which store served the task (used when logging it).
task.storeAddr = sender.storeAddr
return []copResponse{{Response: resp.Cop}}
}
}
// handleRegionErrorTask handles current task. It may be split into multiple tasks (in region split scenario).
//
// The task's ranges are rebuilt into fresh tasks, each of which is handled
// in turn with the same backoffer; their responses are concatenated in task
// order. A failure to rebuild is reported as a single error response.
func (it *copIterator) handleRegionErrorTask(bo *Backoffer, task *copTask) []copResponse {
	coprocessorCounter.WithLabelValues("rebuild_task").Inc()

	subTasks, err := buildCopTasks(bo, it.store.regionCache, task.ranges, it.req.Desc)
	switch {
	case err != nil:
		return []copResponse{{err: errors.Trace(err)}}
	case subTasks == nil:
		// TODO: check this, this should never happen.
		return nil
	}

	var collected []copResponse
	for i := range subTasks {
		collected = append(collected, it.handleTask(bo, subTasks[i])...)
	}
	return collected
}
// Close stops the iterator: it closes it.finished, which workers, the
// dispatcher and Next all watch, and then blocks until every worker
// goroutine has exited. It always returns nil.
// NOTE(review): a second call would close an already-closed channel and
// panic; callers must call Close at most once.
func (it *copIterator) Close() error {
close(it.finished)
it.wg.Wait()
return nil
}
// copErrorResponse returns error when calling Next().
// CopClient.Send returns it when the coprocessor tasks could not be built.
type copErrorResponse struct{ error }
// Next always returns nil data together with the wrapped error.
func (it copErrorResponse) Next() ([]byte, error) {
return nil, it.error
}
// Close is a no-op: an error response holds no resources. It always returns nil.
func (it copErrorResponse) Close() error {
return nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhoujin826/tidb.git
git@gitee.com:zhoujin826/tidb.git
zhoujin826
tidb
tidb
v1.0.9

搜索帮助

0d507c66 1850385 C8b1a773 1850385