90 Star 486 Fork 146

平凯星辰(北京)科技有限公司 / tidb

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
kv.go 9.08 KB
一键复制 编辑 原始数据 按行查看 历史
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
import (
"crypto/tls"
"fmt"
"math/rand"
"net/url"
"strings"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/juju/errors"
"github.com/pingcap/pd/pd-client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/oracle/oracles"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
type storeCache struct {
sync.Mutex
cache map[string]*tikvStore
}
var mc storeCache
// Driver implements engine Driver.
type Driver struct {
}
func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: addrs,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
},
TLS: tlsConfig,
})
if err != nil {
return nil, errors.Trace(err)
}
return cli, nil
}
// Open opens or creates an TiKV storage with given path.
// Path example: tikv://etcd-node1:port,etcd-node2:port?cluster=1&disableGC=false
func (d Driver) Open(path string) (kv.Storage, error) {
mc.Lock()
defer mc.Unlock()
security := config.GetGlobalConfig().Security
etcdAddrs, disableGC, err := parsePath(path)
if err != nil {
return nil, errors.Trace(err)
}
pdCli, err := pd.NewClient(etcdAddrs, pd.SecurityOption{
CAPath: security.ClusterSSLCA,
CertPath: security.ClusterSSLCert,
KeyPath: security.ClusterSSLKey,
})
if err != nil {
if strings.Contains(err.Error(), "i/o timeout") {
return nil, errors.Annotate(err, txnRetryableMark)
}
return nil, errors.Trace(err)
}
// FIXME: uuid will be a very long and ugly string, simplify it.
uuid := fmt.Sprintf("tikv-%v", pdCli.GetClusterID(context.TODO()))
if store, ok := mc.cache[uuid]; ok {
return store, nil
}
tlsConfig, err := security.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
}
spkv, err := NewEtcdSafePointKV(etcdAddrs, tlsConfig)
if err != nil {
return nil, errors.Trace(err)
}
s, err := newTikvStore(uuid, &codecPDClient{pdCli}, spkv, newRPCClient(security), !disableGC)
if err != nil {
return nil, errors.Trace(err)
}
s.etcdAddrs = etcdAddrs
s.tlsConfig = tlsConfig
mc.cache[uuid] = s
return s, nil
}
// update oracle's lastTS every 2000ms.
var oracleUpdateInterval = 2000
type tikvStore struct {
clusterID uint64
uuid string
oracle oracle.Oracle
client Client
pdClient pd.Client
regionCache *RegionCache
lockResolver *LockResolver
gcWorker GCHandler
etcdAddrs []string
tlsConfig *tls.Config
mock bool
enableGC bool
kv SafePointKV
safePoint uint64
spTime time.Time
spMutex sync.RWMutex // this is used to update safePoint and spTime
closed chan struct{} // this is used to nofity when the store is closed
}
func (s *tikvStore) UpdateSPCache(cachedSP uint64, cachedTime time.Time) {
s.spMutex.Lock()
s.safePoint = cachedSP
s.spTime = cachedTime
s.spMutex.Unlock()
}
func (s *tikvStore) CheckVisibility(startTime uint64) error {
s.spMutex.RLock()
cachedSafePoint := s.safePoint
cachedTime := s.spTime
s.spMutex.RUnlock()
diff := time.Since(cachedTime)
if diff > (GcSafePointCacheInterval - gcCPUTimeInaccuracyBound) {
return ErrPDServerTimeout.GenByArgs("start timestamp may fall behind safe point")
}
if startTime < cachedSafePoint {
return ErrGCTooEarly
}
return nil
}
func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client, enableGC bool) (*tikvStore, error) {
o, err := oracles.NewPdOracle(pdClient, time.Duration(oracleUpdateInterval)*time.Millisecond)
if err != nil {
return nil, errors.Trace(err)
}
store := &tikvStore{
clusterID: pdClient.GetClusterID(context.TODO()),
uuid: uuid,
oracle: o,
client: client,
pdClient: pdClient,
regionCache: NewRegionCache(pdClient),
kv: spkv,
safePoint: 0,
spTime: time.Now(),
closed: make(chan struct{}),
}
store.lockResolver = newLockResolver(store)
store.enableGC = enableGC
go store.runSafePointChecker()
return store, nil
}
func (s *tikvStore) EtcdAddrs() []string {
return s.etcdAddrs
}
func (s *tikvStore) TLSConfig() *tls.Config {
return s.tlsConfig
}
// StartGCWorker starts GC worker, it's called in BootstrapSession, don't call this function more than once.
func (s *tikvStore) StartGCWorker() error {
if !s.enableGC || NewGCHandlerFunc == nil {
return nil
}
gcWorker, err := NewGCHandlerFunc(s)
if err != nil {
return errors.Trace(err)
}
gcWorker.Start()
s.gcWorker = gcWorker
return nil
}
func (s *tikvStore) runSafePointChecker() {
d := gcSafePointUpdateInterval
for {
select {
case spCachedTime := <-time.After(d):
cachedSafePoint, err := loadSafePoint(s.GetSafePointKV(), GcSavedSafePoint)
if err == nil {
metrics.TiKVLoadSafepointCounter.WithLabelValues("ok").Inc()
s.UpdateSPCache(cachedSafePoint, spCachedTime)
d = gcSafePointUpdateInterval
} else {
metrics.TiKVLoadSafepointCounter.WithLabelValues("fail").Inc()
log.Errorf("fail to load safepoint from pd: %v", err)
d = gcSafePointQuickRepeatInterval
}
case <-s.Closed():
return
}
}
}
func (s *tikvStore) Begin() (kv.Transaction, error) {
txn, err := newTiKVTxn(s)
if err != nil {
return nil, errors.Trace(err)
}
metrics.TiKVTxnCounter.Inc()
return txn, nil
}
// BeginWithStartTS begins a transaction with startTS.
func (s *tikvStore) BeginWithStartTS(startTS uint64) (kv.Transaction, error) {
txn, err := newTikvTxnWithStartTS(s, startTS)
if err != nil {
return nil, errors.Trace(err)
}
metrics.TiKVTxnCounter.Inc()
return txn, nil
}
func (s *tikvStore) GetSnapshot(ver kv.Version) (kv.Snapshot, error) {
snapshot := newTiKVSnapshot(s, ver)
metrics.TiKVSnapshotCounter.Inc()
return snapshot, nil
}
func (s *tikvStore) Close() error {
mc.Lock()
defer mc.Unlock()
delete(mc.cache, s.uuid)
s.oracle.Close()
s.pdClient.Close()
if s.gcWorker != nil {
s.gcWorker.Close()
}
close(s.closed)
if err := s.client.Close(); err != nil {
return errors.Trace(err)
}
return nil
}
func (s *tikvStore) UUID() string {
return s.uuid
}
func (s *tikvStore) CurrentVersion() (kv.Version, error) {
bo := NewBackoffer(context.Background(), tsoMaxBackoff)
startTS, err := s.getTimestampWithRetry(bo)
if err != nil {
return kv.NewVersion(0), errors.Trace(err)
}
return kv.NewVersion(startTS), nil
}
func (s *tikvStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) {
for {
startTS, err := s.oracle.GetTimestamp(bo)
if err == nil {
return startTS, nil
}
err = bo.Backoff(boPDRPC, errors.Errorf("get timestamp failed: %v", err))
if err != nil {
return 0, errors.Trace(err)
}
}
}
func (s *tikvStore) GetClient() kv.Client {
return &CopClient{
store: s,
}
}
func (s *tikvStore) GetOracle() oracle.Oracle {
return s.oracle
}
func (s *tikvStore) SupportDeleteRange() (supported bool) {
if s.mock {
return false
}
return true
}
func (s *tikvStore) SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) {
sender := NewRegionRequestSender(s.regionCache, s.client)
return sender.SendReq(bo, req, regionID, timeout)
}
func (s *tikvStore) GetRegionCache() *RegionCache {
return s.regionCache
}
func (s *tikvStore) GetLockResolver() *LockResolver {
return s.lockResolver
}
func (s *tikvStore) GetGCHandler() GCHandler {
return s.gcWorker
}
func (s *tikvStore) Closed() <-chan struct{} {
return s.closed
}
func (s *tikvStore) GetSafePointKV() SafePointKV {
return s.kv
}
func (s *tikvStore) SetOracle(oracle oracle.Oracle) {
s.oracle = oracle
}
func (s *tikvStore) SetTiKVClient(client Client) {
s.client = client
}
func (s *tikvStore) GetTiKVClient() (client Client) {
return s.client
}
func parsePath(path string) (etcdAddrs []string, disableGC bool, err error) {
var u *url.URL
u, err = url.Parse(path)
if err != nil {
err = errors.Trace(err)
return
}
if strings.ToLower(u.Scheme) != "tikv" {
err = errors.Errorf("Uri scheme expected[tikv] but found [%s]", u.Scheme)
log.Error(err)
return
}
switch strings.ToLower(u.Query().Get("disableGC")) {
case "true":
disableGC = true
case "false", "":
default:
err = errors.New("disableGC flag should be true/false")
return
}
etcdAddrs = strings.Split(u.Host, ",")
return
}
func init() {
mc.cache = make(map[string]*tikvStore)
rand.Seed(time.Now().UnixNano())
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/pingcap/tidb.git
git@gitee.com:pingcap/tidb.git
pingcap
tidb
tidb
v2.0.10-binlog

搜索帮助

344bd9b3 5694891 D2dac590 5694891