4 Star 6 Fork 3

王军 / golib

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
ck_groups.go 7.51 KB
一键复制 编辑 原始数据 按行查看 历史
fuzhuang 提交于 2023-12-11 09:15 . 1
/*
* @Author: Wangjun
* @Date: 2021-07-12 16:36:13
* @LastEditTime: 2023-11-21 14:05:51
* @LastEditors: fuzhuang
* @Description:clickhouse group数组
* @FilePath: \xr_node_calcd:\go\src\gitee.com\haodreams\golib\ck\ck_groups.go
* hnxr
*/
package ck
import (
"errors"
"strings"
"sync/atomic"
"gitee.com/haodreams/golib/gensql"
"gitee.com/haodreams/golib/logs"
"gitee.com/haodreams/golib/wrap"
"gitee.com/haodreams/libs/config"
"gitee.com/haodreams/libs/ee"
"gorm.io/gorm"
)
// CKGroups manages a list of ClickHouse groups: writes are sent to every
// group, reads are served by the first group.
type CKGroups struct {
// groups holds the groups created by Init / InitArray, in configuration order.
groups []*CKGroup
Num int // number of databases usable in every group: the smallest per-group dbs count, computed by Init / InitArray
// ckPos is the round-robin counter that getIdx increments atomically.
ckPos uint32
// enable is set by Init from the ck_enable setting; methods without the
// NoEnable suffix return immediately while it is false.
enable bool
}
// getIdx advances the shared round-robin counter and returns the next
// database index in the range [0, m.Num).
//
// The modulo is computed on unsigned 64-bit values. Converting the uint32
// counter to int first produces a negative number on platforms where int is
// 32 bits wide once the counter passes 1<<31, which would yield a negative
// index. When m.Num is not positive (nothing has been initialised) the method
// returns 0 instead of panicking with a division by zero.
func (m *CKGroups) getIdx() int {
	if m.Num <= 0 {
		return 0
	}
	next := atomic.AddUint32(&m.ckPos, 1)
	return int(uint64(next) % uint64(m.Num))
}
// Setup initialises the groups from conf by delegating to Init. An error
// reported by Init is passed through ee.New before being returned.
func (m *CKGroups) Setup(conf config.Configer) (err error) {
	if err = m.Init(conf); err != nil {
		return ee.New(err)
	}
	return nil
}
// Init reads the ClickHouse settings from conf and creates one CKGroup per
// configured group.
//
// Settings read:
//   - ck_enable: feature switch, default false. When false Init returns nil
//     without reading anything else.
//   - ck_group: group list in the form "host11,host12;host21,host22". Groups
//     are separated by ';' here; each group string is handed to NewCKGroup
//     unchanged (the ',' separated host list is presumably split there).
//   - ck_dsn: data source string passed to NewCKGroup.
//   - ck_table_prefix: table prefix passed to NewCKGroup.
//
// Groups for which NewCKGroup returns nil are skipped. New groups are appended
// to the ones already held, so calling Init repeatedly accumulates groups.
// m.Num is set to the smallest dbs count found in any group.
//
// When Init returns an error the instance is left disabled. Previously the
// enable flag stayed true after a failed Init, so a later Insert/Find would
// run against an empty or zero-sized configuration and panic.
func (m *CKGroups) Init(conf config.Configer) (err error) {
	m.enable = conf.DefaultBool("ck_enable", false)
	if !m.enable {
		return nil
	}
	// Any failure below must switch the feature off again.
	defer func() {
		if err != nil {
			m.enable = false
		}
	}()

	host := conf.String("ck_group")
	if host == "" {
		return errors.New("没有配置clickhouse数据源服务器,关闭使用clickhouse功能")
	}
	datasource := conf.String("ck_dsn")
	if datasource == "" {
		return errors.New("没有配置clickhouse DSN")
	}
	tablePrefix := conf.String("ck_table_prefix")

	for _, spec := range strings.Split(host, ";") {
		if g := NewCKGroup(spec, datasource, tablePrefix); g != nil {
			m.groups = append(m.groups, g)
		}
	}
	if len(m.groups) == 0 {
		return errors.New("没有有效的clickhouse group")
	}

	// Only indexes valid in every group may be handed out by getIdx.
	m.Num = len(m.groups[0].dbs)
	for _, g := range m.groups {
		if m.Num > len(g.dbs) {
			m.Num = len(g.dbs)
		}
	}
	if m.Num == 0 {
		return errors.New("没有配置clickhouse地址")
	}
	return nil
}
// InitArray configures the groups from a string slice instead of a Configer.
//
// data[1] is the group list ("host11,host12;host21,host22", groups separated
// by ';') and data[2] is the data source string; data[0] is not read here.
// A slice with fewer than three elements is rejected with an error instead
// of panicking on the index access.
//
// Groups for which NewCKGroup returns nil are skipped; the remaining ones are
// appended to the groups already held. m.Num is set to the smallest dbs count
// found in any group.
//
// InitArray does not modify the enable flag, so unless Init enabled the
// instance only the *NoEnable methods will perform work afterwards.
//
// On the early validation failures ck is nil; once groups have been built the
// receiver is returned together with any error.
func (m *CKGroups) InitArray(data []string) (ck *CKGroups, err error) {
	if len(data) < 3 {
		return nil, errors.New("clickhouse配置参数不足")
	}
	host := data[1]
	if host == "" {
		return nil, errors.New("没有配置clickhouse数据源服务器,关闭使用clickhouse功能")
	}
	datasource := data[2]
	if datasource == "" {
		return nil, errors.New("没有配置clickhouse DSN")
	}

	for _, spec := range strings.Split(host, ";") {
		if g := NewCKGroup(spec, datasource); g != nil {
			m.groups = append(m.groups, g)
		}
	}
	if len(m.groups) == 0 {
		return nil, errors.New("没有有效的clickhouse group")
	}

	// Only indexes valid in every group may be handed out by getIdx.
	m.Num = len(m.groups[0].dbs)
	for _, g := range m.groups {
		if m.Num > len(g.dbs) {
			m.Num = len(g.dbs)
		}
	}
	if m.Num == 0 {
		err = errors.New("没有配置clickhouse地址")
	}
	return m, err
}
// GetGroupCount reports how many groups are currently held.
func (m *CKGroups) GetGroupCount() int {
	total := len(m.groups)
	return total
}
// InsertNoEnable writes rows to every group using the given SQL statement,
// without consulting the enable flag.
//
// Parameters:
//   - sql:  statement passed to each group's insert.
//   - rows: row values passed to each group's insert.
//
// Returns:
//   - count:      with a single group, whatever that group's insert reported;
//     with several groups, the count reported by the first group (in
//     configuration order) that succeeded, or 0 if none did.
//   - validGroup: number of groups whose insert returned no error.
//   - err:        nil if every group succeeded, otherwise the error of the
//     first failing group in configuration order. An error is also returned
//     when no group is configured.
//
// With several groups the inserts are handed to the wait-group wrapper (the
// original code describes this as concurrent execution). Each task writes only
// to its own result slot and the slots are combined after Wait; the earlier
// version let every task assign the shared count and err variables directly,
// which is a data race when the tasks run in parallel.
func (m *CKGroups) InsertNoEnable(sql string, rows [][]interface{}) (count int, validGroup int64, err error) {
	num := len(m.groups)
	if num == 0 {
		return 0, 0, errors.New("没有有效的clickhouse group")
	}
	idx := m.getIdx()

	if num == 1 {
		count, err = m.groups[0].insert(idx, sql, rows)
		if err != nil {
			logs.Warn(err)
			return count, 0, err
		}
		return count, 1, nil
	}

	type result struct {
		count int
		err   error
	}
	results := make([]result, num)

	wg := wrap.NewWaitGroupWrapper(num)
	for i := range m.groups {
		// Copy per iteration so each closure sees its own slot and group.
		slot, group := i, m.groups[i]
		wg.Wrap(func() {
			var r result
			r.count, r.err = group.insert(idx, sql, rows)
			if r.err != nil {
				logs.Warn(r.err)
			}
			results[slot] = r
		})
	}
	wg.Wait()

	for _, r := range results {
		if r.err != nil {
			if err == nil {
				err = r.err
			}
			continue
		}
		if validGroup == 0 {
			count = r.count
		}
		validGroup++
	}
	return count, validGroup, err
}
// Insert writes rows to every group using the given SQL statement. While the
// instance is disabled it does nothing and returns zero values.
//
// Parameters:
//   - sql:  statement passed to each group's insert.
//   - rows: row values passed to each group's insert.
//
// Returns:
//   - count:      with a single group, whatever that group's insert reported;
//     with several groups, the count reported by the first group (in
//     configuration order) that succeeded, or 0 if none did.
//   - validGroup: number of groups whose insert returned no error.
//   - err:        nil if every group succeeded, otherwise the error of the
//     first failing group in configuration order. An error is also returned
//     when the instance is enabled but no group is configured.
//
// With several groups the inserts are handed to the wait-group wrapper (the
// original code describes this as concurrent execution). Each task writes only
// to its own result slot and the slots are combined after Wait; the earlier
// version let every task assign the shared count and err variables directly,
// which is a data race when the tasks run in parallel.
func (m *CKGroups) Insert(sql string, rows [][]interface{}) (count int, validGroup int64, err error) {
	if !m.enable {
		return 0, 0, nil
	}
	num := len(m.groups)
	if num == 0 {
		return 0, 0, errors.New("没有有效的clickhouse group")
	}
	idx := m.getIdx()

	if num == 1 {
		count, err = m.groups[0].insert(idx, sql, rows)
		if err != nil {
			logs.Warn(err)
			return count, 0, err
		}
		return count, 1, nil
	}

	type result struct {
		count int
		err   error
	}
	results := make([]result, num)

	wg := wrap.NewWaitGroupWrapper(num)
	for i := range m.groups {
		// Copy per iteration so each closure sees its own slot and group.
		slot, group := i, m.groups[i]
		wg.Wrap(func() {
			var r result
			r.count, r.err = group.insert(idx, sql, rows)
			if r.err != nil {
				logs.Warn(r.err)
			}
			results[slot] = r
		})
	}
	wg.Wait()

	for _, r := range results {
		if r.err != nil {
			if err == nil {
				err = r.err
			}
			continue
		}
		if validGroup == 0 {
			count = r.count
		}
		validGroup++
	}
	return count, validGroup, err
}
// Find loads all matching records into dest by delegating to the first
// group's Find with a round-robin database index.
//
// Parameters:
//   - dest:  destination passed through to the group (the original comment
//     describes it as the address of a slice).
//   - conds: optional conditions passed through unchanged.
//
// While the instance is disabled Find returns nil and does not touch dest.
// When it is enabled but holds no group an error is returned; the access to
// the first group used to panic in that situation.
func (m *CKGroups) Find(dest interface{}, conds ...interface{}) (err error) {
	if !m.enable {
		return nil
	}
	if len(m.groups) == 0 {
		return errors.New("没有有效的clickhouse group")
	}
	idx := m.getIdx()
	return m.groups[0].Find(idx, dest, conds...)
}
// RawScan runs a raw SQL query through the first group's RawScan with a
// round-robin database index. The original comment describes it as fetching
// a single row.
//
// Parameters:
//   - dest:   destination passed through to the group.
//   - sql:    raw SQL text.
//   - values: arguments for the statement, passed through unchanged.
//
// While the instance is disabled RawScan returns nil and does not touch dest.
// When it is enabled but holds no group an error is returned; the access to
// the first group used to panic in that situation.
func (m *CKGroups) RawScan(dest interface{}, sql string, values ...interface{}) (err error) {
	if !m.enable {
		return nil
	}
	if len(m.groups) == 0 {
		return errors.New("没有有效的clickhouse group")
	}
	idx := m.getIdx()
	return m.groups[0].RawScan(idx, dest, sql, values...)
}
// RawFind runs a raw SQL query through the first group's RawFind with a
// round-robin database index. The original comment describes it as fetching
// all matching rows.
//
// Parameters:
//   - dest:   destination passed through to the group.
//   - sql:    raw SQL text.
//   - values: arguments for the statement, passed through unchanged.
//
// While the instance is disabled RawFind returns nil and does not touch dest.
// When it is enabled but holds no group an error is returned; the access to
// the first group used to panic in that situation.
func (m *CKGroups) RawFind(dest interface{}, sql string, values ...interface{}) (err error) {
	if !m.enable {
		return nil
	}
	if len(m.groups) == 0 {
		return errors.New("没有有效的clickhouse group")
	}
	idx := m.getIdx()
	return m.groups[0].RawFind(idx, dest, sql, values...)
}
// GetDB returns the *gorm.DB supplied by the first group's GetDB, or nil
// when no group is held.
func (m *CKGroups) GetDB() (db *gorm.DB) {
	if len(m.groups) == 0 {
		return nil
	}
	first := m.groups[0]
	return first.GetDB()
}
// InsertObjects writes rows to ClickHouse. The SQL statement and the row
// values are produced by gensql.GenSQLData from the DB handle returned by
// GetDB and then forwarded to Insert.
//
// While the instance is disabled, or when GenSQLData yields no records, all
// return values are zero. An error is returned when GetDB yields nil or when
// GenSQLData fails; otherwise the results of Insert are returned as they are.
func (m *CKGroups) InsertObjects(rows interface{}) (count int, validGroup int64, err error) {
	if !m.enable {
		return 0, 0, nil
	}

	conn := m.GetDB()
	if conn == nil {
		return 0, 0, errors.New("没有有效的数据库对象")
	}

	stmt, records, err := gensql.GenSQLData(conn, rows)
	switch {
	case err != nil:
		return 0, 0, err
	case len(records) == 0:
		return 0, 0, nil
	}
	return m.Insert(stmt, records)
}
// InsertObjectsNoEnable writes rows to ClickHouse without consulting the
// enable flag. The SQL statement and the row values are produced by
// gensql.GenSQLData from the DB handle returned by GetDB and then forwarded
// to InsertNoEnable.
//
// When GenSQLData yields no records all return values are zero. An error is
// returned when GetDB yields nil or when GenSQLData fails; otherwise the
// results of InsertNoEnable are returned as they are.
func (m *CKGroups) InsertObjectsNoEnable(rows interface{}) (count int, validGroup int64, err error) {
	conn := m.GetDB()
	if conn == nil {
		return 0, 0, errors.New("没有有效的数据库对象")
	}

	stmt, records, err := gensql.GenSQLData(conn, rows)
	switch {
	case err != nil:
		return 0, 0, err
	case len(records) == 0:
		return 0, 0, nil
	}
	return m.InsertNoEnable(stmt, records)
}
// Exec runs the SQL command on every group in order. A failing group is
// logged with its 1-based position and does not stop the remaining groups;
// the first error encountered is the one returned. While the instance is
// disabled nothing is executed and nil is returned.
func (m *CKGroups) Exec(sql string) (err error) {
	if !m.enable {
		return nil
	}
	for pos, g := range m.groups {
		failure := g.exec(sql)
		if failure == nil {
			continue
		}
		logs.Error("group:", pos+1, failure)
		if err == nil {
			err = failure
		}
	}
	return err
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/haodreams/golib.git
git@gitee.com:haodreams/golib.git
haodreams
golib
golib
c0734b0c2841

搜索帮助

344bd9b3 5694891 D2dac590 5694891