1 Star 1 Fork 5

夏季的风/数据和文件存储组件

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
TDengineClient.go 9.92 KB
一键复制 编辑 原始数据 按行查看 历史
package TDengineDB
import (
"compress/gzip"
"encoding/base64"
"fmt"
jsonitor "github.com/json-iterator/go"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
)
// Numeric identifiers of the TDengine column data types.
//
// The values are spelled out explicitly because they are matched against the
// numeric type codes read from a response's "column_meta"; they must not shift
// if an entry is ever inserted or removed.
const (
	// Fixed-width and basic types.
	TSDB_DATA_TYPE_NULL      = 0 // 1 byte
	TSDB_DATA_TYPE_BOOL      = 1 // 1 byte
	TSDB_DATA_TYPE_TINYINT   = 2 // 1 byte
	TSDB_DATA_TYPE_SMALLINT  = 3 // 2 bytes
	TSDB_DATA_TYPE_INT       = 4 // 4 bytes
	TSDB_DATA_TYPE_BIGINT    = 5 // 8 bytes
	TSDB_DATA_TYPE_FLOAT     = 6 // 4 bytes
	TSDB_DATA_TYPE_DOUBLE    = 7 // 8 bytes
	TSDB_DATA_TYPE_BINARY    = 8 // string
	TSDB_DATA_TYPE_TIMESTAMP = 9 // 8 bytes
	TSDB_DATA_TYPE_NCHAR     = 10 // unicode string

	// Unsigned integer types.
	TSDB_DATA_TYPE_UTINYINT  = 11 // 1 byte
	TSDB_DATA_TYPE_USMALLINT = 12 // 2 bytes
	TSDB_DATA_TYPE_UINT      = 13 // 4 bytes
	TSDB_DATA_TYPE_UBIGINT   = 14 // 8 bytes

	// Remaining types; no size is documented for these.
	TSDB_DATA_TYPE_JSON       = 15
	TSDB_DATA_TYPE_VARBINARY  = 16
	TSDB_DATA_TYPE_DECIMAL    = 17
	TSDB_DATA_TYPE_BLOB       = 18
	TSDB_DATA_TYPE_MEDIUMBLOB = 19

	// TSDB_DATA_TYPE_MAX is one past the last defined type identifier.
	TSDB_DATA_TYPE_MAX = 20
)
// Textual names of the column data types, paired one-to-one with the numeric
// TSDB_DATA_TYPE_* identifiers through TypeNameMap and NameTypeMap.
const (
	TSDB_DATA_TYPE_NULL_Str      = "NULL"
	TSDB_DATA_TYPE_BOOL_Str      = "BOOL"
	TSDB_DATA_TYPE_TINYINT_Str   = "TINYINT"
	TSDB_DATA_TYPE_SMALLINT_Str  = "SMALLINT"
	TSDB_DATA_TYPE_INT_Str       = "INT"
	TSDB_DATA_TYPE_BIGINT_Str    = "BIGINT"
	TSDB_DATA_TYPE_FLOAT_Str     = "FLOAT"
	TSDB_DATA_TYPE_DOUBLE_Str    = "DOUBLE"
	TSDB_DATA_TYPE_BINARY_Str    = "VARCHAR" // the BINARY type goes by the name VARCHAR
	TSDB_DATA_TYPE_TIMESTAMP_Str = "TIMESTAMP"
	TSDB_DATA_TYPE_NCHAR_Str     = "NCHAR"

	// Unsigned variants are named "<signed name> UNSIGNED".
	TSDB_DATA_TYPE_UTINYINT_Str  = "TINYINT UNSIGNED"
	TSDB_DATA_TYPE_USMALLINT_Str = "SMALLINT UNSIGNED"
	TSDB_DATA_TYPE_UINT_Str      = "INT UNSIGNED"
	TSDB_DATA_TYPE_UBIGINT_Str   = "BIGINT UNSIGNED"

	TSDB_DATA_TYPE_JSON_Str = "JSON"
)
// TypeNameMap translates a numeric column type identifier into its textual
// type name.
//
// Only the identifiers NULL through JSON have an entry; VARBINARY, DECIMAL,
// BLOB and MEDIUMBLOB are not present.
var TypeNameMap = map[int]string{
	TSDB_DATA_TYPE_NULL:      TSDB_DATA_TYPE_NULL_Str,
	TSDB_DATA_TYPE_BOOL:      TSDB_DATA_TYPE_BOOL_Str,
	TSDB_DATA_TYPE_TINYINT:   TSDB_DATA_TYPE_TINYINT_Str,
	TSDB_DATA_TYPE_SMALLINT:  TSDB_DATA_TYPE_SMALLINT_Str,
	TSDB_DATA_TYPE_INT:       TSDB_DATA_TYPE_INT_Str,
	TSDB_DATA_TYPE_BIGINT:    TSDB_DATA_TYPE_BIGINT_Str,
	TSDB_DATA_TYPE_FLOAT:     TSDB_DATA_TYPE_FLOAT_Str,
	TSDB_DATA_TYPE_DOUBLE:    TSDB_DATA_TYPE_DOUBLE_Str,
	TSDB_DATA_TYPE_BINARY:    TSDB_DATA_TYPE_BINARY_Str,
	TSDB_DATA_TYPE_TIMESTAMP: TSDB_DATA_TYPE_TIMESTAMP_Str,
	TSDB_DATA_TYPE_NCHAR:     TSDB_DATA_TYPE_NCHAR_Str,
	TSDB_DATA_TYPE_UTINYINT:  TSDB_DATA_TYPE_UTINYINT_Str,
	TSDB_DATA_TYPE_USMALLINT: TSDB_DATA_TYPE_USMALLINT_Str,
	TSDB_DATA_TYPE_UINT:      TSDB_DATA_TYPE_UINT_Str,
	TSDB_DATA_TYPE_UBIGINT:   TSDB_DATA_TYPE_UBIGINT_Str,
	TSDB_DATA_TYPE_JSON:      TSDB_DATA_TYPE_JSON_Str,
}
// NameTypeMap translates a textual column type name into its numeric type
// identifier; it is the inverse of TypeNameMap.
//
// marshalBody consults it when "column_meta" carries the type as a string, so
// a name missing here is rejected there as an unsupported type.
var NameTypeMap = map[string]int{
	TSDB_DATA_TYPE_NULL_Str:      TSDB_DATA_TYPE_NULL,
	TSDB_DATA_TYPE_BOOL_Str:      TSDB_DATA_TYPE_BOOL,
	TSDB_DATA_TYPE_TINYINT_Str:   TSDB_DATA_TYPE_TINYINT,
	TSDB_DATA_TYPE_SMALLINT_Str:  TSDB_DATA_TYPE_SMALLINT,
	TSDB_DATA_TYPE_INT_Str:       TSDB_DATA_TYPE_INT,
	TSDB_DATA_TYPE_BIGINT_Str:    TSDB_DATA_TYPE_BIGINT,
	TSDB_DATA_TYPE_FLOAT_Str:     TSDB_DATA_TYPE_FLOAT,
	TSDB_DATA_TYPE_DOUBLE_Str:    TSDB_DATA_TYPE_DOUBLE,
	TSDB_DATA_TYPE_BINARY_Str:    TSDB_DATA_TYPE_BINARY,
	TSDB_DATA_TYPE_TIMESTAMP_Str: TSDB_DATA_TYPE_TIMESTAMP,
	TSDB_DATA_TYPE_NCHAR_Str:     TSDB_DATA_TYPE_NCHAR,
	TSDB_DATA_TYPE_UTINYINT_Str:  TSDB_DATA_TYPE_UTINYINT,
	TSDB_DATA_TYPE_USMALLINT_Str: TSDB_DATA_TYPE_USMALLINT,
	TSDB_DATA_TYPE_UINT_Str:      TSDB_DATA_TYPE_UINT,
	TSDB_DATA_TYPE_UBIGINT_Str:   TSDB_DATA_TYPE_UBIGINT,
	TSDB_DATA_TYPE_JSON_Str:      TSDB_DATA_TYPE_JSON,
}
// jsonI is the json-iterator configuration shared by this package; marshalBody
// borrows its pooled iterators from it to decode response bodies.
var jsonI = jsonitor.ConfigCompatibleWithStandardLibrary
// TDengineClient is a database client that talks to the TDengine REST SQL
// endpoint, e.g. http://192.168.1.215:6041/rest/sql.
type TDengineClient struct {
	// url is the address of the database endpoint.
	url *url.URL
	// header holds the request headers sent with every statement.
	header map[string][]string
	// httpClient is the HTTP client used to issue requests.
	httpClient *http.Client
	// timeFormat is the layout used to parse timestamp values; it is empty
	// until marshalBody picks one from the first decoded "column_meta".
	timeFormat string
	// maxRetryCount is the number of retries allowed for a statement.
	maxRetryCount int
}
// NewTDengineClient builds a client for the REST SQL endpoint at
// http://<ipport>/rest/sql.
//
//   - ipport is the "host:port" of the server.
//   - user and password are sent as HTTP Basic credentials on every request.
//   - timeout is the overall timeout of the underlying http.Client.
//   - retryCount is the number of retries Execute may perform.
//
// Every request asks for a keep-alive connection and a gzip-encoded response.
func NewTDengineClient(ipport string, user string, password string, timeout time.Duration, retryCount int) *TDengineClient {
	credentials := []byte(user + ":" + password)
	authorization := fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString(credentials))

	return &TDengineClient{
		url: &url.URL{
			Scheme: "http",
			Host:   ipport,
			Path:   "/rest/sql",
		},
		header: map[string][]string{
			"Connection":      {"keep-alive"},
			"Accept-Encoding": {"gzip"},
			"Authorization":   {authorization},
		},
		httpClient:    &http.Client{Timeout: timeout},
		maxRetryCount: retryCount,
	}
}
// Execute sends one SQL statement to the REST endpoint and decodes the reply.
//
// sqlCmd is posted verbatim as the request body. The request is repeated, up
// to maxRetryCount additional times, when the HTTP client reports a timeout or
// the server answers 504 Gateway Timeout / 408 Request Timeout. Any other
// transport error or non-200 status is returned immediately. Once the retries
// are used up, the error of the last attempt is returned.
//
// Errors that wrap an underlying cause do so with %w, so callers can inspect
// them with errors.Is / errors.As; the message texts are unchanged.
func (c *TDengineClient) Execute(sqlCmd string) (*ExecuteResult, error) {
	for attempt := 0; ; attempt++ {
		result, retryable, err := c.executeOnce(sqlCmd)
		if err == nil {
			return result, nil
		}
		if !retryable || attempt >= c.maxRetryCount {
			return nil, err
		}
		// Sleep briefly to give up the CPU and wait for the next time slice.
		time.Sleep(time.Microsecond)
	}
}

