1 Star 0 Fork 0

zhuchance / kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
influxdb.go 16.98 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610
package client
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
)
// UDPMaxMessageSize is the size limit, in bytes, that WriteSeriesOverUDP
// enforces on a marshalled payload before sending it as one datagram.
const UDPMaxMessageSize = 2048
// Client talks to an InfluxDB server over HTTP and, optionally, UDP.
//
// The field order is significant: NewClient builds the value with a
// positional composite literal.
type Client struct {
	// host is the "host:port" part of every request URL.
	host string
	// username and password are sent as the u and p query parameters.
	username string
	password string
	// database is the database used by the series read/write endpoints.
	database string
	// httpClient performs all HTTP requests.
	httpClient *http.Client
	// udpConn is non-nil only when the client was configured for UDP.
	udpConn *net.UDPConn
	// schema is the URL scheme, "http" or "https".
	schema string
	// compression enables gzip on series writes; when false, queries
	// ask the server for identity encoding.
	compression bool
}
// ClientConfig carries the settings NewClient uses to build a Client.
// Empty string fields and a nil HttpClient are replaced by the package
// defaults.
type ClientConfig struct {
	// Host is the server address in "host:port" form.
	Host string
	// Username and Password are the credentials sent with each request.
	Username string
	Password string
	// Database is the database series are written to and queried from.
	Database string
	// HttpClient is the HTTP client used for requests.
	HttpClient *http.Client
	// IsSecure selects the "https" scheme instead of "http".
	IsSecure bool
	// IsUDP makes NewClient also open a UDP connection to Host.
	IsUDP bool
}
// defaults holds the fallback values NewClient substitutes for settings
// the caller left empty.
var defaults = &ClientConfig{
	Host:       "localhost:8086",
	Username:   "root",
	Password:   "root",
	Database:   "",
	HttpClient: http.DefaultClient,
}
// getDefault returns value unless it is the empty string, in which case
// defaultValue is returned instead.
func getDefault(value, defaultValue string) string {
	if value != "" {
		return value
	}
	return defaultValue
}
// New is an alias for NewClient: it forwards config unchanged and
// returns whatever NewClient returns.
func New(config *ClientConfig) (*Client, error) {
	client, err := NewClient(config)
	return client, err
}
// NewClient builds a Client from config.
//
// Empty Host, Username, Password and Database fields, and a nil
// HttpClient, fall back to the package defaults. A nil config is treated
// as an empty one. The caller's config is never modified.
//
// When config.IsUDP is set, the host is resolved and a UDP connection is
// dialled; any resolution or dial error is returned. config.IsSecure
// selects the "https" scheme instead of "http". Compression starts out
// disabled.
func NewClient(config *ClientConfig) (*Client, error) {
	if config == nil {
		config = &ClientConfig{}
	}

	host := getDefault(config.Host, defaults.Host)

	// Use a local so the caller's struct is left untouched.
	httpClient := config.HttpClient
	if httpClient == nil {
		httpClient = defaults.HttpClient
	}

	var udpConn *net.UDPConn
	if config.IsUDP {
		serverAddr, err := net.ResolveUDPAddr("udp", host)
		if err != nil {
			return nil, err
		}
		udpConn, err = net.DialUDP("udp", nil, serverAddr)
		if err != nil {
			return nil, err
		}
	}

	schema := "http"
	if config.IsSecure {
		schema = "https"
	}

	return &Client{
		host:        host,
		username:    getDefault(config.Username, defaults.Username),
		password:    getDefault(config.Password, defaults.Password),
		database:    getDefault(config.Database, defaults.Database),
		httpClient:  httpClient,
		udpConn:     udpConn,
		schema:      schema,
		compression: false,
	}, nil
}
// DisableCompression clears the client's compression flag, so series
// writes are sent uncompressed and queries request identity encoding.
func (c *Client) DisableCompression() {
	c.compression = false
}
// getUrl returns the request URL for path, authenticated with the
// client's own username and password.
func (c *Client) getUrl(path string) string {
	user, pass := c.username, c.password
	return c.getUrlWithUserAndPass(path, user, pass)
}
// getUrlWithUserAndPass returns "<schema>://<host><path>?u=<user>&p=<pass>".
//
// The credentials are query-escaped so that characters such as '&', '#',
// '+' or '%' in a username or password cannot truncate or corrupt the
// query string. path is inserted verbatim.
func (c *Client) getUrlWithUserAndPass(path, username, password string) string {
	return fmt.Sprintf("%s://%s%s?u=%s&p=%s",
		c.schema, c.host, path,
		url.QueryEscape(username), url.QueryEscape(password))
}
// responseToError converts the result of an HTTP call into an error.
//
//   - A non-nil err is returned unchanged; response is not touched.
//   - A 2xx status yields nil. The body is closed only if closeResponse
//     is true; otherwise the caller still owns it.
//   - Any other status yields an error containing the status code and the
//     response body. The body is always closed in this case, exactly
//     once, whatever closeResponse says.
func responseToError(response *http.Response, err error, closeResponse bool) error {
	if err != nil {
		return err
	}

	if response.StatusCode >= 200 && response.StatusCode < 300 {
		if closeResponse {
			response.Body.Close()
		}
		return nil
	}

	// Error path: the body is consumed here, so it is always ours to close.
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return err
	}
	return fmt.Errorf("Server returned (%d): %s", response.StatusCode, string(body))
}
// CreateDatabase POSTs {"name": name} as JSON to the /db endpoint and
// returns an error if the request fails or the server answers with a
// non-2xx status.
func (c *Client) CreateDatabase(name string) error {
	body, err := json.Marshal(map[string]string{"name": name})
	if err != nil {
		return err
	}
	endpoint := c.getUrl("/db")
	resp, err := c.httpClient.Post(endpoint, "application/json", bytes.NewReader(body))
	return responseToError(resp, err, true)
}
// del issues a DELETE request to url without a request body.
func (c *Client) del(url string) (*http.Response, error) {
	var noBody io.Reader // nil interface: no payload
	return c.delWithBody(url, noBody)
}
// delWithBody issues a DELETE request to url with body as the payload
// (body may be nil) and returns the raw response. The caller is
// responsible for closing the response body.
func (c *Client) delWithBody(url string, body io.Reader) (*http.Response, error) {
	request, err := http.NewRequest("DELETE", url, body)
	if err == nil {
		return c.httpClient.Do(request)
	}
	return nil, err
}
// DeleteDatabase sends a DELETE to /db/<name> and returns an error if
// the request fails or the server answers with a non-2xx status.
func (c *Client) DeleteDatabase(name string) error {
	resp, err := c.del(c.getUrl("/db/" + name))
	return responseToError(resp, err, true)
}
// get performs a GET on url and returns the full response body. A
// transport error or non-2xx status is returned as an error.
func (c *Client) get(url string) ([]byte, error) {
	resp, err := c.httpClient.Get(url)
	if err = responseToError(resp, err, false); err != nil {
		return nil, err
	}
	// responseToError left the body open on success; close it after reading.
	defer resp.Body.Close()
	return ioutil.ReadAll(resp.Body)
}
// getWithVersion performs a GET on url and returns the response body
// together with the server version taken from the X-Influxdb-Version
// header.
//
// The version is the second whitespace-separated field of the header and
// is only reported when the header has more than two fields; otherwise
// the empty string is returned.
// NOTE(review): a header with exactly two fields also yields "" -
// confirm whether that is intended.
func (c *Client) getWithVersion(url string) ([]byte, string, error) {
	resp, err := c.httpClient.Get(url)
	if err = responseToError(resp, err, false); err != nil {
		return nil, "", err
	}
	defer resp.Body.Close()

	body, readErr := ioutil.ReadAll(resp.Body)

	version := ""
	if fields := strings.Fields(resp.Header.Get("X-Influxdb-Version")); len(fields) > 2 {
		version = fields[1]
	}
	return body, version, readErr
}
// listSomething GETs url and decodes the response body as a JSON array
// of objects.
func (c *Client) listSomething(url string) ([]map[string]interface{}, error) {
	raw, err := c.get(url)
	if err != nil {
		return nil, err
	}

	items := []map[string]interface{}{}
	if err := json.Unmarshal(raw, &items); err != nil {
		return nil, err
	}
	return items, nil
}
// GetDatabaseList returns the decoded JSON list served by the /db
// endpoint.
func (c *Client) GetDatabaseList() ([]map[string]interface{}, error) {
	return c.listSomething(c.getUrl("/db"))
}
// CreateClusterAdmin POSTs the given name and password as JSON to the
// /cluster_admins endpoint.
func (c *Client) CreateClusterAdmin(name, password string) error {
	body, err := json.Marshal(map[string]string{
		"name":     name,
		"password": password,
	})
	if err != nil {
		return err
	}
	endpoint := c.getUrl("/cluster_admins")
	resp, err := c.httpClient.Post(endpoint, "application/json", bytes.NewReader(body))
	return responseToError(resp, err, true)
}
// UpdateClusterAdmin POSTs a new password as JSON to
// /cluster_admins/<name>.
func (c *Client) UpdateClusterAdmin(name, password string) error {
	body, err := json.Marshal(map[string]string{"password": password})
	if err != nil {
		return err
	}
	endpoint := c.getUrl("/cluster_admins/" + name)
	resp, err := c.httpClient.Post(endpoint, "application/json", bytes.NewReader(body))
	return responseToError(resp, err, true)
}
// DeleteClusterAdmin sends a DELETE to /cluster_admins/<name>.
func (c *Client) DeleteClusterAdmin(name string) error {
	resp, err := c.del(c.getUrl("/cluster_admins/" + name))
	return responseToError(resp, err, true)
}
// GetClusterAdminList returns the decoded JSON list served by the
// /cluster_admins endpoint.
func (c *Client) GetClusterAdminList() ([]map[string]interface{}, error) {
	return c.listSomething(c.getUrl("/cluster_admins"))
}
// Servers returns the decoded JSON list served by the /cluster/servers
// endpoint.
func (c *Client) Servers() ([]map[string]interface{}, error) {
	return c.listSomething(c.getUrl("/cluster/servers"))
}
// RemoveServer sends a DELETE to /cluster/servers/<id>.
func (c *Client) RemoveServer(id int) error {
	path := fmt.Sprintf("/cluster/servers/%d", id)
	resp, err := c.del(c.getUrl(path))
	return responseToError(resp, err, true)
}
// CreateDatabaseUser creates a new user for the given database.
//
// permissions can be omitted, in which case the user will be able to
// read and write to all time series (both matchers default to ".*"). If
// provided, there must be exactly two strings, the first for read and
// the second for write. The strings are regexes that are used to match
// the time series name to determine whether the user has the ability to
// read/write to the given time series. Any other number of permissions
// is rejected with an error before a request is made.
//
//	client.CreateDatabaseUser("db", "user", "pass")
//	// the following user cannot read from any series and can write
//	// to the limited time series only
//	client.CreateDatabaseUser("db", "limited", "pass", "^$", "limited")
func (c *Client) CreateDatabaseUser(database, name, password string, permissions ...string) error {
	readMatcher, writeMatcher := ".*", ".*"
	switch len(permissions) {
	case 0:
		// keep the match-everything defaults
	case 2:
		readMatcher, writeMatcher = permissions[0], permissions[1]
	default:
		return fmt.Errorf("You have to provide two permissions (read and write matchers), got %d", len(permissions))
	}

	data, err := json.Marshal(map[string]string{
		"name":     name,
		"password": password,
		"readFrom": readMatcher,
		"writeTo":  writeMatcher,
	})
	if err != nil {
		return err
	}

	endpoint := c.getUrl("/db/" + database + "/users")
	resp, err := c.httpClient.Post(endpoint, "application/json", bytes.NewBuffer(data))
	return responseToError(resp, err, true)
}
// ChangeClusterAdminPassword POSTs newPassword as JSON to
// /cluster_admins/<name>.
func (c *Client) ChangeClusterAdminPassword(name, newPassword string) error {
	body, err := json.Marshal(map[string]interface{}{"password": newPassword})
	if err != nil {
		return err
	}
	endpoint := c.getUrl("/cluster_admins/" + name)
	resp, err := c.httpClient.Post(endpoint, "application/json", bytes.NewReader(body))
	return responseToError(resp, err, true)
}
// ChangeDatabaseUser changes a database user's password and admin flag
// and, optionally, the user's permissions.
//
// newPermissions must be empty (permissions are left out of the request)
// or contain exactly two regexes: the read matcher followed by the write
// matcher. Any other count is rejected with an error before a request is
// made.
func (c *Client) ChangeDatabaseUser(database, name, newPassword string, isAdmin bool, newPermissions ...string) error {
	payload := map[string]interface{}{"password": newPassword, "admin": isAdmin}
	switch len(newPermissions) {
	case 0:
		// permissions unchanged
	case 2:
		payload["readFrom"] = newPermissions[0]
		payload["writeTo"] = newPermissions[1]
	default:
		return fmt.Errorf("You have to provide two permissions (read and write matchers), got %d", len(newPermissions))
	}

	data, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	endpoint := c.getUrl("/db/" + database + "/users/" + name)
	resp, err := c.httpClient.Post(endpoint, "application/json", bytes.NewBuffer(data))
	return responseToError(resp, err, true)
}
// updateDatabaseUserCommon POSTs a partial update for a database user to
// /db/<database>/users/<name>.
//
// Only the supplied pieces are sent: password when non-nil, the admin
// flag when isAdmin is non-nil, and the read/write matchers when
// permissions holds exactly two regexes (read first, then write). See
// Client.CreateDatabaseUser for more info on the permissions argument.
// A permissions count other than zero or two is rejected with an error
// rather than being silently dropped.
func (c *Client) updateDatabaseUserCommon(database, name string, password *string, isAdmin *bool, permissions ...string) error {
	payload := map[string]interface{}{}
	if password != nil {
		payload["password"] = *password
	}
	if isAdmin != nil {
		payload["admin"] = *isAdmin
	}
	switch len(permissions) {
	case 0:
		// permissions unchanged
	case 2:
		payload["readFrom"] = permissions[0]
		payload["writeTo"] = permissions[1]
	default:
		return fmt.Errorf("You have to provide two permissions (read and write matchers), got %d", len(permissions))
	}

	data, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	endpoint := c.getUrl("/db/" + database + "/users/" + name)
	resp, err := c.httpClient.Post(endpoint, "application/json", bytes.NewBuffer(data))
	return responseToError(resp, err, true)
}
// UpdateDatabaseUser changes only the password of a database user; the
// admin flag and permissions are not sent.
func (c *Client) UpdateDatabaseUser(database, name, password string) error {
	newPassword := &password
	return c.updateDatabaseUserCommon(database, name, newPassword, nil)
}
// UpdateDatabaseUserPermissions changes only the read and write matchers
// of a database user; the password and admin flag are not sent.
func (c *Client) UpdateDatabaseUserPermissions(database, name, readPermission, writePermissions string) error {
	matchers := []string{readPermission, writePermissions}
	return c.updateDatabaseUserCommon(database, name, nil, nil, matchers...)
}
// DeleteDatabaseUser sends a DELETE to /db/<database>/users/<name>.
func (c *Client) DeleteDatabaseUser(database, name string) error {
	endpoint := c.getUrl("/db/" + database + "/users/" + name)
	resp, err := c.del(endpoint)
	return responseToError(resp, err, true)
}
// GetDatabaseUserList returns the decoded JSON list served by
// /db/<database>/users.
func (c *Client) GetDatabaseUserList(database string) ([]map[string]interface{}, error) {
	return c.listSomething(c.getUrl("/db/" + database + "/users"))
}
// AlterDatabasePrivilege sets the admin flag of a database user and,
// when permissions are given, the user's read/write matchers. The
// password is not sent.
func (c *Client) AlterDatabasePrivilege(database, name string, isAdmin bool, permissions ...string) error {
	admin := &isAdmin
	return c.updateDatabaseUserCommon(database, name, nil, admin, permissions...)
}
// TimePrecision is the value of the time_precision query parameter sent
// with series writes and queries.
type TimePrecision string

