1 Star 0 Fork 0

zhuchance/kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
client.go 11.65 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583
package metrics
import (
	"bytes"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
)
// TODO Instrumentation? To get statistics?
// HawkularClientError is the error type produced when the server answers
// with a failure status. It pairs the HTTP status code of the reply with a
// human-readable message.
type HawkularClientError struct {
	msg  string // error text, only reachable through Error()
	Code int    // HTTP status code of the failed reply
}

// Error implements the error interface by rendering the status code and the
// message on a single line.
func (e *HawkularClientError) Error() string {
	const layout = "Hawkular returned status code %d, error message: %s"
	return fmt.Sprintf(layout, e.Code, e.msg)
}
// Client creation and instance config

const (
	// base_url is the path substituted when the configured URL has none.
	base_url string = "hawkular/metrics"
	// timeout is the limit installed on the underlying http.Client.
	timeout time.Duration = 30 * time.Second
)
// Parameters carries the settings NewHawkularClient uses to build a Client.
type Parameters struct {
	// Tenant is the default tenant sent with each request. Technically
	// optional, but then the Tenant() modifier has to be passed every time.
	Tenant string
	// Url is the server address; it is parsed by NewHawkularClient.
	Url string
	// TLSConfig, when non-nil, is installed on the HTTP transport.
	TLSConfig *tls.Config
	// Token, when non-empty, is sent as a bearer token.
	Token string
}
// Client talks to a Hawkular Metrics server. Build one with
// NewHawkularClient.
type Client struct {
	// Tenant is the value placed in the Hawkular-Tenant header by default.
	Tenant string
	// url is the base address; it is copied and extended for each request.
	url *url.URL
	// client performs the HTTP round trips.
	client *http.Client
	// Token, when non-empty, is sent in a bearer Authorization header.
	Token string
}
// HawkularClient abstracts something able to deliver a prepared request and
// hand back the response.
//
// Note that (*Client).Send in this file accepts Modifiers rather than a
// *http.Request, so Client does not implement this interface as declared.
type HawkularClient interface {
	Send(req *http.Request) (*http.Response, error)
}
// Modifiers

// Modifier adjusts an outgoing request before it is sent. Send stops and
// returns the error as soon as a Modifier reports one.
type Modifier func(req *http.Request) error
// Tenant returns a Modifier that replaces the Hawkular-Tenant header with
// the given tenant, overriding the Client default.
func Tenant(tenant string) Modifier {
	override := func(req *http.Request) error {
		req.Header.Set("Hawkular-Tenant", tenant)
		return nil
	}
	return override
}
// Data returns a Modifier that JSON-encodes data and installs the result as
// the request body, setting ContentLength to the encoded size. A marshalling
// failure is returned as the Modifier's error.
func Data(data interface{}) Modifier {
	return func(req *http.Request) error {
		payload, err := json.Marshal(data)
		if err != nil {
			return err
		}
		body := bytes.NewBuffer(payload)
		// Record the length before anything starts reading from the buffer.
		req.ContentLength = int64(body.Len())
		req.Body = ioutil.NopCloser(body)
		return nil
	}
}
// Url returns a Modifier that sets the request method and points the request
// at the client's base URL extended by the given endpoints. The URL is built
// when the Modifier runs, not when Url is called.
func (c *Client) Url(method string, e ...Endpoint) Modifier {
	// TODO Create composite URLs? Add().Add().. etc? Easier to modify on the fly..
	return func(req *http.Request) error {
		req.Method = method
		req.URL = c.createUrl(e...)
		return nil
	}
}
// Filters for querying

