1 Star 1 Fork 0

Blazz / go-mysql-plus

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
dumper.go 6.43 KB
一键复制 编辑 原始数据 按行查看 历史
Blazz 提交于 2023-06-16 11:32 . change module name for ext
package dump
import (
"bytes"
"fmt"
"io"
"os"
"os/exec"
"strings"
. "gitee.com/Blazz/go-mysql-plus/mysql"
"github.com/pingcap/errors"
"github.com/siddontang/go-log/log"
)
// Dumper wraps the mysqldump binary. Unlike plain mysqldump usage, Dumper is
// designed for parsing and syncing data easily.
type Dumper struct {
// mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc...
ExecutionPath string
// Addr is the server address. Dump treats a value containing "/" as a socket
// path; otherwise it is split at the first ":" into host and optional port.
Addr string
// User and Password are passed to mysqldump as --user and --password.
User string
Password string
// Protocol is passed as --protocol when non-empty.
Protocol string
// Tables lists the tables of TableDB to dump. When non-empty it overrides
// Databases.
Tables []string
// TableDB is the database that Tables belong to.
TableDB string
// Databases is passed after --databases when Tables is empty. When both are
// empty, Dump uses --all-databases.
Databases []string
// Where is passed as --where when non-empty.
Where string
// Charset is passed as --default-character-set when non-empty.
Charset string
// IgnoreTables maps a database name to tables that Dump passes as
// --ignore-table=db.table.
IgnoreTables map[string][]string
// ExtraOptions are appended to the mysqldump arguments as-is.
ExtraOptions []string
// ErrOut receives the stderr of the mysqldump process.
ErrOut io.Writer
// masterDataSkipped: when false, Dump passes --master-data.
masterDataSkipped bool
// maxAllowedPacket is passed as --max-allowed-packet with an "M" suffix when
// greater than zero.
maxAllowedPacket int
// hexBlob: when true, Dump passes --hex-blob.
hexBlob bool
// see detectColumnStatisticsParamSupported; when true, Dump passes
// --column-statistics=0.
isColumnStatisticsParamSupported bool
}
// NewDumper builds a Dumper for the mysqldump binary named by executionPath,
// which is resolved with exec.LookPath.
//
// An empty executionPath yields (nil, nil): no dumper and no error. A path that
// cannot be resolved yields a traced error. The returned Dumper uses
// DEFAULT_CHARSET, writes mysqldump's stderr to os.Stderr, and has already
// probed the binary for --column-statistics support.
func NewDumper(executionPath string, addr string, user string, password string) (*Dumper, error) {
	if executionPath == "" {
		return nil, nil
	}

	resolved, err := exec.LookPath(executionPath)
	if err != nil {
		return nil, errors.Trace(err)
	}

	d := &Dumper{
		ExecutionPath: resolved,
		Addr:          addr,
		User:          user,
		Password:      password,
		Tables:        make([]string, 0, 16),
		Databases:     make([]string, 0, 16),
		Charset:       DEFAULT_CHARSET,
		IgnoreTables:  map[string][]string{},
		ExtraOptions:  make([]string, 0, 5),
		ErrOut:        os.Stderr,
	}
	// The probe runs the resolved binary, so ExecutionPath must already be set.
	d.isColumnStatisticsParamSupported = d.detectColumnStatisticsParamSupported()

	return d, nil
}
// detectColumnStatisticsParamSupported reports whether the configured mysqldump
// binary advertises the --column-statistics option in its --help output.
//
// Background: new mysqldump versions query information_schema.COLUMN_STATISTICS,
// a table that does not exist on old MySQL servers, which fails with
// "Unknown table 'COLUMN_STATISTICS' in information_schema (1109)".
// mysqldump can be told not to send that query through --column-statistics, but
// the parameter only exists for versions >=8.0.2
// (https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-2.html).
//
// Because the mysql-server and mysqldump versions may differ, the option is
// probed here and used only if available. Any failure to run the binary counts
// as "not supported".
func (d *Dumper) detectColumnStatisticsParamSupported() bool {
	helpText, err := exec.Command(d.ExecutionPath, "--help").CombinedOutput()
	return err == nil && bytes.Contains(helpText, []byte("--column-statistics"))
}
// SetCharset sets the character set that Dump passes to mysqldump as
// --default-character-set; an empty value omits the flag.
func (d *Dumper) SetCharset(charset string) {
d.Charset = charset
}
// SetProtocol sets the value that Dump passes to mysqldump as --protocol; an
// empty value omits the flag.
func (d *Dumper) SetProtocol(protocol string) {
d.Protocol = protocol
}
// SetWhere sets the condition that Dump passes to mysqldump as --where; an
// empty value omits the flag.
func (d *Dumper) SetWhere(where string) {
d.Where = where
}
// SetExtraOptions replaces the extra mysqldump arguments that Dump appends
// as-is. The slice is stored without copying, so later changes made by the
// caller to its elements are visible to the Dumper.
func (d *Dumper) SetExtraOptions(options []string) {
d.ExtraOptions = options
}
// SetErrOut sets the writer that Dump attaches to the stderr of the mysqldump
// process.
func (d *Dumper) SetErrOut(o io.Writer) {
d.ErrOut = o
}
// SkipMasterData controls whether Dump omits the `--master-data` flag (v ==
// true omits it). In some cloud MySQL, we have no privilege to use
// `--master-data`.
func (d *Dumper) SkipMasterData(v bool) {
d.masterDataSkipped = v
}
// SetMaxAllowedPacket sets the value Dump passes as --max-allowed-packet. Dump
// appends an "M" suffix to i and omits the flag unless i is greater than zero.
func (d *Dumper) SetMaxAllowedPacket(i int) {
d.maxAllowedPacket = i
}
// SetHexBlob controls whether Dump passes --hex-blob to mysqldump.
func (d *Dumper) SetHexBlob(v bool) {
d.hexBlob = v
}
// AddDatabases appends dbs to the list of databases to dump. Dump only uses
// this list when no tables have been added.
func (d *Dumper) AddDatabases(dbs ...string) {
d.Databases = append(d.Databases, dbs...)
}
// AddTables appends tables to the set dumped from database db. Only one
// database can hold table selections at a time: naming a different db than the
// current TableDB discards the tables collected so far before appending.
func (d *Dumper) AddTables(db string, tables ...string) {
	selected := d.Tables
	if db != d.TableDB {
		// Switching database: forget the old tables but keep the backing array.
		d.TableDB, selected = db, selected[:0]
	}
	d.Tables = append(selected, tables...)
}
// AddIgnoreTables appends tables to the ignore list kept for database db. Dump
// passes each entry to mysqldump as --ignore-table=db.table.
func (d *Dumper) AddIgnoreTables(db string, tables ...string) {
	d.IgnoreTables[db] = append(d.IgnoreTables[db], tables...)
}
// Reset clears the dump selection: tables, table database, databases, ignored
// tables and the where condition. The Tables and Databases slices are truncated
// in place; all other settings are left untouched.
func (d *Dumper) Reset() {
	d.TableDB, d.Where = "", ""
	d.Tables = d.Tables[:0]
	d.Databases = d.Databases[:0]
	d.IgnoreTables = map[string][]string{}
}
// Dump runs mysqldump with arguments derived from the Dumper's settings and
// streams the process's stdout to w; its stderr goes to d.ErrOut.
//
// When specific tables were selected, a "USE `db`;" line is written to w before
// mysqldump starts, because a table-only dump carries no database name.
//
// It returns an error if that USE line cannot be written, or whatever error
// running the mysqldump command produces.
func (d *Dumper) Dump(w io.Writer) error {
	argv := make([]string, 0, 16)

	// Connection target: an address containing "/" is a socket path, anything
	// else is host[:port].
	if strings.Contains(d.Addr, "/") {
		argv = append(argv, "--socket="+d.Addr)
	} else {
		hostPort := strings.SplitN(d.Addr, ":", 2)
		argv = append(argv, "--host="+hostPort[0])
		if len(hostPort) > 1 {
			argv = append(argv, "--port="+hostPort[1])
		}
	}

	// Remember where the password sits so it can be masked when logging.
	argv = append(argv, "--user="+d.User, "--password="+d.Password)
	passwordAt := len(argv) - 1

	if !d.masterDataSkipped {
		argv = append(argv, "--master-data")
	}
	if d.maxAllowedPacket > 0 {
		// mysqldump param should be --max-allowed-packet=%dM not be --max_allowed_packet=%dM
		argv = append(argv, fmt.Sprintf("--max-allowed-packet=%dM", d.maxAllowedPacket))
	}
	if d.Protocol != "" {
		argv = append(argv, "--protocol="+d.Protocol)
	}

	argv = append(argv,
		"--single-transaction",
		"--skip-lock-tables",
		// Disable unnecessary data.
		"--compact",
		"--skip-opt",
		"--quick",
		// We only care about data.
		"--no-create-info",
		// One row per INSERT is easier for us to parse.
		"--skip-extended-insert",
		"--skip-tz-utc",
	)

	if d.hexBlob {
		// Use hex for the binary type.
		argv = append(argv, "--hex-blob")
	}

	for db, tables := range d.IgnoreTables {
		for _, table := range tables {
			argv = append(argv, "--ignore-table="+db+"."+table)
		}
	}

	if d.Charset != "" {
		argv = append(argv, "--default-character-set="+d.Charset)
	}
	if d.Where != "" {
		argv = append(argv, "--where="+d.Where)
	}
	argv = append(argv, d.ExtraOptions...)
	if d.isColumnStatisticsParamSupported {
		argv = append(argv, "--column-statistics=0")
	}

	// What to dump: explicit tables win over databases; nothing selected means
	// everything.
	switch {
	case len(d.Tables) > 0:
		argv = append(argv, d.TableDB)
		argv = append(argv, d.Tables...)
		// If we only dump some tables, the dump data will not have the database
		// name, which makes it hard to parse, so we add it manually.
		if _, err := fmt.Fprintf(w, "USE `%s`;\n", d.TableDB); err != nil {
			return fmt.Errorf(`could not write USE command: %w`, err)
		}
	case len(d.Databases) > 0:
		argv = append(argv, "--databases")
		argv = append(argv, d.Databases...)
	default:
		argv = append(argv, "--all-databases")
	}

	// Log a copy of the arguments with the password masked.
	masked := make([]string, len(argv))
	copy(masked, argv)
	masked[passwordAt] = "--password=******"
	log.Infof("exec mysqldump with %v", masked)

	cmd := exec.Command(d.ExecutionPath, argv...)
	cmd.Stdout, cmd.Stderr = w, d.ErrOut
	return cmd.Run()
}
// DumpAndParse dumps MySQL and parses the output immediately.
//
// Dump writes into one end of an in-memory pipe while Parse, running in its own
// goroutine, reads from the other end and feeds h. Each side closes its end of
// the pipe with its own error, so a failure on one side unblocks the other.
//
// It returns the (traced) parse error if parsing failed; otherwise it returns
// the (traced) dump error, so that a failed mysqldump run is reported even if
// Parse itself finished without an error. It returns nil when both succeed.
func (d *Dumper) DumpAndParse(h ParseHandler) error {
	r, w := io.Pipe()

	// Buffered so the parser goroutine can always deliver its result and exit.
	done := make(chan error, 1)
	go func() {
		err := Parse(r, h, !d.masterDataSkipped)
		// Closing the read side makes a still-running Dump fail instead of block.
		_ = r.CloseWithError(err)
		done <- err
	}()

	dumpErr := d.Dump(w)
	// Closing the write side lets Parse see either EOF or the dump error.
	_ = w.CloseWithError(dumpErr)

	if parseErr := <-done; parseErr != nil {
		return errors.Trace(parseErr)
	}
	// Previously the dump error was overwritten by the parse result and lost
	// whenever Parse returned nil.
	return errors.Trace(dumpErr)
}
Go
1
https://gitee.com/Blazz/go-mysql-plus.git
git@gitee.com:Blazz/go-mysql-plus.git
Blazz
go-mysql-plus
go-mysql-plus
v1.7.3

搜索帮助