1 Star 1 Fork 5

夏季的风/数据和文件存储组件

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
TDengineRepository.go 7.51 KB
一键复制 编辑 原始数据 按行查看 历史
package TDengineDB
import (
"bytes"
"errors"
"strconv"
"strings"
"unsafe"
)
// TDengineRepository is a data-access object bound to one database and one
// super table. All SQL it builds is sent through its client.
type TDengineRepository struct {
database string // database name; prefixed to every table reference in generated SQL
superTableName string // super table name; used in the USING clause of inserts and as the FROM target of queries
client *TDengineClient // client that Execute delegates to
}
// NewTDengineRepository builds a repository for the given database and super
// table that sends its SQL through client. The value is returned as an
// ITDengineRepository.
func NewTDengineRepository(database string, superTableName string, client *TDengineClient) ITDengineRepository {
	return &TDengineRepository{
		client:         client,
		superTableName: superTableName,
		database:       database,
	}
}
// GetDatabase returns the name of the database this repository operates on.
func (t *TDengineRepository) GetDatabase() string {
return t.database
}
// GetSuperTableName returns the name of the super table this repository
// operates on.
func (t *TDengineRepository) GetSuperTableName() string{
return t.superTableName
}
// Add inserts a single row into one sub-table by delegating to AddMany.
//
// tableName is the sub-table name, value is the row's comma-separated column
// values (for example: 1,"A",123) and tags is the comma-separated tag list.
// Content containing special characters such as \n, \r or \ cannot be stored.
// It returns whatever AddMany returns for the one-entry batch.
func (t *TDengineRepository) Add(tableName, value, tags string) (int64, error) {
	single := &InsertData{
		Tags: tags,
		Data: []string{value},
	}
	return t.AddMany(map[string]*InsertData{tableName: single})
}
// Batching thresholds checked by AddMany while it assembles INSERT statements.
var (
maxLen = 256 * 1024 // SQL length (bytes in the buffer) at which AddMany executes the statement built so far
maxTableCount = 1000 // number of tables in a single SQL statement at which AddMany executes it
)
// AddMany inserts rows into one or more sub-tables of the super table, batching
// them into as few INSERT statements as the size limits allow. Each statement
// has the form:
//
//	INSERT INTO db.t1 USING db.super TAGS (tags) VALUES (row) (row) db.t2 USING db.super TAGS (tags) VALUES (row) ;
//
// dataDict maps a sub-table name to its InsertData: Tags is the comma-separated
// tag list written inside TAGS (...), and every element of Data is one row's
// comma-separated values written inside (...), e.g. `1538548695000, 12.6, 218, 0.33`.
// Content containing special characters such as \n, \r or \ cannot be stored.
// Entries that are nil or carry no rows are skipped, because a table clause
// without any value tuple would make the whole statement invalid.
//
// A statement is executed as soon as its length reaches maxLen, and a new
// statement is started before a table clause would exceed maxTableCount.
// NOTE(review): the original comment says the statement must also stay below
// the length configured on the database server — confirm maxLen against it.
//
// It returns the sum of the affected-row counts reported for the executed
// statements. On the first execution error it stops and returns that error
// together with the count accumulated from the statements that already ran.
func (t *TDengineRepository) AddMany(dataDict map[string]*InsertData) (int64, error) {
	// Estimate the SQL size up front so the buffer rarely has to grow.
	preLen := 0
	for key, val := range dataDict {
		if val == nil {
			continue
		}
		preLen += len(val.Tags) + len(t.superTableName) + len(key) + len(t.database)*2 + 50
		for _, row := range val.Data {
			preLen += len(row) + 4
		}
	}
	if preLen >= maxLen {
		preLen = maxLen + 1024
	}

	var bt bytes.Buffer
	bt.Grow(preLen)
	count := int64(0)
	tableCount := 0 // table clauses in the statement currently being built

	// writeTarget appends "db.table USING db.super TAGS (tags) VALUES ",
	// opening a new INSERT statement first when the buffer is empty.
	writeTarget := func(table, tags string) {
		if bt.Len() == 0 {
			bt.WriteString("INSERT INTO ")
		}
		bt.WriteString(t.database)
		bt.WriteString(".")
		bt.WriteString(table)
		bt.WriteString(" USING ")
		bt.WriteString(t.database)
		bt.WriteString(".")
		bt.WriteString(t.superTableName)
		bt.WriteString(" TAGS (")
		bt.WriteString(tags)
		bt.WriteString(")")
		bt.WriteString(" VALUES ")
		tableCount++
	}

	// flush executes the statement built so far (if any) and resets the buffer.
	flush := func() error {
		if bt.Len() == 0 {
			return nil
		}
		bt.WriteString(";")
		// Zero-copy view of the buffer as a string. It aliases the buffer's
		// memory, so it is only valid until the Reset below.
		// NOTE(review): this assumes Execute does not retain sqlCmd after it
		// returns — confirm against the client implementation.
		bts := bt.Bytes()
		sql := *(*string)(unsafe.Pointer(&bts))
		result, err := t.Execute(sql)
		if err != nil {
			return err
		}
		// Best effort: a failure to read the affected-row count does not fail
		// the insert, it just is not added to the total.
		if rowCount, err := result.GetAffectedRows(); err == nil {
			count += rowCount
		}
		tableCount = 0
		bt.Reset()
		return nil
	}

	for key, val := range dataDict {
		if val == nil || len(val.Data) == 0 {
			continue
		}
		// Start a fresh statement rather than exceed the per-statement table limit.
		if tableCount >= maxTableCount {
			if err := flush(); err != nil {
				return count, err
			}
		}
		needTarget := true
		for _, row := range val.Data {
			// The table clause is written before the table's first row and
			// again whenever a flush split this table across statements.
			if needTarget {
				writeTarget(key, val.Tags)
				needTarget = false
			}
			bt.WriteString("(")
			bt.WriteString(row)
			bt.WriteString(") ")
			if bt.Len() >= maxLen {
				if err := flush(); err != nil {
					return count, err
				}
				needTarget = true
			}
		}
	}
	if err := flush(); err != nil {
		return count, err
	}
	return count, nil
}
// Execute runs sqlCmd through the repository's client and returns the client's
// result and error unchanged.
// Content containing special characters such as \n, \r or \ cannot be stored.
func (t *TDengineRepository) Execute(sqlCmd string) (*ExecuteResult,error) {
return t.client.Execute(sqlCmd)
}
// QuerySuperTop returns the first topCount rows of the super table that match
// queryWhere. It is QuerySuper fixed to page 1 with the total count disabled.
//
// columns: comma-separated column list ("" selects all columns, "*" works too).
// queryWhere: the WHERE condition, e.g. a='123'.
// topCount: maximum number of rows to return.
// sort (timestamp only): 0 => no ORDER BY, 1 => order by ts desc, 2 => order by ts.
func (t *TDengineRepository) QuerySuperTop(columns, queryWhere string, topCount, sort int) (result *ExecuteResult, err error) {
	const firstPage = 1
	rows, _, queryErr := t.QuerySuper(columns, queryWhere, firstPage, topCount, sort, false)
	return rows, queryErr
}
// QuerySuperPage returns one page of super-table rows that match queryWhere,
// together with the total count. It is QuerySuper with the total count enabled.
//
// columns: comma-separated column list ("" selects all columns, "*" works too).
// queryWhere: the WHERE condition, e.g. a='123'.
// pageIndex: page number, starting at 1.
// pageSize: number of rows per page.
// sort (timestamp only): 0 => no ORDER BY, 1 => order by ts desc, 2 => order by ts.
func (t *TDengineRepository) QuerySuperPage(columns, queryWhere string, pageIndex, pageSize, sort int) (result *ExecuteResult, totalCount int64, err error) {
	result, totalCount, err = t.QuerySuper(columns, queryWhere, pageIndex, pageSize, sort, true)
	return result, totalCount, err
}
// QueryCount runs "select count(ts) from <db>.<super table> where <queryWhere>;"
// and returns the value reported by the result's GetAffectedRows.
// NOTE(review): the count is taken from GetAffectedRows rather than from the
// returned count(ts) column — confirm that ExecuteResult reports it that way.
//
// queryWhere: the WHERE condition, e.g. a='123'; it must not be empty.
// Returns 0 and an error for an empty condition, or -1 and the execution
// error when the statement fails.
func (t *TDengineRepository) QueryCount(queryWhere string) (int64, error) {
	if len(queryWhere) == 0 {
		return 0, errors.New(" queryWhere Not Null!")
	}
	sql := "select count(ts) from " + t.database + "." + t.superTableName + " where " + queryWhere + ";"
	result, err := t.Execute(sql)
	if err == nil {
		return result.GetAffectedRows()
	}
	return -1, err
}
// QuerySuper selects rows from the super table that match queryWhere, with
// optional timestamp ordering and LIMIT/OFFSET paging.
//
// columns: comma-separated column list ("" selects all columns, "*" works too).
// queryWhere: the WHERE condition, e.g. a='123'; it must not be empty.
// pageIndex: page number, starting at 1.
// pageSize: number of rows per page; must not be negative.
// sort (timestamp only): 0 => no ORDER BY, 1 => order by ts desc,
// any other value (documented as 2) => order by ts.
// isTotalCount: when true, QueryCount is run first to obtain the total.
//
// totalCount is -1 when the total was not requested or the count query failed;
// a failed count does not abort the data query. Invalid arguments are rejected
// before any SQL is sent, with a nil result and a totalCount of 0. If the data
// query fails the result is nil and the execution error is returned.
func (t *TDengineRepository) QuerySuper(columns, queryWhere string, pageIndex, pageSize, sort int, isTotalCount bool) (result *ExecuteResult, totalCount int64, err error) {
	if queryWhere == "" {
		return nil, 0, errors.New(" queryWhere Not Null!")
	}
	// A page index below 1 would yield a negative OFFSET and a negative page
	// size a negative LIMIT; reject both before touching the database.
	if pageIndex < 1 {
		return nil, 0, errors.New("pageIndex must be greater than or equal to 1")
	}
	if pageSize < 0 {
		return nil, 0, errors.New("pageSize must not be negative")
	}

	// -1 means "total unknown": either not requested or the count query failed.
	totalCount = -1
	if isTotalCount {
		if count, countErr := t.QueryCount(queryWhere); countErr == nil {
			totalCount = count
		}
	}

	// Build the data query.
	sqlCmd := strings.Builder{}
	sqlCmd.Grow(len(t.database) + len(t.superTableName) + len(queryWhere) + len(columns) + 80)
	sqlCmd.WriteString("select ")
	if columns == "" {
		sqlCmd.WriteString(" * ")
	} else {
		sqlCmd.WriteString(columns)
		sqlCmd.WriteString(" ")
	}
	sqlCmd.WriteString(" from ")
	sqlCmd.WriteString(t.database)
	sqlCmd.WriteString(".")
	sqlCmd.WriteString(t.superTableName)
	sqlCmd.WriteString(" where ")
	sqlCmd.WriteString(queryWhere)
	sqlCmd.WriteString(" ")
	switch sort {
	case 0:
		// No ORDER BY clause.
	case 1:
		sqlCmd.WriteString("order by ts desc ")
	default:
		sqlCmd.WriteString("order by ts ")
	}
	sqlCmd.WriteString(" LIMIT ")
	sqlCmd.WriteString(strconv.Itoa(pageSize))
	sqlCmd.WriteString(" OFFSET ")
	sqlCmd.WriteString(strconv.Itoa((pageIndex - 1) * pageSize))
	sqlCmd.WriteString(";")

	result, err = t.Execute(sqlCmd.String())
	if err != nil {
		return nil, totalCount, err
	}
	return result, totalCount, nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/ling-bin/repository.git
git@gitee.com:ling-bin/repository.git
ling-bin
repository
数据和文件存储组件
v1.6.22

搜索帮助