// executeOnce performs a single request/response round trip for Execute.
//
// The boolean result reports whether the failure is one Execute may retry
// (client-side timeout, or a 504/408 status). The response body is always
// closed before returning, so a retry never keeps an earlier response open.
func (c *TDengineClient) executeOnce(sqlCmd string) (*ExecuteResult, bool, error) {
	request := &http.Request{
		Method:     http.MethodPost,
		URL:        c.url,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     c.header,
		Body:       ioutil.NopCloser(strings.NewReader(sqlCmd)),
		Host:       c.url.Host,
	}
	response, err := c.httpClient.Do(request)
	if err != nil {
		urlErr, ok := err.(*url.Error)
		return nil, ok && urlErr.Timeout(), err
	}
	defer response.Body.Close()

	// Check the response status.
	if response.StatusCode != http.StatusOK {
		retryable := response.StatusCode == http.StatusGatewayTimeout ||
			response.StatusCode == http.StatusRequestTimeout
		// Decode the error payload too: the request advertises gzip, so the
		// server may compress error bodies as well. Reading it to the end also
		// lets the transport reuse the connection for the next attempt.
		var body []byte
		reader, err := responseBodyReader(response)
		if err == nil {
			body, err = ioutil.ReadAll(reader)
		}
		if err != nil {
			return nil, retryable, fmt.Errorf("Read body err: %s - %w - %s ", response.Status, err, string(body))
		}
		return nil, retryable, fmt.Errorf("server response: %s - %s ", response.Status, string(body))
	}

	reader, err := responseBodyReader(response)
	if err != nil {
		return nil, false, err
	}
	bodyByte, err := ioutil.ReadAll(reader)
	if err != nil {
		return nil, false, fmt.Errorf("Read body err: %w - %s ", err, string(bodyByte))
	}
	result, err := c.marshalBody(bodyByte)
	if err != nil {
		return nil, false, fmt.Errorf("marshal body err: %w - %s ", err, string(bodyByte))
	}
	return result, false, nil
}

