4 Star 6 Fork 3

王军 / golib

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
ck_group.go 4.23 KB
一键复制 编辑 原始数据 按行查看 历史
王军 提交于 2023-07-14 15:53 . 写入ck增加分组写入
/*
* @Author: Wangjun
* @Date: 2021-07-12 16:25:12
* @LastEditTime: 2023-07-14 13:50:51
* @LastEditors: Wangjun
* @Description:clickhouse group组
* @FilePath: \sourcedatad:\go\src\gitee.com\haodreams\golib\ck\ck_group.go
* hnxr
*/
package ck
import (
"database/sql"
"fmt"
"strings"
"time"
"gitee.com/haodreams/godriver/clickhouse"
"gitee.com/haodreams/golib/logs"
"gorm.io/gorm"
"gorm.io/gorm/schema"
)
// CKGroup is a set of gorm database handles that are addressed by position.
// NewCKGroup fills it with one handle per host that could be opened.
type CKGroup struct {
// dbs holds the opened handles in the order their hosts were listed,
// minus any host that failed to open.
dbs []*gorm.DB
}
// GetDB returns the first database handle of the group, or nil when the
// group holds no handles.
func (m *CKGroup) GetDB() *gorm.DB {
	if len(m.dbs) == 0 {
		return nil
	}
	return m.dbs[0]
}
// NewCKGroup opens one gorm connection per host and collects them in a group.
//
// Parameters:
//   - host: comma-separated list of hosts; surrounding whitespace of each
//     entry is trimmed.
//   - datasource: fmt-style DSN template; each host is substituted into it
//     with fmt.Sprintf.
//   - tablePrefixs: optional; only the first element is used, as the gorm
//     table-name prefix.
//
// Hosts whose connection cannot be opened are logged and skipped. The result
// is nil when no host could be opened at all.
func NewCKGroup(host string, datasource string, tablePrefixs ...string) (g *CKGroup) {
	var prefix string
	if len(tablePrefixs) != 0 {
		prefix = tablePrefixs[0]
	}

	group := &CKGroup{}
	for _, entry := range strings.Split(host, ",") {
		dsn := fmt.Sprintf(datasource, strings.TrimSpace(entry))
		cfg := &gorm.Config{
			NamingStrategy: schema.NamingStrategy{
				SingularTable: true,
				TablePrefix:   prefix,
			},
		}

		gdb, err := gorm.Open(clickhouse.Open(dsn), cfg)
		if err != nil {
			logs.Error(err)
			continue
		}
		pool, err := gdb.DB()
		if err != nil {
			logs.Error(err)
			continue
		}

		// Connection-pool limits applied to every member of the group.
		pool.SetMaxIdleConns(1)
		pool.SetMaxOpenConns(5)
		pool.SetConnMaxLifetime(3600 * time.Second)
		pool.SetConnMaxIdleTime(900 * time.Second)

		group.dbs = append(group.dbs, gdb)
	}

	if len(group.dbs) == 0 {
		return nil
	}
	return group
}
// exec runs the statement sql on every database in the group.
//
// A failure on one database is logged together with its 1-based position and
// does not stop the remaining databases from being tried. The returned error
// is the first failure encountered, or nil when every database succeeded.
func (m *CKGroup) exec(sql string) (err error) {
	for idx := range m.dbs {
		if execErr := m.dbs[idx].Exec(sql).Error; execErr != nil {
			logs.Warn(idx+1, execErr.Error())
			if err == nil {
				err = execErr
			}
		}
	}
	return err
}
// insert writes rows to the database at position dbIdx of the group.
//
// The rows are handed to insertMax5000 in consecutive batches of at most
// 5000, so each batch runs in its own transaction. Processing stops at the
// first batch that reports an error.
//
// Parameters:
//   - dbIdx: zero-based position of the target database in the group.
//   - sql: statement text prepared for every batch.
//   - rows: one argument list per execution of the statement.
//
// Returns the sum of the counts reported by the batches that completed
// without error, together with the first error encountered. An out-of-range
// dbIdx is reported as an error rather than a runtime panic.
func (m *CKGroup) insert(dbIdx int, sql string, rows [][]interface{}) (count int, err error) {
	// The group can hold fewer databases than hosts were configured, because
	// hosts that fail to open are skipped; validate the index before use.
	if dbIdx < 0 || dbIdx >= len(m.dbs) {
		return 0, fmt.Errorf("ck: db index %d out of range (group has %d databases)", dbIdx, len(m.dbs))
	}
	db, err := m.dbs[dbIdx].DB()
	if err != nil {
		return 0, err
	}

	const batchSize = 5000
	for start := 0; start < len(rows); start += batchSize {
		end := start + batchSize
		if end > len(rows) {
			end = len(rows)
		}
		inserted, batchErr := m.insertMax5000(db, sql, rows[start:end])
		if batchErr != nil {
			// Keep the count of the batches that already went through.
			return count, batchErr
		}
		count += inserted
	}
	return count, nil
}
// insertMax5000 writes one batch of rows inside a single transaction.
//
// The statement sql is prepared once on the transaction and executed once per
// row. A row whose execution fails is logged and skipped; it does not abort
// the batch and is not reflected in the returned error.
//
// Returns the number of rows whose execution succeeded, and a non-nil error
// when the transaction could not be started, the statement could not be
// prepared, or the commit failed. In all three error cases the count is 0,
// since the batch was not committed.
func (m *CKGroup) insertMax5000(db *sql.DB, sql string, rows [][]interface{}) (count int, err error) {
	tx, err := db.Begin()
	if err != nil {
		logs.Error(sql, err)
		return 0, err
	}
	stmt, err := tx.Prepare(sql)
	if err != nil {
		// Best-effort cleanup; the Prepare failure is the error to report.
		_ = tx.Rollback()
		logs.Error(err, sql)
		return 0, err
	}
	defer stmt.Close()

	for i := range rows {
		// Keep the per-row error local so that a failing row never leaks
		// into the function's result depending on its position in the batch.
		if _, execErr := stmt.Exec(rows[i]...); execErr != nil {
			logs.Warn(sql, "exec insert ", execErr)
			continue
		}
		count++
	}

	// Assign to the named result (no shadowing) so a failed commit reaches
	// the caller instead of being logged and dropped.
	if err = tx.Commit(); err != nil {
		logs.Error(err)
		return 0, err
	}
	return count, nil
}
// Find runs gorm's Find on the database at position dbIdx, storing the
// result in dest and passing conds through as the query conditions.
//
// It returns the error reported by gorm, or an error describing the bad
// index when dbIdx does not address a database of the group.
func (m *CKGroup) Find(dbIdx int, dest interface{}, conds ...interface{}) (err error) {
	// The group can hold fewer databases than hosts were configured, so an
	// invalid index is reported instead of panicking.
	if dbIdx < 0 || dbIdx >= len(m.dbs) {
		return fmt.Errorf("ck: db index %d out of range (group has %d databases)", dbIdx, len(m.dbs))
	}
	return m.dbs[dbIdx].Find(dest, conds...).Error
}
// RawScan executes the raw statement sql with values as its arguments on the
// database at position dbIdx and scans the result into dest.
//
// It returns the error reported by gorm, or an error describing the bad
// index when dbIdx does not address a database of the group.
func (m *CKGroup) RawScan(dbIdx int, dest interface{}, sql string, values ...interface{}) (err error) {
	// The group can hold fewer databases than hosts were configured, so an
	// invalid index is reported instead of panicking.
	if dbIdx < 0 || dbIdx >= len(m.dbs) {
		return fmt.Errorf("ck: db index %d out of range (group has %d databases)", dbIdx, len(m.dbs))
	}
	return m.dbs[dbIdx].Raw(sql, values...).Scan(dest).Error
}
// RawFind executes the raw statement sql with values as its arguments on the
// database at position dbIdx and loads the result into dest via gorm's Find.
//
// It returns the error reported by gorm, or an error describing the bad
// index when dbIdx does not address a database of the group.
func (m *CKGroup) RawFind(dbIdx int, dest interface{}, sql string, values ...interface{}) (err error) {
	// The group can hold fewer databases than hosts were configured, so an
	// invalid index is reported instead of panicking.
	if dbIdx < 0 || dbIdx >= len(m.dbs) {
		return fmt.Errorf("ck: db index %d out of range (group has %d databases)", dbIdx, len(m.dbs))
	}
	return m.dbs[dbIdx].Raw(sql, values...).Find(dest).Error
}
Go
1
https://gitee.com/haodreams/golib.git
git@gitee.com:haodreams/golib.git
haodreams
golib
golib
212a885c2c3f

搜索帮助

53164aa7 5694891 3bd8fe86 5694891