代码拉取完成,页面将自动刷新
package dbUtils
import (
"fmt"
"gitee.com/qq358678184_admin/gcore/logHelper"
"os"
"strings"
)
type DbSyncClient struct {
}
func NewDbSyncClient() *DbSyncClient {
d := new(DbSyncClient)
return d
}
func (d *DbSyncClient) ReplayExecuteSql(logContent *DbLogContent) {
dbHelper := NewDbHelper()
dbConfig := dbHelper.GetTargetDbConfig(logContent.DataBaseKey)
if dbConfig == nil {
return
}
session := dbHelper.GetDbByDbConfig(dbConfig)
defer session.Close()
sqlOrArgs := make([]interface{}, 0)
sqlOrArgs = append(sqlOrArgs, logContent.Sql)
if logContent.RecordValues != nil {
sqlOrArgs = append(sqlOrArgs, logContent.RecordValues...)
}
_, err := session.Exec(sqlOrArgs...)
if err != nil {
logHelper.Error(err)
}
}
//ReplayInsertRecord 重放插入
func (d *DbSyncClient) ReplayInsertRecord(logContent *DbLogContent) {
dbConfig := NewDbHelper().GetTargetDbConfig(logContent.DataBaseKey)
if dbConfig == nil {
return
}
dialect := dbConfig.DataBaseType
var sql = &strings.Builder{}
if dialect == "mssql" && logContent.HasAutoIncr {
setIdentitySql := fmt.Sprintf("SET IDENTITY_INSERT %s ON;", logContent.TableName)
sql.WriteString(setIdentitySql)
}
sql.WriteString(fmt.Sprintf("insert into %s(%s) VALUES", logContent.TableName, strings.Join(logContent.Fields, ",")))
for idx, _ := range logContent.Fields {
if idx == 0 {
sql.WriteString("(?")
if len(logContent.Fields) == 1 {
sql.WriteString(")")
}
} else if idx == (len(logContent.Fields) - 1) {
sql.WriteString(",?)")
} else {
sql.WriteString(",?")
}
}
if dialect == "mssql" && logContent.HasAutoIncr {
setIdentitySql2 := fmt.Sprintf(";SET IDENTITY_INSERT %s OFF;", logContent.TableName)
sql.WriteString(setIdentitySql2)
}
sqlOrArgs := make([]interface{}, 0)
sqlOrArgs = append(sqlOrArgs, sql.String())
sqlVars := logContent.RecordValues
if sqlVars != nil && len(sqlVars) > 0 {
sqlOrArgs = append(sqlOrArgs, sqlVars...)
}
dbHelper := NewDbHelper()
session := dbHelper.GetDbByDbConfig(dbConfig)
//defer session.Close()
_, err := session.Exec(sqlOrArgs...)
if err != nil {
logHelper.Error(err)
}
}
//ReplayDeleteRecord 重放删除
func (d *DbSyncClient) ReplayDeleteRecord(logContent *DbLogContent) {
var sql = &strings.Builder{}
sql.WriteString(fmt.Sprintf("delete from %s where 1=1", logContent.TableName))
if len(logContent.WhereFields) > 0 {
for idx, _ := range logContent.WhereFields {
sql.WriteString(" " + logContent.WhereFields[idx])
}
}
sqlOrArgs := make([]interface{}, 0)
sqlOrArgs = append(sqlOrArgs, sql.String())
if logContent.WhereFieldValues != nil && len(logContent.WhereFieldValues) > 0 {
sqlOrArgs = append(sqlOrArgs, logContent.WhereFieldValues...)
}
dbHelper := NewDbHelper()
dbConfig := dbHelper.GetTargetDbConfig(logContent.DataBaseKey)
if dbConfig == nil {
return
}
session := dbHelper.GetDbByDbConfig(dbConfig)
//defer session.Close()
_, err := session.Exec(sqlOrArgs...)
if err != nil {
logHelper.Error(err)
}
}
//ReplayUpdateRecord 重放更新记录
func (d *DbSyncClient) ReplayUpdateRecord(logContent *DbLogContent) {
var sql = &strings.Builder{}
sql.WriteString(fmt.Sprintf("update %s set ", logContent.TableName))
//var pkField *orm.Field = schema.PkField
var isFirstSet = true
for _, field := range logContent.Fields {
//if field.IsPk {
// continue
//}
if isFirstSet {
sql.WriteString(fmt.Sprintf("%s=?", field))
isFirstSet = false
} else {
sql.WriteString(fmt.Sprintf(",%s=?", field))
}
}
sql.WriteString(" where 1=1 ")
if logContent.WhereFields != nil {
for _, field := range logContent.WhereFields {
sql.WriteString(field)
}
}
dbHelper := NewDbHelper()
dbConfig := dbHelper.GetTargetDbConfig(logContent.DataBaseKey)
if dbConfig == nil {
return
}
session := dbHelper.GetDbByDbConfig(dbConfig)
//defer session.Close()
sqlOrArgs := make([]interface{}, 0)
sqlOrArgs = append(sqlOrArgs, sql.String())
sqlOrArgs = append(sqlOrArgs, logContent.RecordValues...)
if logContent.WhereFieldValues != nil {
sqlOrArgs = append(sqlOrArgs, logContent.WhereFieldValues...)
}
_, err := session.Exec(sqlOrArgs...)
if err != nil {
logHelper.Error(err)
return
}
}
func (d *DbSyncClient) ReplayDbLog(log *DbLogContent) {
dbHelper := NewDbHelper()
dbConfig := dbHelper.GetTargetDbConfig(log.DataBaseKey)
if dbConfig == nil {
return
}
defer func() {
if r := recover(); r != nil {
logHelper.Error(r)
}
}()
if log.LogType == Insert {
d.ReplayInsertRecord(log)
} else if log.LogType == Delete {
d.ReplayDeleteRecord(log)
} else if log.LogType == Update {
d.ReplayUpdateRecord(log)
} else if log.LogType == ExecuteSql {
d.ReplayExecuteSql(log)
}
}
func (d *DbSyncClient) StartSyncDbData() {
path, err := NewDbLogSender().GetOldFirstFilePath()
if err != nil {
logHelper.Error(err)
return
}
if path == "" {
return
}
logContents, err := NewDbParseHelper().ParseDbLog(path)
if err != nil {
logHelper.Error(err)
return
}
if len(logContents) == 0 {
return
}
for _, logContent := range logContents {
d.ReplayDbLog(logContent)
}
err = os.Remove(path)
if err != nil {
logHelper.Error(err)
return
}
var msg = fmt.Sprintf("同步%s成功,共执行{%d}条数据", path, len(logContents))
fmt.Println(msg)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。