// Supported time_precision values.
const (
	Second      TimePrecision = "s"
	Millisecond TimePrecision = "ms"
	Microsecond TimePrecision = "u"
)
// WriteSeries writes series to the client's database over HTTP without
// any extra query options.
func (c *Client) WriteSeries(series []*Series) error {
	var noOptions map[string]string
	return c.writeSeriesCommon(series, noOptions)
}
// WriteSeriesOverUDP marshals series to JSON and sends it as a single
// write on the client's UDP connection.
//
// It returns an error if the client was created without UDP support, if
// marshalling fails, if the payload is UDPMaxMessageSize bytes or
// larger, or if the write fails. Errors are only returned; nothing is
// printed.
func (c *Client) WriteSeriesOverUDP(series []*Series) error {
	if c.udpConn == nil {
		return fmt.Errorf("UDP isn't enabled. Make sure to set config.IsUDP to true")
	}

	data, err := json.Marshal(series)
	if err != nil {
		return err
	}

	// The max message size over UDP is 2048 bytes, see
	// https://github.com/influxdb/influxdb/blob/master/src/api/udp/api.go#L65
	if len(data) >= UDPMaxMessageSize {
		return fmt.Errorf("data size over limit %v limit is %v", len(data), UDPMaxMessageSize)
	}

	_, err = c.udpConn.Write(data)
	return err
}
// WriteSeriesWithTimePrecision writes series over HTTP and passes
// timePrecision to the server as the time_precision query parameter.
func (c *Client) WriteSeriesWithTimePrecision(series []*Series, timePrecision TimePrecision) error {
	options := map[string]string{
		"time_precision": string(timePrecision),
	}
	return c.writeSeriesCommon(series, options)
}
// writeSeriesCommon marshals series to JSON and POSTs it to
// /db/<database>/series.
//
// Each entry of options is appended to the URL as a query parameter;
// names and values are query-escaped. When compression is enabled the
// payload is gzip-compressed and sent with "Content-Encoding: gzip".
// A marshalling, compression or transport failure, or a non-2xx
// response, is returned as an error.
func (c *Client) writeSeriesCommon(series []*Series, options map[string]string) error {
	data, err := json.Marshal(series)
	if err != nil {
		return err
	}

	endpoint := c.getUrl("/db/" + c.database + "/series")
	for name, value := range options {
		endpoint += "&" + url.QueryEscape(name) + "=" + url.QueryEscape(value)
	}

	payload := bytes.NewBuffer(data)
	if c.compression {
		payload = bytes.NewBuffer(nil)
		zw := gzip.NewWriter(payload)
		if _, err := zw.Write(data); err != nil {
			return err
		}
		// Close flushes buffered data and writes the gzip footer; a
		// failure here would leave a truncated stream, so report it.
		if err := zw.Close(); err != nil {
			return err
		}
	}

	req, err := http.NewRequest("POST", endpoint, payload)
	if err != nil {
		return err
	}
	if c.compression {
		req.Header.Set("Content-Encoding", "gzip")
	}

	resp, err := c.httpClient.Do(req)
	return responseToError(resp, err, true)
}
// Query runs query against the client's database and returns the decoded
// series. Numbers use the JSON decoder's default representation. If
// precision is given, its first element is sent as time_precision.
func (c *Client) Query(query string, precision ...TimePrecision) ([]*Series, error) {
	const useNumber = false
	return c.queryCommon(query, useNumber, precision...)
}
// QueryWithNumbers behaves like Query but decodes JSON numbers as
// json.Number instead of float64.
func (c *Client) QueryWithNumbers(query string, precision ...TimePrecision) ([]*Series, error) {
	const useNumber = true
	return c.queryCommon(query, useNumber, precision...)
}
// queryCommon sends query to /db/<database>/series with a GET request
// and decodes the JSON response into a slice of series.
//
// The first element of precision, if any, is sent as the time_precision
// parameter. The query text is query-escaped and sent as q. When
// compression is disabled the request asks for identity encoding. If
// useNumber is set, numbers are decoded as json.Number.
func (c *Client) queryCommon(query string, useNumber bool, precision ...TimePrecision) ([]*Series, error) {
	endpoint := c.getUrl("/db/" + c.database + "/series")
	if len(precision) > 0 {
		endpoint += "&time_precision=" + string(precision[0])
	}
	endpoint += "&q=" + url.QueryEscape(query)

	req, err := http.NewRequest("GET", endpoint, nil)
	if err != nil {
		return nil, err
	}
	if !c.compression {
		req.Header.Set("Accept-Encoding", "identity")
	}

	resp, err := c.httpClient.Do(req)
	if err = responseToError(resp, err, false); err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	dec := json.NewDecoder(resp.Body)
	if useNumber {
		dec.UseNumber()
	}
	result := []*Series{}
	if err := dec.Decode(&result); err != nil {
		return nil, err
	}
	return result, nil
}
// Ping GETs the /ping endpoint and returns an error if the request fails
// or the server answers with a non-2xx status.
func (c *Client) Ping() error {
	resp, err := c.httpClient.Get(c.getUrl("/ping"))
	return responseToError(resp, err, true)
}
// AuthenticateDatabaseUser GETs /db/<database>/authenticate using the
// supplied credentials instead of the client's own, and returns an error
// if the request fails or the server answers with a non-2xx status.
func (c *Client) AuthenticateDatabaseUser(database, username, password string) error {
	path := "/db/" + database + "/authenticate"
	resp, err := c.httpClient.Get(c.getUrlWithUserAndPass(path, username, password))
	return responseToError(resp, err, true)
}
// AuthenticateClusterAdmin GETs /cluster_admins/authenticate using the
// supplied credentials instead of the client's own, and returns an error
// if the request fails or the server answers with a non-2xx status.
func (c *Client) AuthenticateClusterAdmin(username, password string) error {
	const path = "/cluster_admins/authenticate"
	resp, err := c.httpClient.Get(c.getUrlWithUserAndPass(path, username, password))
	return responseToError(resp, err, true)
}
// LongTermShortTermShards is the result of GetShards.
type LongTermShortTermShards struct {
	// LongTerm holds the long term shards. It is not populated for
	// server versions >= 0.8.0.
	LongTerm []*Shard `json:"longTerm"`

	// ShortTerm holds the short term shards. It is not populated for
	// server versions >= 0.8.0.
	ShortTerm []*Shard `json:"shortTerm"`

	// All holds every shard in the system (long term followed by short
	// term shards for versions < 0.8.0). It is excluded from JSON.
	All []*Shard `json:"-"`
}
// Shard is the JSON representation of a single shard as exchanged with
// the /cluster/shards endpoint.
type Shard struct {
	Id uint32 `json:"id"`

	// EndTime and StartTime are passed through as raw integers.
	EndTime   int64 `json:"endTime"`
	StartTime int64 `json:"startTime"`

	ServerIds []uint32 `json:"serverIds"`
	SpaceName string   `json:"spaceName"`
	Database  string   `json:"database"`
}
// ShardSpaceCollection groups a list of ShardSpace values.
type ShardSpaceCollection struct {
	ShardSpaces []ShardSpace
}
// GetShards fetches /cluster/shards and parses the body according to the
// server version reported in the response headers.
func (c *Client) GetShards() (*LongTermShortTermShards, error) {
	body, version, err := c.getWithVersion(c.getUrl("/cluster/shards"))
	if err != nil {
		return nil, err
	}
	return parseShards(body, version)
}
// isOrNewerThan reports whether version is the same as or newer than
// reference, comparing only the major and minor components.
//
// Both arguments look like "v0.8.0" or "v0.8"; the leading "v" is
// optional and a missing minor component counts as 0. Components are
// compared numerically, so "v0.10" is newer than "v0.8", and the minor
// component is only consulted when the majors are equal. The special
// version "vdev" is always considered newer. A version whose major or
// minor component does not start with a digit yields false.
func isOrNewerThan(version, reference string) bool {
	if version == "vdev" {
		return true
	}

	major, minor, ok := parseMajorMinor(version)
	if !ok {
		return false
	}
	refMajor, refMinor, ok := parseMajorMinor(reference)
	if !ok {
		return false
	}

	if major != refMajor {
		return major > refMajor
	}
	return minor >= refMinor
}

