Fetch the repository succeeded.
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
// Package tikv provides tcp connection to kvserver.
package tikv
import (
"io"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/terror"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// MaxConnectionCount is the max gRPC connections that will be established with
// each tikv-server.
var MaxConnectionCount uint = 16
// MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than
// current value, an error will be reported from gRPC.
var MaxSendMsgSize = 1<<31 - 1
// MaxCallMsgSize set max gRPC receive message size received from server. If any message size is larger than
// current value, an error will be reported from gRPC.
var MaxCallMsgSize = 1<<31 - 1
// Timeout durations.
const (
dialTimeout = 5 * time.Second
readTimeoutShort = 20 * time.Second // For requests that read/write several key-values.
ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region.
ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times.
GCTimeout = 5 * time.Minute
grpcInitialWindowSize = 1 << 30
grpcInitialConnWindowSize = 1 << 30
)
// Client is a client that sends RPC.
// It should not be used after calling Close().
type Client interface {
// Close should release all data.
Close() error
// SendRequest sends Request.
SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error)
}
type connArray struct {
index uint32
v []*grpc.ClientConn
// Bind with a background goroutine to process coprocessor streaming timeout.
streamTimeout chan *tikvrpc.Lease
}
func newConnArray(maxSize uint, addr string, security config.Security) (*connArray, error) {
a := &connArray{
index: 0,
v: make([]*grpc.ClientConn, maxSize),
streamTimeout: make(chan *tikvrpc.Lease, 1024),
}
if err := a.Init(addr, security); err != nil {
return nil, err
}
return a, nil
}
func (a *connArray) Init(addr string, security config.Security) error {
opt := grpc.WithInsecure()
if len(security.ClusterSSLCA) != 0 {
tlsConfig, err := security.ToTLSConfig()
if err != nil {
return errors.Trace(err)
}
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}
unaryInterceptor := grpc_prometheus.UnaryClientInterceptor
streamInterceptor := grpc_prometheus.StreamClientInterceptor
cfg := config.GetGlobalConfig()
if cfg.OpenTracing.Enable {
unaryInterceptor = grpc_middleware.ChainUnaryClient(
unaryInterceptor,
grpc_opentracing.UnaryClientInterceptor(),
)
streamInterceptor = grpc_middleware.ChainStreamClient(
streamInterceptor,
grpc_opentracing.StreamClientInterceptor(),
)
}
for i := range a.v {
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
conn, err := grpc.DialContext(
ctx,
addr,
opt,
grpc.WithInitialWindowSize(grpcInitialWindowSize),
grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize),
grpc.WithUnaryInterceptor(unaryInterceptor),
grpc.WithStreamInterceptor(streamInterceptor),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxCallMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(MaxSendMsgSize)),
grpc.WithBackoffMaxDelay(time.Second*3),
)
cancel()
if err != nil {
// Cleanup if the initialization fails.
a.Close()
return errors.Trace(err)
}
a.v[i] = conn
}
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout)
return nil
}
func (a *connArray) Get() *grpc.ClientConn {
next := atomic.AddUint32(&a.index, 1) % uint32(len(a.v))
return a.v[next]
}
func (a *connArray) Close() {
for i, c := range a.v {
if c != nil {
err := c.Close()
terror.Log(errors.Trace(err))
a.v[i] = nil
}
}
close(a.streamTimeout)
}
// rpcClient is RPC client struct.
// TODO: Add flow control between RPC clients in TiDB ond RPC servers in TiKV.
// Since we use shared client connection to communicate to the same TiKV, it's possible
// that there are too many concurrent requests which overload the service of TiKV.
// TODO: Implement background cleanup. It adds a background goroutine to periodically check
// whether there is any connection is idle and then close and remove these idle connections.
type rpcClient struct {
sync.RWMutex
isClosed bool
conns map[string]*connArray
security config.Security
}
func newRPCClient(security config.Security) *rpcClient {
return &rpcClient{
conns: make(map[string]*connArray),
security: security,
}
}
func (c *rpcClient) getConnArray(addr string) (*connArray, error) {
c.RLock()
if c.isClosed {
c.RUnlock()
return nil, errors.Errorf("rpcClient is closed")
}
array, ok := c.conns[addr]
c.RUnlock()
if !ok {
var err error
array, err = c.createConnArray(addr)
if err != nil {
return nil, err
}
}
return array, nil
}
func (c *rpcClient) createConnArray(addr string) (*connArray, error) {
c.Lock()
defer c.Unlock()
array, ok := c.conns[addr]
if !ok {
var err error
array, err = newConnArray(MaxConnectionCount, addr, c.security)
if err != nil {
return nil, err
}
c.conns[addr] = array
}
return array, nil
}
func (c *rpcClient) closeConns() {
c.Lock()
if !c.isClosed {
c.isClosed = true
// close all connections
for _, array := range c.conns {
array.Close()
}
}
c.Unlock()
}
// SendRequest sends a Request to server and receives Response.
func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
start := time.Now()
reqType := req.Type.String()
storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
defer func() {
metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeID).Observe(time.Since(start).Seconds())
}()
connArray, err := c.getConnArray(addr)
if err != nil {
return nil, errors.Trace(err)
}
client := tikvpb.NewTikvClient(connArray.Get())
if req.Type != tikvrpc.CmdCopStream {
ctx1, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return tikvrpc.CallRPC(ctx1, client, req)
}
// Coprocessor streaming request.
// Use context to support timeout for grpc streaming client.
ctx1, cancel := context.WithCancel(ctx)
resp, err := tikvrpc.CallRPC(ctx1, client, req)
if err != nil {
return nil, errors.Trace(err)
}
// Put the lease object to the timeout channel, so it would be checked periodically.
copStream := resp.CopStream
copStream.Timeout = timeout
copStream.Lease.Cancel = cancel
connArray.streamTimeout <- &copStream.Lease
// Read the first streaming response to get CopStreamResponse.
// This can make error handling much easier, because SendReq() retry on
// region error automatically.
var first *coprocessor.Response
first, err = copStream.Recv()
if err != nil {
if errors.Cause(err) != io.EOF {
return nil, errors.Trace(err)
}
log.Debug("copstream returns nothing for the request.")
}
copStream.Response = first
return resp, nil
}
func (c *rpcClient) Close() error {
c.closeConns()
return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。