代码拉取完成,页面将自动刷新
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
import (
"time"
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/pd/pd-client"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
goctx "golang.org/x/net/context"
)
var (
// MaxRawKVScanLimit is the maximum scan limit for rawkv Scan.
MaxRawKVScanLimit = 10240
// ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is to large.
ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit")
)
// RawKVClient is a client of TiKV server which is used as a key-value storage,
// only GET/PUT/DELETE commands are supported.
type RawKVClient struct {
clusterID uint64
regionCache *RegionCache
pdClient pd.Client
rpcClient Client
}
// NewRawKVClient creates a client with PD cluster addrs.
func NewRawKVClient(pdAddrs []string) (*RawKVClient, error) {
pdCli, err := pd.NewClient(pdAddrs)
if err != nil {
return nil, errors.Trace(err)
}
return &RawKVClient{
clusterID: pdCli.GetClusterID(goctx.TODO()),
regionCache: NewRegionCache(pdCli),
pdClient: pdCli,
rpcClient: newRPCClient(),
}, nil
}
// Close closes the client.
func (c *RawKVClient) Close() error {
c.pdClient.Close()
return c.rpcClient.Close()
}
// ClusterID returns the TiKV cluster ID.
func (c *RawKVClient) ClusterID() uint64 {
return c.clusterID
}
// Get queries value with the key. When the key does not exist, it returns `nil, nil`.
func (c *RawKVClient) Get(key []byte) ([]byte, error) {
start := time.Now()
defer func() { rawkvCmdHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
req := &tikvrpc.Request{
Type: tikvrpc.CmdRawGet,
RawGet: &kvrpcpb.RawGetRequest{
Key: key,
},
}
resp, _, err := c.sendReq(key, req)
if err != nil {
return nil, errors.Trace(err)
}
cmdResp := resp.RawGet
if cmdResp == nil {
return nil, errors.Trace(errBodyMissing)
}
if cmdResp.GetError() != "" {
return nil, errors.New(cmdResp.GetError())
}
if len(cmdResp.Value) == 0 {
return nil, nil
}
return cmdResp.Value, nil
}
// Put stores a key-value pair to TiKV.
func (c *RawKVClient) Put(key, value []byte) error {
start := time.Now()
defer func() { rawkvCmdHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds()) }()
rawkvSizeHistogram.WithLabelValues("key").Observe(float64(len(key)))
rawkvSizeHistogram.WithLabelValues("value").Observe(float64(len(value)))
if len(value) == 0 {
return errors.New("empty value is not supported")
}
req := &tikvrpc.Request{
Type: tikvrpc.CmdRawPut,
RawPut: &kvrpcpb.RawPutRequest{
Key: key,
Value: value,
},
}
resp, _, err := c.sendReq(key, req)
if err != nil {
return errors.Trace(err)
}
cmdResp := resp.RawPut
if cmdResp == nil {
return errors.Trace(errBodyMissing)
}
if cmdResp.GetError() != "" {
return errors.New(cmdResp.GetError())
}
return nil
}
// Delete deletes a key-value pair from TiKV.
func (c *RawKVClient) Delete(key []byte) error {
start := time.Now()
defer func() { rawkvCmdHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds()) }()
req := &tikvrpc.Request{
Type: tikvrpc.CmdRawDelete,
RawDelete: &kvrpcpb.RawDeleteRequest{
Key: key,
},
}
resp, _, err := c.sendReq(key, req)
if err != nil {
return errors.Trace(err)
}
cmdResp := resp.RawDelete
if cmdResp == nil {
return errors.Trace(errBodyMissing)
}
if cmdResp.GetError() != "" {
return errors.New(cmdResp.GetError())
}
return nil
}
// Scan queries continuous kv pairs, starts from startKey, up to limit pairs.
// If you want to exclude the startKey, append a '\0' to the key: `Scan(append(startKey, '\0'), limit)`.
func (c *RawKVClient) Scan(startKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
start := time.Now()
defer func() { rawkvCmdHistogram.WithLabelValues("raw_scan").Observe(time.Since(start).Seconds()) }()
if limit > MaxRawKVScanLimit {
return nil, nil, errors.Trace(ErrMaxScanLimitExceeded)
}
for len(keys) < limit {
req := &tikvrpc.Request{
Type: tikvrpc.CmdRawScan,
RawScan: &kvrpcpb.RawScanRequest{
StartKey: startKey,
Limit: uint32(limit - len(keys)),
},
}
resp, loc, err := c.sendReq(startKey, req)
if err != nil {
return nil, nil, errors.Trace(err)
}
cmdResp := resp.RawScan
if cmdResp == nil {
return nil, nil, errors.Trace(errBodyMissing)
}
for _, pair := range cmdResp.Kvs {
keys = append(keys, pair.Key)
values = append(values, pair.Value)
}
startKey = loc.EndKey
if len(startKey) == 0 {
break
}
}
return
}
func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request) (*tikvrpc.Response, *KeyLocation, error) {
bo := NewBackoffer(rawkvMaxBackoff, goctx.Background())
sender := NewRegionRequestSender(c.regionCache, c.rpcClient)
for {
loc, err := c.regionCache.LocateKey(bo, key)
if err != nil {
return nil, nil, errors.Trace(err)
}
resp, err := sender.SendReq(bo, req, loc.Region, readTimeoutShort)
if err != nil {
return nil, nil, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return nil, nil, errors.Trace(err)
}
if regionErr != nil {
err := bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
if err != nil {
return nil, nil, errors.Trace(err)
}
continue
}
return resp, loc, nil
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。