// Filter mutates a request in place, typically by adding a query parameter.
// It has no error return.
type Filter func(req *http.Request)
// Filters bundles any number of Filter values into a single Modifier that
// applies them in the order given.
func Filters(f ...Filter) Modifier {
	return func(req *http.Request) error {
		for i := range f {
			f[i](req)
		}
		// A Filter cannot fail, so there is never an error to report.
		// Or should filter return err?
		return nil
	}
}
// Param returns a Filter that sets query parameter k to v on the request
// URL, replacing any existing value for k, and re-encodes the query string.
// The request URL must already be set when the Filter runs.
func Param(k string, v string) Filter {
	return func(req *http.Request) {
		values := req.URL.Query()
		values.Set(k, v)
		req.URL.RawQuery = values.Encode()
	}
}
// TypeFilter returns a Filter that sets the "type" query parameter to the
// short form of the metric type.
func TypeFilter(t MetricType) Filter {
	short := t.shortForm()
	return Param("type", short)
}
// TagsFilter returns a Filter that sets the "tags" query parameter to the
// tagsEncoder rendering of t.
func TagsFilter(t map[string]string) Filter {
	return Param("tags", tagsEncoder(t))
}
// IdFilter returns a Filter that sets the "id" query parameter to the given
// expression.
//
// Requires HWKMETRICS-233
func IdFilter(regexp string) Filter {
	pattern := regexp
	return Param("id", pattern)
}
// StartTimeFilter returns a Filter that sets the "start" query parameter to
// startTime expressed as seconds since the Unix epoch.
//
// The timestamp is formatted directly from the int64 returned by Unix();
// converting it to int first would truncate it on platforms where int is
// 32 bits wide.
//
// NOTE(review): confirm the server expects seconds here rather than
// milliseconds.
func StartTimeFilter(startTime time.Time) Filter {
	return Param("start", strconv.FormatInt(startTime.Unix(), 10))
}
// EndTimeFilter returns a Filter that sets the "end" query parameter to
// endTime expressed as seconds since the Unix epoch.
//
// The timestamp is formatted directly from the int64 returned by Unix();
// converting it to int first would truncate it on platforms where int is
// 32 bits wide.
//
// NOTE(review): confirm the server expects seconds here rather than
// milliseconds.
func EndTimeFilter(endTime time.Time) Filter {
	return Param("end", strconv.FormatInt(endTime.Unix(), 10))
}
// BucketsFilter returns a Filter that sets the "buckets" query parameter to
// the decimal rendering of buckets.
func BucketsFilter(buckets int) Filter {
	count := strconv.Itoa(buckets)
	return Param("buckets", count)
}
// PercentilesFilter returns a Filter that sets the "percentiles" query
// parameter to the given values, each formatted with %v and joined by commas.
func PercentilesFilter(percentiles []float64) Filter {
	formatted := make([]string, len(percentiles))
	for i, p := range percentiles {
		formatted[i] = fmt.Sprintf("%v", p)
	}
	return Param("percentiles", strings.Join(formatted, ","))
}
// The SEND method..

// createRequest builds the skeleton of an HTTP/1.1 request: JSON content
// type, the client's default tenant header, the Host taken from the client
// URL and, when a token is configured, a bearer Authorization header. Method
// and URL are left unset for the Modifiers to fill in.
func (c *Client) createRequest() *http.Request {
	headers := make(http.Header)
	headers.Add("Content-Type", "application/json")
	headers.Add("Hawkular-Tenant", c.Tenant)
	if c.Token != "" {
		headers.Add("Authorization", fmt.Sprintf("Bearer %s", c.Token))
	}

	return &http.Request{
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     headers,
		Host:       c.url.Host,
	}
}
// Send creates a fresh request, applies the Modifiers in order and executes
// the request with the client's http.Client. The first Modifier error stops
// processing and is returned without sending anything. On success the caller
// owns the response and must close its body.
func (c *Client) Send(o ...Modifier) (*http.Response, error) {
	req := c.createRequest()
	for _, modify := range o {
		if err := modify(req); err != nil {
			return nil, err
		}
	}
	return c.client.Do(req)
}
// Commands