// parseMajorMinor extracts the numeric major and minor components from a
// version such as "v0.8.0". ok is false when a component that is present
// does not start with a decimal digit.
func parseMajorMinor(version string) (major, minor int, ok bool) {
	parts := strings.Split(strings.TrimPrefix(version, "v"), ".")

	if major, ok = leadingInt(parts[0]); !ok {
		return 0, 0, false
	}
	if len(parts) < 2 {
		return major, 0, true
	}
	if minor, ok = leadingInt(parts[1]); !ok {
		return 0, 0, false
	}
	return major, minor, true
}

// leadingInt parses the run of decimal digits at the start of s, so a
// suffix such as "-rc1" is ignored. ok is false when s does not start
// with a digit.
func leadingInt(s string) (n int, ok bool) {
	i := 0
	for ; i < len(s) && s[i] >= '0' && s[i] <= '9'; i++ {
		n = n*10 + int(s[i]-'0')
	}
	return n, i > 0
}
// parseShards decodes the body of a /cluster/shards response.
//
// For a non-empty version that is v0.8 or newer the body is a flat JSON
// array and is handed to parseNewShards. Otherwise the body is decoded
// as an object with longTerm and shortTerm lists, and All is filled with
// the long term shards followed by the short term shards.
func parseShards(body []byte, version string) (*LongTermShortTermShards, error) {
	if version != "" && isOrNewerThan(version, "v0.8") {
		return parseNewShards(body)
	}

	shards := &LongTermShortTermShards{}
	// Decode into the struct itself, not into a pointer to the pointer:
	// with the latter a JSON null body would set shards to nil and the
	// code below would panic.
	if err := json.Unmarshal(body, shards); err != nil {
		return nil, err
	}

	all := make([]*Shard, 0, len(shards.LongTerm)+len(shards.ShortTerm))
	all = append(all, shards.LongTerm...)
	all = append(all, shards.ShortTerm...)
	shards.All = all
	return shards, nil
}
// parseNewShards decodes body as a flat JSON array of shards and returns
// it as the All list; LongTerm and ShortTerm are left empty.
func parseNewShards(body []byte) (*LongTermShortTermShards, error) {
	all := []*Shard{}
	if err := json.Unmarshal(body, &all); err != nil {
		return nil, err
	}
	result := &LongTermShortTermShards{All: all}
	return result, nil
}
// GetShardSpaces fetches /cluster/shard_spaces and decodes the body as a
// JSON array of shard spaces.
//
// Added to InfluxDB in 0.8.0
func (c *Client) GetShardSpaces() ([]*ShardSpace, error) {
	raw, err := c.get(c.getUrl("/cluster/shard_spaces"))
	if err != nil {
		return nil, err
	}

	spaces := []*ShardSpace{}
	if err := json.Unmarshal(raw, &spaces); err != nil {
		return nil, err
	}
	return spaces, nil
}
// DropShardSpace sends a DELETE to /cluster/shard_spaces/<database>/<name>.
//
// The response body is always closed, and a non-2xx status is returned
// as an error, as for the client's other delete operations.
//
// Added to InfluxDB in 0.8.0
func (c *Client) DropShardSpace(database, name string) error {
	endpoint := c.getUrlWithUserAndPass(fmt.Sprintf("/cluster/shard_spaces/%s/%s", database, name), c.username, c.password)
	resp, err := c.del(endpoint)
	return responseToError(resp, err, true)
}
// CreateShardSpace POSTs space as JSON to /cluster/shard_spaces/<database>.
//
// Added to InfluxDB in 0.8.0
func (c *Client) CreateShardSpace(database string, space *ShardSpace) error {
	body, err := json.Marshal(space)
	if err != nil {
		return err
	}
	endpoint := c.getUrl("/cluster/shard_spaces/" + database)
	resp, err := c.httpClient.Post(endpoint, "application/json", bytes.NewReader(body))
	return responseToError(resp, err, true)
}
// DropShard sends a DELETE to /cluster/shards/<id> with a JSON body of
// the form {"serverIds": [...]}.
//
// The response body is always closed, and a non-2xx status is returned
// as an error, as for the client's other delete operations.
func (c *Client) DropShard(id uint32, serverIds []uint32) error {
	body, err := json.Marshal(map[string][]uint32{"serverIds": serverIds})
	if err != nil {
		return err
	}

	endpoint := c.getUrlWithUserAndPass(fmt.Sprintf("/cluster/shards/%d", id), c.username, c.password)
	resp, err := c.delWithBody(endpoint, bytes.NewBuffer(body))
	return responseToError(resp, err, true)
}
// UpdateShardSpace POSTs space as JSON to
// /cluster/shard_spaces/<database>/<name>.
//
// Added to InfluxDB in 0.8.2
func (c *Client) UpdateShardSpace(database, name string, space *ShardSpace) error {
	body, err := json.Marshal(space)
	if err != nil {
		return err
	}
	endpoint := c.getUrl("/cluster/shard_spaces/" + database + "/" + name)
	resp, err := c.httpClient.Post(endpoint, "application/json", bytes.NewReader(body))
	return responseToError(resp, err, true)
}
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.1.3-beta.0

搜索帮助