1 Star 0 Fork 0

simon/smallnest

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
influxdb.go 4.73 KB
一键复制 编辑 原始数据 按行查看 历史
simon 提交于 2021-09-04 14:58 +08:00 . 旧版
package serverplugin
import (
"fmt"
"log"
uurl "net/url"
"time"
client "github.com/influxdata/influxdb1-client"
"github.com/rcrowley/go-metrics"
)
// reporter periodically pushes the contents of a go-metrics registry to an
// InfluxDB server. It bundles the connection settings together with the
// client built from them.
type reporter struct {
// reg is the registry whose metrics are walked on every send.
reg metrics.Registry
// interval is the period of the reporting ticker created in run.
interval time.Duration
// url, username and password are passed to the client configuration in
// makeClient.
url uurl.URL
// database is the target database set on every batch written by send.
database string
username string
password string
// tags is attached unchanged to every point built by send; it may be nil.
tags map[string]string
// client is (re)assigned by makeClient and used by run and send.
client *client.Client
}
// InfluxDB starts an InfluxDB reporter which posts the metrics from the given
// registry every d to the given database on the server at url.
//
// It is a convenience wrapper that forwards to InfluxDBWithTags with nil
// tags. InfluxDBWithTags runs the reporting loop on the calling goroutine, so
// this call only returns if set-up fails there.
func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password string) {
InfluxDBWithTags(r, d, url, database, username, password, nil)
}
// InfluxDBWithTags starts an InfluxDB reporter which posts the metrics from
// the given registry every d, attaching tags to every point it writes.
//
// r is the registry to report, url is the address of the InfluxDB server,
// database is the database written to, and username/password are the
// credentials handed to the client. tags may be nil.
//
// The call blocks: once the client has been created the reporting loop runs
// on the calling goroutine. It only returns when set-up fails (non-positive
// interval, unparsable url, or client creation error); each of those is
// logged and nothing is reported.
func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password string, tags map[string]string) {
	// time.NewTicker panics on a non-positive duration, so reject it here
	// the same way the other set-up failures are handled.
	if d <= 0 {
		log.Printf("invalid InfluxDB reporting interval %v, must be greater than zero", d)
		return
	}

	u, err := uurl.Parse(url)
	if err != nil {
		log.Printf("unable to parse InfluxDB url %s. err=%v", url, err)
		return
	}

	rep := &reporter{
		reg:      r,
		interval: d,
		url:      *u,
		database: database,
		username: username,
		password: password,
		tags:     tags,
	}
	if err := rep.makeClient(); err != nil {
		log.Printf("unable to make InfluxDB client. err=%v", err)
		return
	}

	rep.run()
}
// makeClient builds a new InfluxDB client from the reporter's URL and
// credentials and stores it in r.client.
//
// r.client is only replaced when construction succeeds. run keeps using
// r.client after a failed re-creation attempt, so overwriting it with the
// value returned alongside an error could leave a nil client behind.
//
// It returns the error from client.NewClient unchanged.
func (r *reporter) makeClient() error {
	c, err := client.NewClient(client.Config{
		URL:      r.url,
		Username: r.username,
		Password: r.password,
	})
	if err != nil {
		return err
	}
	r.client = c
	return nil
}
// run is the reporter's main loop. It never returns.
//
// Two tickers drive it: one fires every r.interval and triggers a send, the
// other fires every five seconds and pings the server. A failed ping causes
// an attempt to rebuild the client. All failures are logged and the loop
// carries on.
func (r *reporter) run() {
	reportTick := time.NewTicker(r.interval)
	defer reportTick.Stop()

	healthTick := time.NewTicker(5 * time.Second)
	defer healthTick.Stop()

	for {
		select {
		case <-reportTick.C:
			sendErr := r.send()
			if sendErr == nil {
				continue
			}
			log.Printf("unable to send metrics to InfluxDB. err=%v", sendErr)

		case <-healthTick.C:
			_, _, pingErr := r.client.Ping()
			if pingErr == nil {
				continue
			}
			log.Printf("got error while sending a ping to InfluxDB, trying to recreate client. err=%v", pingErr)

			// The ping failed, so try to replace the client.
			if makeErr := r.makeClient(); makeErr != nil {
				log.Printf("unable to make InfluxDB client. err=%v", makeErr)
			}
		}
	}
}
// send snapshots every metric in the registry, converts each one into an
// InfluxDB point and writes the whole batch to r.database.
//
// The measurement name is the metric name plus a suffix for its kind
// (.count, .gauge, .histogram, .meter, .timer). Every point carries r.tags
// and the time at which that metric was visited. Metrics of any other type
// are skipped.
//
// It returns the error from the client's Write call.
func (r *reporter) send() error {
	var points []client.Point

	r.reg.Each(func(name string, i interface{}) {
		now := time.Now()

		var (
			measurement string
			fields      map[string]interface{}
		)

		// The case order is significant and mirrors the metric kinds the
		// reporter understands; the first matching interface wins.
		switch m := i.(type) {
		case metrics.Counter:
			snap := m.Snapshot()
			measurement = fmt.Sprintf("%s.count", name)
			fields = map[string]interface{}{
				"value": snap.Count(),
			}

		case metrics.Gauge:
			snap := m.Snapshot()
			measurement = fmt.Sprintf("%s.gauge", name)
			fields = map[string]interface{}{
				"value": snap.Value(),
			}

		case metrics.GaugeFloat64:
			snap := m.Snapshot()
			measurement = fmt.Sprintf("%s.gauge", name)
			fields = map[string]interface{}{
				"value": snap.Value(),
			}

		case metrics.Histogram:
			snap := m.Snapshot()
			q := snap.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
			measurement = fmt.Sprintf("%s.histogram", name)
			fields = map[string]interface{}{
				"count":    snap.Count(),
				"max":      snap.Max(),
				"mean":     snap.Mean(),
				"min":      snap.Min(),
				"stddev":   snap.StdDev(),
				"variance": snap.Variance(),
				"p50":      q[0],
				"p75":      q[1],
				"p95":      q[2],
				"p99":      q[3],
				"p999":     q[4],
				"p9999":    q[5],
			}

		case metrics.Meter:
			snap := m.Snapshot()
			measurement = fmt.Sprintf("%s.meter", name)
			fields = map[string]interface{}{
				"count": snap.Count(),
				"m1":    snap.Rate1(),
				"m5":    snap.Rate5(),
				"m15":   snap.Rate15(),
				"mean":  snap.RateMean(),
			}

		case metrics.Timer:
			snap := m.Snapshot()
			q := snap.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
			measurement = fmt.Sprintf("%s.timer", name)
			fields = map[string]interface{}{
				"count":    snap.Count(),
				"max":      snap.Max(),
				"mean":     snap.Mean(),
				"min":      snap.Min(),
				"stddev":   snap.StdDev(),
				"variance": snap.Variance(),
				"p50":      q[0],
				"p75":      q[1],
				"p95":      q[2],
				"p99":      q[3],
				"p999":     q[4],
				"p9999":    q[5],
				"m1":       snap.Rate1(),
				"m5":       snap.Rate5(),
				"m15":      snap.Rate15(),
				"meanrate": snap.RateMean(),
			}

		default:
			// Not a metric kind this reporter knows how to encode.
			return
		}

		points = append(points, client.Point{
			Measurement: measurement,
			Tags:        r.tags,
			Fields:      fields,
			Time:        now,
		})
	})

	batch := client.BatchPoints{
		Points:   points,
		Database: r.database,
	}

	_, err := r.client.Write(batch)
	return err
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/simon_git_code/smallnest.git
git@gitee.com:simon_git_code/smallnest.git
simon_git_code
smallnest
smallnest
e483c3e07d35

搜索帮助