// prepend returns a new slice holding the elements of a followed by the
// elements of slice. Neither input is modified.
func prepend(slice []Modifier, a ...Modifier) []Modifier {
	merged := make([]Modifier, len(a), len(a)+len(slice))
	copy(merged, a)
	return append(merged, slice...)
}
// Create posts a new metric definition to the endpoint of md's type.
//
// It returns true when the server accepted the definition. A 409 Conflict
// reply yields (false, nil); any other status above 399 yields false and
// the parsed error. Caller-supplied Modifiers run after the built-in ones.
func (c *Client) Create(md MetricDefinition, o ...Modifier) (bool, error) {
	// Keep the order, add custom prepend
	mods := prepend(o, c.Url("POST", TypeEndpoint(md.Type)), Data(md))

	r, err := c.Send(mods...)
	if err != nil {
		return false, err
	}
	defer r.Body.Close()

	if r.StatusCode <= 399 {
		return true, nil
	}

	err = c.parseErrorResponse(r)
	if hErr, ok := err.(*HawkularClientError); ok && hErr.Code == http.StatusConflict {
		// A conflict is reported as "not created" without an error.
		return false, nil
	}
	return false, err
}
// Definitions fetches metric definitions from the Generic type endpoint.
//
// A 200 reply is decoded into a slice of definitions; a status above 399 is
// turned into an error by parseErrorResponse; any other status yields
// (nil, nil).
func (c *Client) Definitions(o ...Modifier) ([]*MetricDefinition, error) {
	mods := prepend(o, c.Url("GET", TypeEndpoint(Generic)))

	r, err := c.Send(mods...)
	if err != nil {
		return nil, err
	}
	defer r.Body.Close()

	switch {
	case r.StatusCode == http.StatusOK:
		body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			return nil, err
		}
		md := []*MetricDefinition{}
		// NOTE(review): ReadAll appears to return a non-nil slice on
		// success, so this guard may never skip decoding - confirm intent.
		if body != nil {
			if err := json.Unmarshal(body, &md); err != nil {
				return nil, err
			}
		}
		return md, nil
	case r.StatusCode > 399:
		return nil, c.parseErrorResponse(r)
	}
	return nil, nil
}
// Definition fetches the definition of the single metric identified by type
// t and id.
//
// A 200 reply is decoded into a MetricDefinition; a status above 399 is
// turned into an error by parseErrorResponse; any other status yields
// (nil, nil).
func (c *Client) Definition(t MetricType, id string, o ...Modifier) (*MetricDefinition, error) {
	mods := prepend(o, c.Url("GET", TypeEndpoint(t), SingleMetricEndpoint(id)))

	r, err := c.Send(mods...)
	if err != nil {
		return nil, err
	}
	defer r.Body.Close()

	switch {
	case r.StatusCode == http.StatusOK:
		body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			return nil, err
		}
		md := &MetricDefinition{}
		// NOTE(review): ReadAll appears to return a non-nil slice on
		// success, so this guard may never skip decoding - confirm intent.
		if body != nil {
			if err := json.Unmarshal(body, md); err != nil {
				return nil, err
			}
		}
		return md, nil
	case r.StatusCode > 399:
		return nil, c.parseErrorResponse(r)
	}
	return nil, nil
}
// UpdateTags sends tags as a JSON body in a PUT to the tags endpoint of the
// metric identified by t and id. A status above 399 is returned as the
// error produced by parseErrorResponse.
func (c *Client) UpdateTags(t MetricType, id string, tags map[string]string, o ...Modifier) error {
	target := c.Url("PUT", TypeEndpoint(t), SingleMetricEndpoint(id), TagEndpoint())
	mods := prepend(o, target, Data(tags))

	r, err := c.Send(mods...)
	if err != nil {
		return err
	}
	defer r.Body.Close()

	if r.StatusCode <= 399 {
		return nil
	}
	return c.parseErrorResponse(r)
}
// DeleteTags issues a DELETE against the tags endpoint of the metric
// identified by t and id, with the encoded tags appended as a further path
// segment. A status above 399 is returned as the error produced by
// parseErrorResponse.
func (c *Client) DeleteTags(t MetricType, id string, tags map[string]string, o ...Modifier) error {
	target := c.Url("DELETE", TypeEndpoint(t), SingleMetricEndpoint(id), TagEndpoint(), TagsEndpoint(tags))
	mods := prepend(o, target)

	r, err := c.Send(mods...)
	if err != nil {
		return err
	}
	defer r.Body.Close()

	if r.StatusCode <= 399 {
		return nil
	}
	return c.parseErrorResponse(r)
}
// Tags fetches the tags of the metric identified by t and id.
//
// A 200 reply is decoded into a map; a status above 399 is turned into an
// error by parseErrorResponse; any other status yields (nil, nil).
func (c *Client) Tags(t MetricType, id string, o ...Modifier) (map[string]string, error) {
	mods := prepend(o, c.Url("GET", TypeEndpoint(t), SingleMetricEndpoint(id), TagEndpoint()))

	r, err := c.Send(mods...)
	if err != nil {
		return nil, err
	}
	defer r.Body.Close()

	switch {
	case r.StatusCode == http.StatusOK:
		body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			return nil, err
		}
		tags := make(map[string]string)
		// NOTE(review): ReadAll appears to return a non-nil slice on
		// success, so this guard may never skip decoding - confirm intent.
		if body != nil {
			if err := json.Unmarshal(body, &tags); err != nil {
				return nil, err
			}
		}
		return tags, nil
	case r.StatusCode > 399:
		return nil, c.parseErrorResponse(r)
	}
	return nil, nil
}
// Write posts datapoints to the server.
//
// The headers are grouped by metric type and each group is sent with its
// own POST to that type's data endpoint, one goroutine per group. Write
// waits for all of them and then returns at most one of the errors that
// occurred; an empty metrics slice is a no-op.
func (c *Client) Write(metrics []MetricHeader, o ...Modifier) error {
	if len(metrics) == 0 {
		return nil
	}

	// Should be sorted and splitted by type & tenant..
	byType := make(map[MetricType][]MetricHeader)
	for _, m := range metrics {
		byType[m.Type] = append(byType[m.Type], m)
	}

	// One buffer slot per goroutine, so a failing sender never blocks.
	errorsChan := make(chan error, len(byType))
	var wg sync.WaitGroup
	wg.Add(len(byType))

	for metricType, headers := range byType {
		go func(k MetricType, v []MetricHeader) {
			defer wg.Done()

			mods := prepend(o, c.Url("POST", TypeEndpoint(k), DataEndpoint()), Data(v))
			r, err := c.Send(mods...)
			if err != nil {
				errorsChan <- err
				return
			}
			defer r.Body.Close()

			if r.StatusCode > 399 {
				errorsChan <- c.parseErrorResponse(r)
			}
		}(metricType, headers)
	}
	wg.Wait()

	// Non-blocking receive: the channel is never closed, so anything read
	// here is an error one of the goroutines reported.
	select {
	case err := <-errorsChan:
		return err
	default:
		return nil
	}
}
// ReadMetric reads the datapoints of the metric identified by t and id.
//
// A 200 reply is decoded into a slice of Datapoint; a status above 399 is
// turned into an error by parseErrorResponse; any other status yields
// (nil, nil).
func (c *Client) ReadMetric(t MetricType, id string, o ...Modifier) ([]*Datapoint, error) {
	mods := prepend(o, c.Url("GET", TypeEndpoint(t), SingleMetricEndpoint(id), DataEndpoint()))

	r, err := c.Send(mods...)
	if err != nil {
		return nil, err
	}
	defer r.Body.Close()

	switch {
	case r.StatusCode == http.StatusOK:
		body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			return nil, err
		}
		// Check for GaugeBucketpoint and so on for the rest.. uh
		dp := []*Datapoint{}
		// NOTE(review): ReadAll appears to return a non-nil slice on
		// success, so this guard may never skip decoding - confirm intent.
		if body != nil {
			if err := json.Unmarshal(body, &dp); err != nil {
				return nil, err
			}
		}
		return dp, nil
	case r.StatusCode > 399:
		return nil, c.parseErrorResponse(r)
	}
	return nil, nil
}
// ReadBuckets reads bucketed data from the data endpoint of metric type t.
// Query restrictions are supplied by the caller through Modifiers.
//
// A 200 reply is decoded into a slice of Bucketpoint; a status above 399 is
// turned into an error by parseErrorResponse; any other status yields
// (nil, nil).
//
// TODO ReadMetrics should be equal also, to read new tagsFilter aggregation..
func (c *Client) ReadBuckets(t MetricType, o ...Modifier) ([]*Bucketpoint, error) {
	mods := prepend(o, c.Url("GET", TypeEndpoint(t), DataEndpoint()))

	r, err := c.Send(mods...)
	if err != nil {
		return nil, err
	}
	defer r.Body.Close()

	switch {
	case r.StatusCode == http.StatusOK:
		body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			return nil, err
		}
		// Check for GaugeBucketpoint and so on for the rest.. uh
		bp := []*Bucketpoint{}
		// NOTE(review): ReadAll appears to return a non-nil slice on
		// success, so this guard may never skip decoding - confirm intent.
		if body != nil {
			if err := json.Unmarshal(body, &bp); err != nil {
				return nil, err
			}
		}
		return bp, nil
	case r.StatusCode > 399:
		return nil, c.parseErrorResponse(r)
	}
	return nil, nil
}
// Initialization

