1 Star 0 Fork 0


加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
influxdb.go 10.55 KB
一键复制 编辑 原始数据 按行查看 历史
Victor Marmol 提交于 2015-03-09 17:53 . Updating cAdvisor dependency.
// Copyright 2014 Google Inc. All Rights Reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package influxdb
import (
info "github.com/google/cadvisor/info/v1"
influxdb "github.com/influxdb/influxdb/client"
type influxdbStorage struct {
client *influxdb.Client
machineName string
tableName string
bufferDuration time.Duration
lastWrite time.Time
series []*influxdb.Series
lock sync.Mutex
readyToFlush func() bool
const (
colTimestamp string = "time"
colMachineName string = "machine"
colContainerName string = "container_name"
colCpuCumulativeUsage string = "cpu_cumulative_usage"
// Memory Usage
colMemoryUsage string = "memory_usage"
// Working set size
colMemoryWorkingSet string = "memory_working_set"
// Cumulative count of bytes received.
colRxBytes string = "rx_bytes"
// Cumulative count of receive errors encountered.
colRxErrors string = "rx_errors"
// Cumulative count of bytes transmitted.
colTxBytes string = "tx_bytes"
// Cumulative count of transmit errors encountered.
colTxErrors string = "tx_errors"
// Filesystem device.
colFsDevice = "fs_device"
// Filesystem limit.
colFsLimit = "fs_limit"
// Filesystem usage.
colFsUsage = "fs_usage"
func (self *influxdbStorage) getSeriesDefaultValues(
ref info.ContainerReference,
stats *info.ContainerStats,
columns *[]string,
values *[]interface{}) {
// Timestamp
*columns = append(*columns, colTimestamp)
*values = append(*values, stats.Timestamp.UnixNano()/1E3)
// Machine name
*columns = append(*columns, colMachineName)
*values = append(*values, self.machineName)
// Container name
*columns = append(*columns, colContainerName)
if len(ref.Aliases) > 0 {
*values = append(*values, ref.Aliases[0])
} else {
*values = append(*values, ref.Name)
// In order to maintain a fixed column format, we add a new series for each filesystem partition.
func (self *influxdbStorage) containerFilesystemStatsToSeries(
ref info.ContainerReference,
stats *info.ContainerStats) (series []*influxdb.Series) {
if len(stats.Filesystem) == 0 {
return series
for _, fsStat := range stats.Filesystem {
columns := make([]string, 0)
values := make([]interface{}, 0)
self.getSeriesDefaultValues(ref, stats, &columns, &values)
columns = append(columns, colFsDevice)
values = append(values, fsStat.Device)
columns = append(columns, colFsLimit)
values = append(values, fsStat.Limit)
columns = append(columns, colFsUsage)
values = append(values, fsStat.Usage)
series = append(series, self.newSeries(columns, values))
return series
func (self *influxdbStorage) containerStatsToValues(
ref info.ContainerReference,
stats *info.ContainerStats,
) (columns []string, values []interface{}) {
self.getSeriesDefaultValues(ref, stats, &columns, &values)
// Cumulative Cpu Usage
columns = append(columns, colCpuCumulativeUsage)
values = append(values, stats.Cpu.Usage.Total)
// Memory Usage
columns = append(columns, colMemoryUsage)
values = append(values, stats.Memory.Usage)
// Working set size
columns = append(columns, colMemoryWorkingSet)
values = append(values, stats.Memory.WorkingSet)
// Network stats.
columns = append(columns, colRxBytes)
values = append(values, stats.Network.RxBytes)
columns = append(columns, colRxErrors)
values = append(values, stats.Network.RxErrors)
columns = append(columns, colTxBytes)
values = append(values, stats.Network.TxBytes)
columns = append(columns, colTxErrors)
values = append(values, stats.Network.TxErrors)
return columns, values
func convertToUint64(v interface{}) (uint64, error) {
if v == nil {
return 0, nil
switch x := v.(type) {
case uint64:
return x, nil
case int:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
return uint64(x), nil
case int32:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
return uint64(x), nil
case int64:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
return uint64(x), nil
case float64:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
return uint64(x), nil
case uint32:
return uint64(x), nil
return 0, fmt.Errorf("unknown type")
func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) {
stats := &info.ContainerStats{
Filesystem: make([]info.FsStats, 0),
var err error
for i, col := range columns {
v := values[i]
switch {
case col == colTimestamp:
if f64sec, ok := v.(float64); ok && stats.Timestamp.IsZero() {
stats.Timestamp = time.Unix(int64(f64sec)/1E3, (int64(f64sec)%1E3)*1E6)
case col == colMachineName:
if m, ok := v.(string); ok {
if m != self.machineName {
return nil, fmt.Errorf("different machine")
} else {
return nil, fmt.Errorf("machine name field is not a string: %v", v)
// Cumulative Cpu Usage
case col == colCpuCumulativeUsage:
stats.Cpu.Usage.Total, err = convertToUint64(v)
// Memory Usage
case col == colMemoryUsage:
stats.Memory.Usage, err = convertToUint64(v)
// Working set size
case col == colMemoryWorkingSet:
stats.Memory.WorkingSet, err = convertToUint64(v)
case col == colRxBytes:
stats.Network.RxBytes, err = convertToUint64(v)
case col == colRxErrors:
stats.Network.RxErrors, err = convertToUint64(v)
case col == colTxBytes:
stats.Network.TxBytes, err = convertToUint64(v)
case col == colTxErrors:
stats.Network.TxErrors, err = convertToUint64(v)
case col == colFsDevice:
device, ok := v.(string)
if !ok {
return nil, fmt.Errorf("filesystem name field is not a string: %+v", v)
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Device: device})
} else {
stats.Filesystem[0].Device = device
case col == colFsLimit:
limit, err := convertToUint64(v)
if err != nil {
return nil, fmt.Errorf("filesystem limit field %+v invalid: %s", v, err)
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Limit: limit})
} else {
stats.Filesystem[0].Limit = limit
case col == colFsUsage:
usage, err := convertToUint64(v)
if err != nil {
return nil, fmt.Errorf("filesystem usage field %+v invalid: %s", v, err)
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Usage: usage})
} else {
stats.Filesystem[0].Usage = usage
if err != nil {
return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err)
return stats, nil
func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) {
self.readyToFlush = readyToFlush
func (self *influxdbStorage) defaultReadyToFlush() bool {
return time.Since(self.lastWrite) >= self.bufferDuration
func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
if stats == nil {
return nil
var seriesToFlush []*influxdb.Series
func() {
// AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write.
defer self.lock.Unlock()
self.series = append(self.series, self.newSeries(self.containerStatsToValues(ref, stats)))
self.series = append(self.series, self.containerFilesystemStatsToSeries(ref, stats)...)
if self.readyToFlush() {
seriesToFlush = self.series
self.series = make([]*influxdb.Series, 0)
self.lastWrite = time.Now()
if len(seriesToFlush) > 0 {
err := self.client.WriteSeriesWithTimePrecision(seriesToFlush, influxdb.Microsecond)
if err != nil {
return fmt.Errorf("failed to write stats to influxDb - %s", err)
return nil
func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
if numStats == 0 {
return nil, nil
// TODO(dengnan): select only columns that we need
// TODO(dengnan): escape names
query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName)
if numStats > 0 {
query = fmt.Sprintf("%v limit %v", query, numStats)
series, err := self.client.Query(query)
if err != nil {
return nil, err
statsList := make([]*info.ContainerStats, 0, len(series))
// By default, influxDB returns data in time descending order.
// RecentStats() requires stats in time increasing order,
// so we need to go through from the last one to the first one.
for i := len(series) - 1; i >= 0; i-- {
s := series[i]
for j := len(s.Points) - 1; j >= 0; j-- {
values := s.Points[j]
stats, err := self.valuesToContainerStats(s.Columns, values)
if err != nil {
return nil, err
if stats == nil {
statsList = append(statsList, stats)
return statsList, nil
func (self *influxdbStorage) Close() error {
self.client = nil
return nil
// Returns a new influxdb series.
func (self *influxdbStorage) newSeries(columns []string, points []interface{}) *influxdb.Series {
out := &influxdb.Series{
Name: self.tableName,
Columns: columns,
// There's only one point for each stats
Points: make([][]interface{}, 1),
out.Points[0] = points
return out
// machineName: A unique identifier to identify the host that current cAdvisor
// instance is running on.
// influxdbHost: The host which runs influxdb.
func New(machineName,
influxdbHost string,
isSecure bool,
bufferDuration time.Duration,
) (*influxdbStorage, error) {
config := &influxdb.ClientConfig{
Host: influxdbHost,
Username: username,
Password: password,
Database: database,
IsSecure: isSecure,
client, err := influxdb.NewClient(config)
if err != nil {
return nil, err
// TODO(monnand): With go 1.3, we cannot compress data now.
ret := &influxdbStorage{
client: client,
machineName: machineName,
tableName: tablename,
bufferDuration: bufferDuration,
lastWrite: time.Now(),
series: make([]*influxdb.Series, 0),
ret.readyToFlush = ret.defaultReadyToFlush
return ret, nil
马建仓 AI 助手


Cb406eda 1850385 E526c682 1850385