// responseBodyReader returns a reader over the payload of response,
// decompressing it when the Content-Encoding header is "gzip". The caller
// remains responsible for closing response.Body.
func responseBodyReader(response *http.Response) (io.Reader, error) {
	if response.Header.Get("Content-Encoding") != "gzip" {
		return response.Body, nil
	}
	zr, err := gzip.NewReader(response.Body)
	if err != nil {
		return nil, err
	}
	return zr, nil
}
// timeFormats lists the layouts used to parse timestamp values:
// index 0 is time.RFC3339Nano, used for v3.0 responses; index 1 is the
// space-separated layout used for responses that report numeric column types.
var timeFormats = []string{
	time.RFC3339Nano,
	"2006-01-02 15:04:05.999999999",
}
// marshalBody decodes the JSON body of a REST response into an ExecuteResult.
//
// The top-level object is streamed key by key:
//   - "code", "desc" and "rows" are copied into the result;
//   - "column_meta" is read as an array of [name, type, length] entries. A
//     numeric type (v2.0.17 responses) is stored as is; a string type (v3.0
//     responses) is translated through NameTypeMap, and an unknown type name
//     is reported as an error;
//   - "data" is read as an array of rows whose values are converted according
//     to the column types collected from "column_meta";
//   - every other key is skipped.
//
// JSON null values become nil, JSON-typed columns are returned as raw bytes,
// TIMESTAMP columns are parsed into time.Time using c.timeFormat, and columns
// of a type without a dedicated case are skipped and left nil.
//
// An error is returned when the iterator records one other than io.EOF. That
// includes a data row holding more values than "column_meta" described (for
// example when "data" precedes "column_meta"), which is reported instead of
// indexing past the end of the column list.
func (c *TDengineClient) marshalBody(body []byte) (*ExecuteResult, error) {
	var result ExecuteResult
	iter := jsonI.BorrowIterator(body)
	defer jsonI.ReturnIterator(iter)
	iter.ReadObjectCB(func(iter *jsonitor.Iterator, s string) bool {
		switch s {
		case "code":
			result.Code = iter.ReadInt()
		case "desc":
			result.Desc = iter.ReadString()
		case "column_meta":
			iter.ReadArrayCB(func(iter *jsonitor.Iterator) bool {
				// index is the position inside one [name, type, length] entry.
				index := 0
				iter.ReadArrayCB(func(iter *jsonitor.Iterator) bool {
					switch index {
					case 0:
						result.ColNames = append(result.ColNames, iter.ReadString())
						index = 1
					case 1:
						// The time layout is chosen once, from the first type
						// seen, and then kept on the client.
						// NOTE(review): c.timeFormat is written here without
						// synchronization; confirm that a client is not shared
						// by concurrent Execute calls.
						valueType := iter.WhatIsNext()
						if valueType == jsonitor.NumberValue { // v2.0.17
							if c.timeFormat == "" {
								c.timeFormat = timeFormats[1]
							}
							result.ColTypes = append(result.ColTypes, iter.ReadInt())
						} else { // v3.0
							if c.timeFormat == "" {
								c.timeFormat = timeFormats[0]
							}
							typeStr := iter.ReadString()
							t, exist := NameTypeMap[typeStr]
							if exist {
								result.ColTypes = append(result.ColTypes, t)
							} else {
								iter.ReportError("unsupported type in column_meta", typeStr)
							}
						}
						index = 2
					case 2:
						result.ColLength = append(result.ColLength, iter.ReadInt64())
						index = 0
					}
					return true
				})
				return true
			})
		case "data":
			columnCount := len(result.ColTypes)
			iter.ReadArrayCB(func(iter *jsonitor.Iterator) bool {
				next := 0 // index of the next value within the current row
				row := make([]interface{}, columnCount)
				iter.ReadArrayCB(func(iter *jsonitor.Iterator) bool {
					// Guard against rows wider than the column description;
					// indexing ColTypes/row below would otherwise panic.
					if next >= columnCount {
						iter.ReportError("data", "row has more values than column_meta describes")
						return false
					}
					column := next
					next++

					columnType := result.ColTypes[column]
					if columnType == TSDB_DATA_TYPE_JSON {
						// JSON columns are handed back undecoded.
						row[column] = iter.SkipAndReturnBytes()
						return true
					}
					if iter.ReadNil() {
						row[column] = nil
						return true
					}
					var err error
					switch columnType {
					case TSDB_DATA_TYPE_NULL:
						iter.Skip()
						row[column] = nil
					case TSDB_DATA_TYPE_BOOL:
						row[column] = iter.ReadAny().ToBool()
					case TSDB_DATA_TYPE_TINYINT:
						row[column] = iter.ReadInt8()
					case TSDB_DATA_TYPE_SMALLINT:
						row[column] = iter.ReadInt16()
					case TSDB_DATA_TYPE_INT:
						row[column] = iter.ReadInt32()
					case TSDB_DATA_TYPE_BIGINT:
						row[column] = iter.ReadInt64()
					case TSDB_DATA_TYPE_FLOAT:
						row[column] = iter.ReadFloat32()
					case TSDB_DATA_TYPE_DOUBLE:
						row[column] = iter.ReadFloat64()
					case TSDB_DATA_TYPE_BINARY:
						row[column] = iter.ReadString()
					case TSDB_DATA_TYPE_TIMESTAMP:
						b := iter.ReadString()
						row[column], err = time.Parse(c.timeFormat, b)
						if err != nil {
							iter.ReportError("parse time", err.Error())
						}
					case TSDB_DATA_TYPE_NCHAR:
						row[column] = iter.ReadString()
					case TSDB_DATA_TYPE_UTINYINT:
						row[column] = iter.ReadUint8()
					case TSDB_DATA_TYPE_USMALLINT:
						row[column] = iter.ReadUint16()
					case TSDB_DATA_TYPE_UINT:
						row[column] = iter.ReadUint32()
					case TSDB_DATA_TYPE_UBIGINT:
						row[column] = iter.ReadUint64()
					default:
						row[column] = nil
						iter.Skip()
					}
					return iter.Error == nil
				})
				// A row that failed to decode is dropped and ends the scan.
				if iter.Error != nil {
					return false
				}
				result.Data = append(result.Data, row)
				return true
			})
		case "rows":
			result.Rows = iter.ReadInt()
		default:
			iter.Skip()
		}
		return iter.Error == nil
	})
	if iter.Error != nil && iter.Error != io.EOF {
		return nil, iter.Error
	}
	return &result, nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/ling-bin/repository.git
git@gitee.com:ling-bin/repository.git
ling-bin
repository
数据和文件存储组件
v1.6.22

搜索帮助