// NewHawkularClient builds a Client from p.
//
// p.Url is parsed with url.Parse; a parse failure is returned as the error.
// When the URL carries no path, base_url is used. The HTTP client gets the
// package timeout and, if p.TLSConfig is non-nil, a transport using that
// TLS configuration.
func NewHawkularClient(p Parameters) (*Client, error) {
	uri, err := url.Parse(p.Url)
	if err != nil {
		return nil, err
	}
	if uri.Path == "" {
		uri.Path = base_url
	}

	// Endpoints are later appended to Opaque as "/segment", so the base must
	// not bring leading or trailing slashes of its own. Without trimming, a
	// URL such as http://host/hawkular/metrics/ would produce
	// "//host//hawkular/metrics//gauges".
	opaque := "//" + uri.Host
	if trimmed := strings.Trim(uri.Path, "/"); trimmed != "" {
		opaque += "/" + trimmed
	}

	u := &url.URL{
		Host:   uri.Host,
		Path:   uri.Path,
		Scheme: uri.Scheme,
		Opaque: opaque,
	}

	c := &http.Client{
		Timeout: timeout,
	}
	if p.TLSConfig != nil {
		c.Transport = &http.Transport{TLSClientConfig: p.TLSConfig}
	}

	return &Client{
		url:    u,
		Tenant: p.Tenant,
		Token:  p.Token,
		client: c,
	}, nil
}
// HTTP Helper functions

