2 Star 2 Fork 2

tym_hmm/mysql-mydumper

Create your Gitee Account
Explore and code with more than 13.5 million developers,Free private repositories !:)
Sign up
文件
Clone or Download
dumper.go 8.79 KB
Copy Edit Raw Blame History
天蝎儿 authored 2021-12-21 14:14 +08:00 . 修改helper组件
/*
* go-mydumper
* xelabs.org
*
* Copyright (c) XeLabs
* GPL License
*
*/
package common
import (
"fmt"
"gitee.com/tym_hmm/go-helper/TimeHelper"
"gitee.com/tym_hmm/mysql-mydumper/config"
"gitee.com/tym_hmm/mysql-mydumper/extend/mysql/xlog"
querypb "gitee.com/tym_hmm/mysql-mydumper/extend/mysql/sqlparser/depends/query"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
)
// writeMetaData creates an (empty) "metadata" marker file in the output
// directory.
func writeMetaData(args *config.Config) {
	metaFile := args.Outdir + "/metadata"
	WriteFile(metaFile, "")
}
// dumpDatabaseSchema switches the connection to database and writes a
// "CREATE DATABASE IF NOT EXISTS" statement to
// "<outdir>/<database>-schema-create.sql".
func dumpDatabaseSchema(log *xlog.Log, conn *Connection, args *config.Config, database string) {
	err := conn.Execute(fmt.Sprintf("USE `%s`", database))
	AssertNil(err)

	outFile := fmt.Sprintf("%s/%s-schema-create.sql", args.Outdir, database)
	WriteFile(outFile, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`;", database))
	log.Info("dumping.database[%s].schema...", database)
}
// dumpTableSchema writes the table's CREATE TABLE statement (as reported by
// SHOW CREATE TABLE, column 1 of the first row) to
// "<outdir>/<database>.<table>-schema.sql".
func dumpTableSchema(log *xlog.Log, conn *Connection, args *config.Config, database string, table string) {
	qr, err := conn.Fetch(fmt.Sprintf("SHOW CREATE TABLE `%s`.`%s`", database, table))
	AssertNil(err)

	createStmt := qr.Rows[0][1].String() + ";\n"
	WriteFile(fmt.Sprintf("%s/%s.%s-schema.sql", args.Outdir, database, table), createStmt)
	log.Info("dumping.table[%s.%s].schema...", database, table)
}
// dumpTable streams all rows of `database`.`table` and writes them as INSERT
// statements to chunked files "<outdir>/<db>.<table>.<NNNNN>.sql".
//
// Columns listed in args.Filters[table] are skipped. Columns listed in
// args.Selects[table] are replaced by the configured SELECT expression,
// aliased back to the column name. An optional WHERE clause comes from
// args.Wheres[table]. A single INSERT statement is closed once it holds at
// least args.StmtSize bytes of row data; a file is rotated once it holds at
// least args.ChunksizeInMB megabytes. Global progress counters
// args.Allbytes/args.Allrows are updated atomically.
func dumpTable(log *xlog.Log, conn *Connection, args *config.Config, database string, table string) {
	var allBytes uint64 // per-table byte total (feeds the commented-out debug log below)
	var allRows uint64  // per-table row total
	var where string
	var selfields []string
	fields := make([]string, 0, 16)

	// Probe one row to discover the column set, honoring filters/selects.
	{
		cursor, err := conn.StreamFetch(fmt.Sprintf("SELECT * FROM `%s`.`%s` LIMIT 1", database, table))
		AssertNil(err)
		for _, fld := range cursor.Fields() {
			if _, ok := args.Filters[table][fld.Name]; ok {
				continue
			}
			fields = append(fields, fmt.Sprintf("`%s`", fld.Name))
			if replacement, ok := args.Selects[table][fld.Name]; ok {
				selfields = append(selfields, fmt.Sprintf("%s AS `%s`", replacement, fld.Name))
			} else {
				selfields = append(selfields, fmt.Sprintf("`%s`", fld.Name))
			}
		}
		err = cursor.Close()
		AssertNil(err)
	}

	if v, ok := args.Wheres[table]; ok {
		where = fmt.Sprintf(" WHERE %v", v)
	}

	cursor, err := conn.StreamFetch(fmt.Sprintf("SELECT %s FROM `%s`.`%s` %s", strings.Join(selfields, ", "), database, table, where))
	AssertNil(err)

	fileNo := 1
	stmtsize := 0   // bytes of row data in the pending INSERT statement
	chunkbytes := 0 // bytes of row data in the pending output file
	rows := make([]string, 0, 256)
	inserts := make([]string, 0, 256)
	for cursor.Next() {
		row, err := cursor.RowValues()
		AssertNil(err)

		values := make([]string, 0, 16)
		for _, v := range row {
			if v.Raw() == nil {
				values = append(values, "NULL")
			} else {
				str := v.String()
				switch {
				case v.IsSigned(), v.IsUnsigned(), v.IsFloat(), v.IsIntegral(), v.Type() == querypb.Type_DECIMAL:
					// Numeric types are written unquoted.
					values = append(values, str)
				default:
					// Everything else is escaped and double-quoted.
					values = append(values, fmt.Sprintf("\"%s\"", EscapeBytes(v.Raw())))
				}
			}
		}
		r := "(" + strings.Join(values, ",") + ")"
		rows = append(rows, r)

		allRows++
		stmtsize += len(r)
		chunkbytes += len(r)
		allBytes += uint64(len(r))
		atomic.AddUint64(&args.Allbytes, uint64(len(r)))
		atomic.AddUint64(&args.Allrows, 1)

		// Close the current INSERT statement once it is big enough.
		if stmtsize >= args.StmtSize {
			insertone := fmt.Sprintf("INSERT INTO `%s`(%s) VALUES\n%s", table, strings.Join(fields, ","), strings.Join(rows, ",\n"))
			inserts = append(inserts, insertone)
			rows = rows[:0]
			stmtsize = 0
		}

		// Rotate to a new output file once the current chunk is big enough.
		// Note: rows still pending here stay in `rows` and land in the next
		// file, so ordering is preserved.
		if (chunkbytes / 1024 / 1024) >= args.ChunksizeInMB {
			query := strings.Join(inserts, ";\n") + ";\n"
			file := fmt.Sprintf("%s/%s.%s.%05d.sql", args.Outdir, database, table, fileNo)
			WriteFile(file, query)
			//log.Info("dumping.table[%s.%s].rows[%v].bytes[%vMB].part[%v].thread[%d]", database, table, allRows, (allBytes / 1024 / 1024), fileNo, conn.ID)
			inserts = inserts[:0]
			chunkbytes = 0
			fileNo++
		}
	}

	// Final flush. Check rows/inserts directly instead of `chunkbytes > 0`:
	// a chunk rotation resets chunkbytes to 0 even while `rows` may still
	// hold pending values, so the old guard silently dropped those rows when
	// the cursor ended right after a rotation.
	if len(rows) > 0 || len(inserts) > 0 {
		if len(rows) > 0 {
			insertone := fmt.Sprintf("INSERT INTO `%s`(%s) VALUES\n%s", table, strings.Join(fields, ","), strings.Join(rows, ",\n"))
			inserts = append(inserts, insertone)
		}
		query := strings.Join(inserts, ";\n") + ";\n"
		file := fmt.Sprintf("%s/%s.%s.%05d.sql", args.Outdir, database, table, fileNo)
		WriteFile(file, query)
	}
	err = cursor.Close()
	AssertNil(err)
	//log.Info("dumping.table[%s.%s].done.allrows[%v].allbytes[%vMB].thread[%d]...", database, table, allRows, (allBytes / 1024 / 1024), conn.ID)
}
// allTables returns the name of every table in database, as reported by
// SHOW TABLES.
func allTables(log *xlog.Log, conn *Connection, database string) []string {
	qr, err := conn.Fetch(fmt.Sprintf("SHOW TABLES FROM `%s`", database))
	AssertNil(err)

	names := make([]string, 0, 128)
	for _, row := range qr.Rows {
		names = append(names, row[0].String())
	}
	return names
}
// allDatabases returns the name of every database on the server, as reported
// by SHOW DATABASES.
func allDatabases(log *xlog.Log, conn *Connection) []string {
	qr, err := conn.Fetch("SHOW DATABASES")
	AssertNil(err)

	names := make([]string, 0, 128)
	for _, row := range qr.Rows {
		names = append(names, row[0].String())
	}
	return names
}
// filterDatabases returns the server's database names selected by filter:
// with invert=false the names matching the regexp are kept, with invert=true
// the non-matching ones are kept.
func filterDatabases(log *xlog.Log, conn *Connection, filter *regexp.Regexp, invert bool) []string {
	qr, err := conn.Fetch("SHOW DATABASES")
	AssertNil(err)

	kept := make([]string, 0, 128)
	for _, row := range qr.Rows {
		name := row[0].String()
		// (!invert && match) || (invert && !match)  ==  match != invert
		if filter.MatchString(name) != invert {
			kept = append(kept, name)
		}
	}
	return kept
}
// Dumper used to start the dumper worker.
//
// It writes the metadata file and every selected database's schema, then
// dumps each table (schema + data) on its own goroutine, bounded by the
// connection pool of args.Threads connections. A monitor goroutine reports
// progress roughly once a second until every table has been dumped. When
// listener is non-nil it receives Start/StartNode/PendingNode/DoneNode/Done
// callbacks; otherwise progress goes to the log.
func Dumper(log *xlog.Log, args *config.Config, listener MyDumperListener) {
	nowTimeStr := TimeHelper.GetNowTimeStr()
	if listener != nil {
		listener.Start(nowTimeStr, args.Database, args.Table, args)
	}

	pool, err := NewPool(log, args.Threads, args.Address, args.User, args.Password, args.SessionVars)
	AssertNil(err)
	defer pool.Close()

	// Meta data.
	writeMetaData(args)

	// Databases: regexp filter, explicit comma-separated list, or everything.
	var wg sync.WaitGroup
	conn := pool.Get()
	var databases []string
	t := time.Now()
	if args.DatabaseRegexp != "" {
		r := regexp.MustCompile(args.DatabaseRegexp)
		databases = filterDatabases(log, conn, r, args.DatabaseInvertRegexp)
	} else if args.Database != "" {
		databases = strings.Split(args.Database, ",")
	} else {
		databases = allDatabases(log, conn)
	}
	for _, database := range databases {
		dumpDatabaseSchema(log, conn, args, database)
	}

	// Tables per database (parallel slice to databases).
	tables := make([][]string, len(databases))
	for i, database := range databases {
		if args.Table != "" {
			tables[i] = strings.Split(args.Table, ",")
		} else {
			tables[i] = allTables(log, conn, database)
		}
	}
	pool.Put(conn)

	// Total number of tables to dump. The progress monitor exits once the
	// done-counter reaches this. (The old code compared against len(tables),
	// i.e. the number of DATABASES: the monitor quit too early when a
	// database held several tables, and spun forever when the total table
	// count was below the database count, hanging wg.Wait.)
	var totalTables int64
	for i := range tables {
		totalTables += int64(len(tables[i]))
	}

	var tn int64 // tables finished so far (updated atomically)
	for i, database := range databases {
		for _, table := range tables[i] {
			conn := pool.Get()
			dumpTableSchema(log, conn, args, database, table)

			wg.Add(1)
			go func(conn *Connection, database string, table string) {
				defer func() {
					// Notify BEFORE returning the connection: the old order
					// called listener.DoneNode with a connection already put
					// back in the pool, which another goroutine could be
					// using concurrently.
					timeStr := TimeHelper.GetNowTimeStr()
					if listener != nil {
						listener.DoneNode(timeStr, database, table, args, conn)
					} else {
						log.Info("dumping.table[%s.%s].datas.thread[%d].done...", database, table, conn.ID)
					}
					pool.Put(conn)
					atomic.AddInt64(&tn, 1)
					wg.Done()
				}()
				timeStr := TimeHelper.GetNowTimeStr()
				if listener != nil {
					listener.StartNode(timeStr, database, table, args, conn)
				} else {
					log.Info("dumping.table[%s.%s].datas.thread[%d]...", database, table, conn.ID)
				}
				dumpTable(log, conn, args, database, table)
			}(conn, database, table)
		}
	}

	// Progress monitor: report about once a second until all tables are done.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			diff := time.Since(t).Seconds()
			allbytes := atomic.LoadUint64(&args.Allbytes)
			allrows := atomic.LoadUint64(&args.Allrows)
			allbytesMB := float64(allbytes / 1024 / 1024)
			rates := allbytesMB / diff
			// Old code logged this line unconditionally AND again in the
			// else branch below, duplicating it when listener was nil.
			if listener != nil {
				timeStr := TimeHelper.GetNowTimeStr()
				listener.PendingNode(timeStr, args.Database, args.Table, allrows, allbytes, rates)
			} else {
				log.Info("dumping.allbytes[%vMB].allrows[%v].time[%.2fsec].rates[%.2fMB/sec]...", allbytesMB, allrows, diff, rates)
			}
			if atomic.LoadInt64(&tn) >= totalTables {
				break
			}
			time.Sleep(time.Millisecond * 1000)
		}
	}()
	wg.Wait()

	elapsed := time.Since(t).Seconds()
	timeStr := TimeHelper.GetNowTimeStr()
	rates := float64(args.Allbytes/1024/1024) / elapsed
	if listener != nil {
		listener.Done(timeStr, args.Database, args.Table, args, args.Allrows, args.Allbytes, rates)
	} else {
		log.Info("dumping.all.done.cost[%.2fsec].allrows[%v].allbytes[%v].rate[%.2fMB/s]", elapsed, args.Allrows, args.Allbytes, rates)
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/tym_hmm/mysql-mydumper.git
git@gitee.com:tym_hmm/mysql-mydumper.git
tym_hmm
mysql-mydumper
mysql-mydumper
v1.0.4

Search