代码拉取完成,页面将自动刷新
package dbsync
import (
"errors"
"fmt"
"time"
)
// 获取增量数据时的配置信息
type FetchOptions struct {
IgnoreFields []string // 忽略的列名称
PageNumber int // 分页获取增量的页码,从1开始
PageSize int // 分页获取增量的页大小
UpdateTimeFieldName string // 更新时间所在列的列名称
LastUpdateTime int64 // 从哪个时间戳开始查询,这是大于的关系
WhereSqlStmt string // 自定义SQL查询语句的Where子句
WhereSqlArgs []interface{} // 自定义SQL查询语句的Where子句的参数列表
}
// 获取增量数据的返回结果
type FetchResult struct {
Columns []string `json:"columns"` // 列名称
Data [][]interface{} `json:"data"` // 待同步的数据,每一行是一条数据,与列名称一一对应
Count int64 `json:"count"` // 数据的数量
}
// 获取增量更新的数据
func DoFetch(
db SQLCommon,
tableName string,
options FetchOptions,
) (rsp FetchResult, err error) {
defer func() {
if r := recover(); r != nil {
err = r.(error)
}
}()
// 参数校验和处理
if options.UpdateTimeFieldName == "" {
err = errors.New("options.UpdateTimeFieldName must be not nil")
return
}
if options.PageNumber <= 0 {
options.PageNumber = 1
}
if options.PageSize <= 0 {
options.PageSize = 100
}
// 拼接SQL语句
whereStmt := fmt.Sprintf("%s > ?", options.UpdateTimeFieldName)
whereArgs := []interface{}{time.Unix(options.LastUpdateTime, 0)}
if options.WhereSqlStmt != "" {
whereStmt = fmt.Sprintf("%s AND (%s)", whereStmt, options.WhereSqlStmt)
whereArgs = append(whereArgs, options.WhereSqlArgs...)
}
offset, size := (options.PageNumber-1)*options.PageSize, options.PageSize
sqlStmt := fmt.Sprintf("SELECT * FROM %s WHERE %s ORDER BY %s ASC LIMIT %d OFFSET %d",
tableName, whereStmt, options.UpdateTimeFieldName, size, offset)
// 执行查询语句
rows, err := db.Query(sqlStmt, whereArgs...)
if err != nil {
return
}
defer rows.Close()
// 获取所有列名
columns, err := rows.Columns()
if err != nil {
return
}
// 如果有忽略字段,则更新列的结果映射关系
ignoreMap := make(map[string]bool)
for _, ignoreFieldName := range options.IgnoreFields {
ignoreMap[ignoreFieldName] = true
}
// 建立列是否应该存入结果的映射关系
columnValidMap, validLen := make(map[int]int), 0
for i, columnName := range columns {
if ignoreMap[columnName] {
columnValidMap[i] = -1
} else {
columnValidMap[i] = validLen
validLen += 1
rsp.Columns = append(rsp.Columns, columnName)
}
}
// 生成每行的处理缓存
cache := make([]interface{}, len(columns))
for i := range cache {
var tmp interface{}
cache[i] = &tmp
}
// 遍历结果集的数据
for rows.Next() {
if err = rows.Scan(cache...); err != nil {
return
}
item := make([]interface{}, validLen)
for j, data := range cache {
if k := columnValidMap[j]; k >= 0 {
item[k] = convertFetchType(data)
}
}
rsp.Data = append(rsp.Data, item)
}
rsp.Count = int64(len(rsp.Data))
return
}
// 类型转换方法
func convertFetchType(data interface{}) interface{} {
item := *data.(*interface{})
switch item := item.(type) {
case nil: // 空值
return nil
case []uint8: // 字符串
return string(item)
case time.Time: // 时间类型
return item.Unix()
case int, int8, int16, int32, int64, float32, float64, byte: // 数字型
return item
case bool: // 布尔型
return item
default:
return item
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。