90 Star 491 Fork 151

平凯星辰(北京)科技有限公司/tidb

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
kv_encoder.go 7.00 KB
一键复制 编辑 原始数据 按行查看 历史
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kvenc
import (
"bytes"
"fmt"
"strings"
"sync/atomic"
"github.com/juju/errors"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/tablecodec"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
var _ KvEncoder = &kvEncoder{}
var mockConnID uint64
// KvPair is a key-value pair.
type KvPair struct {
// Key is the key of the pair.
Key []byte
// Val is the value of the pair. if the op is delete, the len(Val) == 0
Val []byte
}
// KvEncoder is an encoder that transfer sql to key-value pairs.
type KvEncoder interface {
// Encode transfers sql to kv pairs.
// Before use Encode() method, please make sure you already created schame by calling ExecDDLSQL() method.
// NOTE: now we just support transfers insert statement to kv pairs.
// (if we wanna support other statement, we need to add a kv.Storage parameter,
// and pass tikv store in.)
// return encoded kvs array that generate by sql, and affectRows count.
Encode(sql string, tableID int64) (kvPairs []KvPair, affectedRows uint64, err error)
// PrepareStmt prepare query statement, and return statement id.
// Pass stmtID into EncodePrepareStmt to execute a prepare statement.
PrepareStmt(query string) (stmtID uint32, err error)
// EncodePrepareStmt transfer prepare query to kv pairs.
// stmtID is generated by PrepareStmt.
EncodePrepareStmt(tableID int64, stmtID uint32, param ...interface{}) (kvPairs []KvPair, affectedRows uint64, err error)
// ExecDDLSQL executes ddl sql, you must use it to create schema infos.
ExecDDLSQL(sql string) error
// EncodeMetaAutoID encode the table meta info, autoID to coresponding key-value pair.
EncodeMetaAutoID(dbID, tableID, autoID int64) (KvPair, error)
// SetSystemVariable set system variable name = value.
SetSystemVariable(name string, value string) error
// GetSystemVariable get the system variable value of name.
GetSystemVariable(name string) (string, bool)
// Close cleanup the kvEncoder.
Close() error
}
type kvEncoder struct {
store kv.Storage
dom *domain.Domain
se session.Session
}
// New new a KvEncoder
func New(dbName string, idAlloc autoid.Allocator) (KvEncoder, error) {
kvEnc := &kvEncoder{}
err := kvEnc.initial(dbName, idAlloc)
if err != nil {
return nil, errors.Trace(err)
}
return kvEnc, nil
}
func (e *kvEncoder) Close() error {
e.dom.Close()
if err := e.store.Close(); err != nil {
return errors.Trace(err)
}
return nil
}
func (e *kvEncoder) Encode(sql string, tableID int64) (kvPairs []KvPair, affectedRows uint64, err error) {
e.se.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true)
defer func() {
err1 := e.se.RollbackTxn(context.Background())
if err1 != nil {
log.Error(errors.ErrorStack(err1))
}
}()
_, err = e.se.Execute(context.Background(), sql)
if err != nil {
return nil, 0, errors.Trace(err)
}
return e.getKvPairsInMemBuffer(tableID)
}
func (e *kvEncoder) getKvPairsInMemBuffer(tableID int64) (kvPairs []KvPair, affectedRows uint64, err error) {
txnMemBuffer := e.se.Txn(true).GetMemBuffer()
kvPairs = make([]KvPair, 0, txnMemBuffer.Len())
err = kv.WalkMemBuffer(txnMemBuffer, func(k kv.Key, v []byte) error {
if bytes.HasPrefix(k, tablecodec.TablePrefix()) {
k = tablecodec.ReplaceRecordKeyTableID(k, tableID)
}
kvPairs = append(kvPairs, KvPair{Key: k, Val: v})
return nil
})
if err != nil {
return nil, 0, errors.Trace(err)
}
return kvPairs, e.se.GetSessionVars().StmtCtx.AffectedRows(), nil
}
func (e *kvEncoder) PrepareStmt(query string) (stmtID uint32, err error) {
stmtID, _, _, err = e.se.PrepareStmt(query)
return
}
func (e *kvEncoder) EncodePrepareStmt(tableID int64, stmtID uint32, param ...interface{}) (kvPairs []KvPair, affectedRows uint64, err error) {
e.se.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true)
defer func() {
err1 := e.se.RollbackTxn(context.Background())
if err1 != nil {
log.Error(errors.ErrorStack(err1))
}
}()
_, err = e.se.ExecutePreparedStmt(context.Background(), stmtID, param...)
if err != nil {
return nil, 0, errors.Trace(err)
}
return e.getKvPairsInMemBuffer(tableID)
}
func (e *kvEncoder) EncodeMetaAutoID(dbID, tableID, autoID int64) (KvPair, error) {
mockTxn := kv.NewMockTxn()
m := meta.NewMeta(mockTxn)
k, v := m.GenAutoTableIDIDKeyValue(dbID, tableID, autoID)
return KvPair{Key: k, Val: v}, nil
}
func (e *kvEncoder) ExecDDLSQL(sql string) error {
_, err := e.se.Execute(context.Background(), sql)
if err != nil {
return errors.Trace(err)
}
return nil
}
func (e *kvEncoder) SetSystemVariable(name string, value string) error {
name = strings.ToLower(name)
if e.se != nil {
return e.se.GetSessionVars().SetSystemVar(name, value)
}
return errors.Errorf("e.se is nil, please new KvEncoder by kvencoder.New().")
}
func (e *kvEncoder) GetSystemVariable(name string) (string, bool) {
name = strings.ToLower(name)
if e.se == nil {
return "", false
}
return e.se.GetSessionVars().GetSystemVar(name)
}
func newMockTikvWithBootstrap() (kv.Storage, *domain.Domain, error) {
store, err := mockstore.NewMockTikvStore()
if err != nil {
return nil, nil, errors.Trace(err)
}
session.SetSchemaLease(0)
dom, err := session.BootstrapSession(store)
return store, dom, errors.Trace(err)
}
func (e *kvEncoder) initial(dbName string, idAlloc autoid.Allocator) (err error) {
var (
store kv.Storage
dom *domain.Domain
se session.Session
)
defer func() {
if err == nil {
return
}
if store != nil {
if err1 := store.Close(); err1 != nil {
log.Error(errors.ErrorStack(err1))
}
}
if dom != nil {
dom.Close()
}
if se != nil {
se.Close()
}
}()
// disable stats update.
session.SetStatsLease(0)
store, dom, err = newMockTikvWithBootstrap()
if err != nil {
err = errors.Trace(err)
return
}
se, err = session.CreateSession(store)
if err != nil {
err = errors.Trace(err)
return
}
se.SetConnectionID(atomic.AddUint64(&mockConnID, 1))
_, err = se.Execute(context.Background(), fmt.Sprintf("create database if not exists %s", dbName))
if err != nil {
err = errors.Trace(err)
return
}
_, err = se.Execute(context.Background(), fmt.Sprintf("use %s", dbName))
if err != nil {
err = errors.Trace(err)
return
}
se.GetSessionVars().IDAllocator = idAlloc
se.GetSessionVars().ImportingData = true
se.GetSessionVars().SkipUTF8Check = true
e.se = se
e.store = store
e.dom = dom
return nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/pingcap/tidb.git
git@gitee.com:pingcap/tidb.git
pingcap
tidb
tidb
v2.0.10

搜索帮助

0d507c66 1850385 C8b1a773 1850385