// cleanId escapes id so it can be embedded in a URL.
//
// url.QueryEscape encodes a space as "+", which is only a space inside a
// query string; in a path it is a literal plus sign. Since QueryEscape
// encodes a real "+" as "%2B", every "+" in its output stands for a space
// and can safely be rewritten to "%20", which is valid in both places.
func cleanId(id string) string {
	return strings.Replace(url.QueryEscape(id), "+", "%20", -1)
}
// parseErrorResponse turns a failed response into a *HawkularClientError
// carrying the response status code. The message is the ErrorMsg decoded
// from the JSON body, or a description of why the body could not be read
// or parsed. The returned error is never nil.
func (c *Client) parseErrorResponse(resp *http.Response) error {
	// Parse error messages here correctly..
	failure := &HawkularClientError{Code: resp.StatusCode}

	reply, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		failure.msg = fmt.Sprintf("Reply could not be read: %s", err.Error())
		return failure
	}

	details := &HawkularError{}
	if err = json.Unmarshal(reply, details); err != nil {
		failure.msg = fmt.Sprintf("Reply could not be parsed: %s", err.Error())
		return failure
	}

	failure.msg = details.ErrorMsg
	return failure
}
// URL functions (...)

// Endpoint extends a URL in place, typically by appending one path segment.
type Endpoint func(target *url.URL)
// createUrl copies the client's base URL and applies each Endpoint to the
// copy in order, leaving the base URL untouched.
func (c *Client) createUrl(e ...Endpoint) *url.URL {
	target := new(url.URL)
	*target = *c.url
	for _, extend := range e {
		extend(target)
	}
	return target
}
// TypeEndpoint returns an Endpoint that appends the String() form of the
// metric type to the URL.
func TypeEndpoint(t MetricType) Endpoint {
	return func(target *url.URL) {
		segment := t.String()
		addToUrl(target, segment)
	}
}
// SingleMetricEndpoint returns an Endpoint that appends the escaped metric
// id to the URL as a path segment.
//
// url.QueryEscape encodes a space as "+", which in a path is a literal plus
// sign rather than a space. A real "+" is encoded as "%2B", so every "+" in
// the escaped output stands for a space and is rewritten to "%20".
func SingleMetricEndpoint(id string) Endpoint {
	return func(u *url.URL) {
		escaped := strings.Replace(url.QueryEscape(id), "+", "%20", -1)
		addToUrl(u, escaped)
	}
}
// TagEndpoint returns an Endpoint that appends the "tags" segment to the URL.
func TagEndpoint() Endpoint {
	const segment = "tags"
	return func(target *url.URL) {
		addToUrl(target, segment)
	}
}
// TagsEndpoint returns an Endpoint that appends the tagsEncoder rendering of
// tags to the URL. The tags are encoded when the Endpoint runs, and the
// result is appended without further escaping.
func TagsEndpoint(tags map[string]string) Endpoint {
	return func(target *url.URL) {
		encoded := tagsEncoder(tags)
		addToUrl(target, encoded)
	}
}
// DataEndpoint returns an Endpoint that appends the "data" segment to the URL.
func DataEndpoint() Endpoint {
	const segment = "data"
	return func(target *url.URL) {
		addToUrl(target, segment)
	}
}
// addToUrl appends "/" followed by s to u.Opaque, modifying u in place, and
// returns the same pointer for convenience.
func addToUrl(u *url.URL, s string) *url.URL {
	extended := fmt.Sprintf("%s/%s", u.Opaque, s)
	u.Opaque = extended
	return u
}
// tagsEncoder renders a tag map as comma-separated "key:value" pairs.
//
// The pairs are emitted in ascending key order. Map iteration order is
// randomised, so without sorting the same tag set could produce a different
// string - and therefore a different URL or query - on every call. A nil or
// empty map yields the empty string.
func tagsEncoder(t map[string]string) string {
	keys := make([]string, 0, len(t))
	for k := range t {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, k+":"+t[k])
	}
	return strings.Join(pairs, ",")
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.2.8-beta.0

搜索帮助

Cb406eda 1850385 E526c682 1850385