From 50292964218ecb250cf047a6d6342c6d9bd8308f Mon Sep 17 00:00:00 2001 From: travelliu Date: Wed, 28 Feb 2024 15:51:12 +0800 Subject: [PATCH 1/4] fix: query timeout issue --- pkg/exporter/server_collect.go | 34 +++++++++++++++++++------- pkg/exporter/server_test.go | 44 +++++++++++++++++++++++++++------- 2 files changed, 62 insertions(+), 16 deletions(-) diff --git a/pkg/exporter/server_collect.go b/pkg/exporter/server_collect.go index e374c23..2af2520 100644 --- a/pkg/exporter/server_collect.go +++ b/pkg/exporter/server_collect.go @@ -14,6 +14,24 @@ import ( "unicode/utf8" ) +func (s *Server) execSQL(ctx context.Context, sqlText string) (*sql.Rows, error) { + ch := make(chan struct{}, 0) + var ( + rows *sql.Rows + err error + ) + go func() { + rows, err = s.db.Query(sqlText) + ch <- struct{}{} + }() + select { + case <-ch: + return rows, err + case <-ctx.Done(): + return nil, ctx.Err() + } +} + func (s *Server) doCollectMetric(queryInstance *QueryInstance) ([]prometheus.Metric, []error, error) { // 根据版本获取查询sql query := queryInstance.GetQuerySQL(s.lastMapVersion, s.primary) @@ -21,7 +39,7 @@ func (s *Server) doCollectMetric(queryInstance *QueryInstance) ([]prometheus.Met // Return success (no pertinent data) return []prometheus.Metric{}, []error{}, nil } - + // Don't fail on a bad scrape of one metric var ( rows *sql.Rows @@ -38,19 +56,19 @@ func (s *Server) doCollectMetric(queryInstance *QueryInstance) ([]prometheus.Met defer cancel() } log.Debugf("Collect Metric [%s] executing sql %s", queryInstance.Name, query.SQL) - rows, err = s.db.QueryContext(ctx, query.SQL) + rows, err = s.execSQL(ctx, query.SQL) end := time.Now().Sub(begin).Milliseconds() - + log.Debugf("Collect Metric [%s] executing using time %vms", queryInstance.Name, end) if err != nil { if strings.Contains(err.Error(), "context deadline exceeded") { log.Errorf("Collect Metric [%s] executing timeout %v", queryInstance.Name, query.TimeoutDuration()) err = fmt.Errorf("timeout %v %s", query.TimeoutDuration(), err) } else { - log.Errorf("Collect Metric [%s] QueryContext err %s", queryInstance.Name, err) + log.Errorf("Collect Metric [%s] executing err %s", queryInstance.Name, err) } return []prometheus.Metric{}, []error{}, - fmt.Errorf("Collect Metric [%s] QueryContext on database %q err %s ", metricName, s, err) + fmt.Errorf("Collect Metric [%s] executing on database %q err %s ", metricName, s, err) } defer rows.Close() var columnNames []string @@ -59,7 +77,7 @@ func (s *Server) doCollectMetric(queryInstance *QueryInstance) ([]prometheus.Met log.Errorf("Collect Metric [%s] executing Columns err %s", queryInstance.Name, err) return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving column list for: ", metricName, err)) } - + // Make a lookup map for the column indices var columnIdx = make(map[string]int, len(columnNames)) for i, n := range columnNames { @@ -82,12 +100,12 @@ func (s *Server) doCollectMetric(queryInstance *QueryInstance) ([]prometheus.Met list = append(list, columnData) } if err = rows.Err(); err != nil { - log.Debugf("Collect Metric [%s] rows.Err() %s", metricName, err) + log.Debugf("Collect Metric [%s] fetch data rows.Err() %s", metricName, err) nonfatalErrors = append(nonfatalErrors, err) } end = time.Now().Sub(begin).Milliseconds() log.Debugf("Collect Metric [%s] executing total time %vms", queryInstance.Name, end) - + metrics := make([]prometheus.Metric, 0) for i := range list { metric, errs := s.procRows(queryInstance, columnNames, columnIdx, list[i]) diff --git a/pkg/exporter/server_test.go b/pkg/exporter/server_test.go index cda5f0d..d1e785d 100644 --- a/pkg/exporter/server_test.go +++ b/pkg/exporter/server_test.go @@ -192,7 +192,7 @@ func Test_Server(t *testing.T) { metricName = "pg_lock" queryInstance = defaultMonList[metricName] ) - + _ = queryInstance.Check() t.Run("ServerOpt", func(t *testing.T) { s := &Server{ @@ -204,7 +204,7 @@ func Test_Server(t *testing.T) { assert.Equal(t, prometheus.Labels{ "server": "localhost:5432", }, s.labels) - + ServerWithNamespace("a1")(s) assert.Equal(t, "a1", s.namespace) ServerWithDisableSettingsMetrics(false)(s) @@ -608,12 +608,12 @@ postgres,AccessExclusiveLock,0`)) }, } err := s.queryMetric(ch, q) - + assert.NoError(t, err) - + // cache 过期 time.Sleep(3 * time.Second) - + mock.ExpectQuery("SELECT").WillReturnRows( sqlmock.NewRows([]string{"datname", "size_bytes"}).AddRow("postgres", 1)) _ = q.Check() @@ -674,11 +674,11 @@ postgres,AccessExclusiveLock,0`)) t.Error(err) } s.db = db - + queryInstanceMap := map[string]*QueryInstance{ "pg_database": pg_database, } - + mock.ExpectQuery("SELECT").WillReturnRows( sqlmock.NewRows([]string{"datname", "size_bytes"}).AddRow("postgres", 1)) errs := s.queryMetrics(ch, queryInstanceMap) @@ -704,7 +704,7 @@ postgres,AccessExclusiveLock,0`)) } else { s.SetDBInfoMap(v) } - + metrics, errs, err := s.doCollectMetric(pgActiveSlowsql) assert.Error(t, err) assert.ElementsMatch(t, []error{}, errs) @@ -713,6 +713,34 @@ postgres,AccessExclusiveLock,0`)) } assert.ElementsMatch(t, []prometheus.Metric{}, metrics) }) + t.Run("timeout", func(t *testing.T) { + db, mock, err = sqlmock.New() + if err != nil { + t.Error(err) + } + s.db = db + mock.ExpectQuery("SELECT").WillDelayFor(2 * time.Second).WillReturnRows( + sqlmock.NewRows([]string{"datname", "size_bytes"}).AddRow("postgres", 1)) + metric := &QueryInstance{ + Name: "pg_database", + Desc: "OpenGauss Database size", + Queries: []*Query{ + { + SQL: `SELECT datname,size_bytes from dual`, + Version: ">=0.0.0", + TTL: 10, + Timeout: 1.0, + }, + }, + Metrics: []*Column{ + {Name: "datname", Usage: LABEL, Desc: "Name of this database"}, + {Name: "size_bytes", Usage: GAUGE, Desc: "Disk space used by the database"}, + }, + } + metric.Check() + _, _, err := s.doCollectMetric(metric) + assert.Error(t, err) + }) } func Test_cachedMetrics(t *testing.T) { -- Gitee From dc5e3201d53c7f7f8d93ab176b8b9f28de026372 Mon Sep 17 00:00:00 2001 From: travelliu Date: Thu, 29 Feb 2024 14:14:37 +0800 Subject: [PATCH 2/4] docs: change log info --- pkg/exporter/server_collect.go | 26 +++++++++++++------------- pkg/exporter/server_query.go | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/exporter/server_collect.go b/pkg/exporter/server_collect.go index 2af2520..9033b68 100644 --- a/pkg/exporter/server_collect.go +++ b/pkg/exporter/server_collect.go @@ -39,7 +39,7 @@ func (s *Server) doCollectMetric(queryInstance *QueryInstance) ([]prometheus.Met // Return success (no pertinent data) return []prometheus.Metric{}, []error{}, nil } - + // Don't fail on a bad scrape of one metric var ( rows *sql.Rows @@ -51,33 +51,33 @@ func (s *Server) doCollectMetric(queryInstance *QueryInstance) ([]prometheus.Met // TODO disable timeout if query.Timeout > 0 { // if timeout is provided, use context var cancel context.CancelFunc - log.Debugf("Collect Metric [%s] executing with time limit: %v", query.Name, query.TimeoutDuration()) + log.Debugf("Collect Metric [%s] query with time limit: %v", query.Name, query.TimeoutDuration()) ctx, cancel = context.WithTimeout(context.Background(), query.TimeoutDuration()) defer cancel() } - log.Debugf("Collect Metric [%s] executing sql %s", queryInstance.Name, query.SQL) + log.Debugf("Collect Metric [%s] query sql %s", queryInstance.Name, query.SQL) rows, err = s.execSQL(ctx, query.SQL) end := time.Now().Sub(begin).Milliseconds() - - log.Debugf("Collect Metric [%s] executing using time %vms", queryInstance.Name, end) + + log.Debugf("Collect Metric [%s] query using time %vms", queryInstance.Name, end) if err != nil { if strings.Contains(err.Error(), "context deadline exceeded") { - log.Errorf("Collect Metric [%s] executing timeout %v", queryInstance.Name, query.TimeoutDuration()) + log.Errorf("Collect Metric [%s] query timeout %v", queryInstance.Name, query.TimeoutDuration()) err = fmt.Errorf("timeout %v %s", query.TimeoutDuration(), err) } else { - log.Errorf("Collect Metric [%s] executing err %s", queryInstance.Name, err) + log.Errorf("Collect Metric [%s] query err %s", queryInstance.Name, err) } return []prometheus.Metric{}, []error{}, - fmt.Errorf("Collect Metric [%s] executing on database %q err %s ", metricName, s, err) + fmt.Errorf("Collect Metric [%s] query on database %q err %s ", metricName, s, err) } defer rows.Close() var columnNames []string columnNames, err = rows.Columns() if err != nil { - log.Errorf("Collect Metric [%s] executing Columns err %s", queryInstance.Name, err) + log.Errorf("Collect Metric [%s] fetch Columns err %s", queryInstance.Name, err) return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving column list for: ", metricName, err)) } - + // Make a lookup map for the column indices var columnIdx = make(map[string]int, len(columnNames)) for i, n := range columnNames { @@ -93,7 +93,7 @@ func (s *Server) doCollectMetric(queryInstance *QueryInstance) ([]prometheus.Met } err = rows.Scan(scanArgs...) if err != nil { - log.Errorf("Collect Metric [%s] executing rows.Scan err %s", queryInstance.Name, err) + log.Errorf("Collect Metric [%s] fetch rows.Scan err %s", queryInstance.Name, err) nonfatalErrors = append(nonfatalErrors, err) break } @@ -104,8 +104,8 @@ func (s *Server) doCollectMetric(queryInstance *QueryInstance) ([]prometheus.Met nonfatalErrors = append(nonfatalErrors, err) } end = time.Now().Sub(begin).Milliseconds() - log.Debugf("Collect Metric [%s] executing total time %vms", queryInstance.Name, end) - + log.Debugf("Collect Metric [%s] fetch total time %vms", queryInstance.Name, end) + metrics := make([]prometheus.Metric, 0) for i := range list { metric, errs := s.procRows(queryInstance, columnNames, columnIdx, list[i]) diff --git a/pkg/exporter/server_query.go b/pkg/exporter/server_query.go index d2998c2..cf01fc0 100644 --- a/pkg/exporter/server_query.go +++ b/pkg/exporter/server_query.go @@ -90,7 +90,7 @@ func (s *Server) queryMetric(ch chan<- prometheus.Metric, queryInstance *QueryIn querySQL := queryInstance.GetQuerySQL(s.lastMapVersion, s.primary) if querySQL == nil { - log.Errorf("Collect Metric %s not define querySQL for version %s on %s database ", metricName, s.lastMapVersion.String(), s.DBRole()) + log.Warnf("Collect Metric %s not define querySQL for version %s on %s database ", metricName, s.lastMapVersion.String(), s.DBRole()) return nil } if strings.EqualFold(querySQL.Status, statusDisable) { -- Gitee From 630a9665d5759acd44ef330e9f284906d3e14d02 Mon Sep 17 00:00:00 2001 From: travelliu Date: Thu, 29 Feb 2024 17:02:53 +0800 Subject: [PATCH 3/4] fix: query hang issue --- pkg/exporter/server.go | 14 ++++++++------ pkg/exporter/server_collect.go | 23 ++++++++++++----------- pkg/exporter/server_query.go | 6 +++--- pkg/exporter/servers.go | 12 +++++++----- 4 files changed, 30 insertions(+), 25 deletions(-) diff --git a/pkg/exporter/server.go b/pkg/exporter/server.go index 57278bc..f01bb46 100644 --- a/pkg/exporter/server.go +++ b/pkg/exporter/server.go @@ -104,6 +104,7 @@ type Server struct { queryScrapeDuration map[string]float64 // internal query metrics: time spend on executing clientEncoding string dbInfoMap map[string]*DBInfo + dbName string } type DBInfo struct { @@ -269,12 +270,12 @@ func (s *Server) getBaseInfo() error { return err } var ( - versionString, clientEncoding string - b bool + versionString, clientEncoding, currentDatabase string + b bool ) - sqlText := "SELECT version(),current_setting('client_encoding'),pg_is_in_recovery()" + sqlText := "SELECT version(),current_setting('client_encoding'),pg_is_in_recovery(),current_database()" logrus.Debugf(sqlText) - err := s.db.QueryRow(sqlText).Scan(&versionString, &clientEncoding, &b) + err := s.db.QueryRow(sqlText).Scan(&versionString, &clientEncoding, &b, ¤tDatabase) if err != nil { return err } @@ -285,6 +286,7 @@ func (s *Server) getBaseInfo() error { return fmt.Errorf("Error parsing version string err %s ", err) } s.lastMapVersion = semanticVersion + s.dbName = currentDatabase return nil } @@ -307,8 +309,8 @@ func (s *Server) ConnectDatabase() error { return err } s.db.SetConnMaxIdleTime(120 * time.Second) - // s.db.SetMaxIdleConns(s.parallel) - s.db.SetMaxOpenConns(s.parallel) + s.db.SetMaxIdleConns(s.parallel) + // s.db.SetMaxOpenConns(s.parallel) s.UP = true return nil } diff --git a/pkg/exporter/server_collect.go b/pkg/exporter/server_collect.go index 9033b68..747ac48 100644 --- a/pkg/exporter/server_collect.go +++ b/pkg/exporter/server_collect.go @@ -51,31 +51,32 @@ func (s *Server) doCollectMetric(queryInstance *QueryInstance) ([]prometheus.Met // TODO disable timeout if query.Timeout > 0 { // if timeout is provided, use context var cancel context.CancelFunc - log.Debugf("Collect Metric [%s] query with time limit: %v", query.Name, query.TimeoutDuration()) + log.Debugf("Collect Metric [%s] on %s query with time limit: %v", query.Name, s.dbName, query.TimeoutDuration()) ctx, cancel = context.WithTimeout(context.Background(), query.TimeoutDuration()) defer cancel() } - log.Debugf("Collect Metric [%s] query sql %s", queryInstance.Name, query.SQL) + log.Debugf("Collect Metric [%s] on %s query sql %s ", queryInstance.Name, s.dbName, query.SQL) rows, err = s.execSQL(ctx, query.SQL) end := time.Now().Sub(begin).Milliseconds() - log.Debugf("Collect Metric [%s] query using time %vms", queryInstance.Name, end) + log.Debugf("Collect Metric [%s] on %s query using time %vms", queryInstance.Name, s.dbName, end) if err != nil { if strings.Contains(err.Error(), "context deadline exceeded") { - log.Errorf("Collect Metric [%s] query timeout %v", queryInstance.Name, query.TimeoutDuration()) + log.Errorf("Collect Metric [%s] on %s query timeout %v", queryInstance.Name, s.dbName, query.TimeoutDuration()) err = fmt.Errorf("timeout %v %s", query.TimeoutDuration(), err) } else { - log.Errorf("Collect Metric [%s] query err %s", queryInstance.Name, err) + log.Errorf("Collect Metric [%s] on %s query err %s", queryInstance.Name, s.dbName, err) } return []prometheus.Metric{}, []error{}, - fmt.Errorf("Collect Metric [%s] query on database %q err %s ", metricName, s, err) + fmt.Errorf("Collect Metric [%s] on %s query err %s ", metricName, s.dbName, err) } defer rows.Close() var columnNames []string columnNames, err = rows.Columns() if err != nil { - log.Errorf("Collect Metric [%s] fetch Columns err %s", queryInstance.Name, err) - return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving column list for: ", metricName, err)) + err := fmt.Errorf("collect Metric [%s] on %s fetch Columns err %s", queryInstance.Name, s.dbName, err) + log.Error(err) + return []prometheus.Metric{}, []error{}, err } // Make a lookup map for the column indices @@ -93,18 +94,18 @@ func (s *Server) doCollectMetric(queryInstance *QueryInstance) ([]prometheus.Met } err = rows.Scan(scanArgs...) if err != nil { - log.Errorf("Collect Metric [%s] fetch rows.Scan err %s", queryInstance.Name, err) + log.Errorf("Collect Metric [%s] on %s fetch rows.Scan err %s", queryInstance.Name, s.dbName, err) nonfatalErrors = append(nonfatalErrors, err) break } list = append(list, columnData) } if err = rows.Err(); err != nil { - log.Debugf("Collect Metric [%s] fetch data rows.Err() %s", metricName, err) + log.Debugf("Collect Metric [%s] on %s fetch data rows.Err() %s", metricName, s.dbName, err) nonfatalErrors = append(nonfatalErrors, err) } end = time.Now().Sub(begin).Milliseconds() - log.Debugf("Collect Metric [%s] fetch total time %vms", queryInstance.Name, end) + log.Debugf("Collect Metric [%s] on %s fetch total time %vms", queryInstance.Name, s.dbName, end) metrics := make([]prometheus.Metric, 0) for i := range list { diff --git a/pkg/exporter/server_query.go b/pkg/exporter/server_query.go index cf01fc0..bc3d78f 100644 --- a/pkg/exporter/server_query.go +++ b/pkg/exporter/server_query.go @@ -123,20 +123,20 @@ func (s *Server) queryMetric(ch chan<- prometheus.Metric, queryInstance *QueryIn if scrapeMetric { metrics, nonFatalErrors, err = s.doCollectMetric(queryInstance) } else { - log.Debugf("Collect Metric [%s] use cache", metricName) + log.Debugf("Collect Metric [%s] on %s use cache", metricName, s.dbName) metrics, nonFatalErrors = cachedMetric.metrics, cachedMetric.nonFatalErrors } // Serious error - a namespace disappeared if err != nil { nonFatalErrors = append(nonFatalErrors, err) - log.Errorf("Collect Metric [%s] err %s", metricName, err) + log.Errorf("Collect Metric [%s] on %s err %s", metricName, s.dbName, err) } // Non-serious errors - likely version or parsing problems. if len(nonFatalErrors) > 0 { var errText string for _, err := range nonFatalErrors { - log.Errorf("Collect Metric [%s] nonFatalErrors err %s", metricName, err) + log.Errorf("Collect Metric [%s] %s nonFatalErrors err %s", metricName, s.dbName, err) errText += err.Error() } err = errors.New(errText) diff --git a/pkg/exporter/servers.go b/pkg/exporter/servers.go index bbaba54..303cd93 100644 --- a/pkg/exporter/servers.go +++ b/pkg/exporter/servers.go @@ -68,10 +68,8 @@ func (s *Servers) ScrapeDSN(ch chan<- prometheus.Metric) { } // 设置db信息. 根据查询进行关键字段转码 server.SetDBInfoMap(dbMaps) - if s.autoDiscovery { - if len(dbMaps) > 0 { - s.discoveryServer(dbMaps) - } + if s.autoDiscovery && len(dbMaps) > 0 { + s.discoveryServer(dbMaps, server.dbName) } s.collStatus = map[string]bool{} for i := range s.servers { @@ -89,7 +87,7 @@ func (s *Servers) ScrapeDSN(ch chan<- prometheus.Metric) { } } -func (s *Servers) discoveryServer(dbMaps map[string]*DBInfo) { +func (s *Servers) discoveryServer(dbMaps map[string]*DBInfo, currentDBName string) { dsnSetting := make(map[string]string) for k, v := range s.dsnSetting { dsnSetting[k] = v @@ -99,7 +97,11 @@ func (s *Servers) discoveryServer(dbMaps map[string]*DBInfo) { } newDBNames := s.genDiscoveryDBNames(dbMaps) for _, dbName := range newDBNames { + if dbName == currentDBName { + continue + } dsnSetting[DSNDatabase] = dbName + dsnSetting["application_name"] = "opengauss_exporter" dsn := genDSNString(dsnSetting) server, _ := s.GetServer(dsn) // 设置db信息 -- Gitee From affa994962cee1d36a24ab9f42bdaaee9a92c1b3 Mon Sep 17 00:00:00 2001 From: travelliu Date: Fri, 1 Mar 2024 12:37:55 +0800 Subject: [PATCH 4/4] refactor: Optimize session occupancy --- pkg/exporter/server_collect.go | 43 +++++---- pkg/exporter/server_query.go | 65 +++++++++---- pkg/exporter/server_test.go | 167 +++++++++++---------------------- 3 files changed, 124 insertions(+), 151 deletions(-) diff --git a/pkg/exporter/server_collect.go b/pkg/exporter/server_collect.go index 747ac48..d580f24 100644 --- a/pkg/exporter/server_collect.go +++ b/pkg/exporter/server_collect.go @@ -14,25 +14,25 @@ import ( "unicode/utf8" ) -func (s *Server) execSQL(ctx context.Context, sqlText string) (*sql.Rows, error) { - ch := make(chan struct{}, 0) - var ( - rows *sql.Rows - err error - ) - go func() { - rows, err = s.db.Query(sqlText) - ch <- struct{}{} - }() - select { - case <-ch: - return rows, err - case <-ctx.Done(): - return nil, ctx.Err() - } -} +// func (s *Server) execSQL(ctx context.Context, conn *sql.Conn, sqlText string) (*sql.Rows, error) { +// ch := make(chan struct{}) +// var ( +// rows *sql.Rows +// err error +// ) +// go func() { +// rows, err = conn.QueryContext(ctx, sqlText) +// ch <- struct{}{} +// }() +// select { +// case <-ch: +// return rows, err +// case <-ctx.Done(): +// return nil, ctx.Err() +// } +// } -func (s *Server) doCollectMetric(queryInstance *QueryInstance) ([]prometheus.Metric, []error, error) { +func (s *Server) doCollectMetric(queryInstance *QueryInstance, conn *sql.Conn) ([]prometheus.Metric, []error, error) { // 根据版本获取查询sql query := queryInstance.GetQuerySQL(s.lastMapVersion, s.primary) if query == nil { @@ -56,12 +56,15 @@ func (s *Server) doCollectMetric(queryInstance *QueryInstance) ([]prometheus.Met defer cancel() } log.Debugf("Collect Metric [%s] on %s query sql %s ", queryInstance.Name, s.dbName, query.SQL) - rows, err = s.execSQL(ctx, query.SQL) + // rows, err = s.execSQL(ctx, conn, query.SQL) + rows, err = conn.QueryContext(ctx, query.SQL) end := time.Now().Sub(begin).Milliseconds() log.Debugf("Collect Metric [%s] on %s query using time %vms", queryInstance.Name, s.dbName, end) if err != nil { - if strings.Contains(err.Error(), "context deadline exceeded") { + if strings.Contains(err.Error(), "context deadline exceeded") || + strings.Contains(err.Error(), "canceling statement due to user request") || + strings.Contains(err.Error(), "canceling query due to user request") { log.Errorf("Collect Metric [%s] on %s query timeout %v", queryInstance.Name, s.dbName, query.TimeoutDuration()) err = fmt.Errorf("timeout %v %s", query.TimeoutDuration(), err) } else { diff --git a/pkg/exporter/server_query.go b/pkg/exporter/server_query.go index bc3d78f..ad77296 100644 --- a/pkg/exporter/server_query.go +++ b/pkg/exporter/server_query.go @@ -3,6 +3,8 @@ package exporter import ( + "context" + "database/sql" "fmt" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -50,35 +52,58 @@ func (s *Server) ScrapeWithMetric(ch chan<- prometheus.Metric, queryMetric map[s } // 查询监控指标. 先判断是否读取缓存. 禁用缓存或者缓存超时,则读取数据库 +// 启动 parallel 个协程,每个协程固定一个conn,监听指标通道 func (s *Server) queryMetrics(ch chan<- prometheus.Metric, queryMetric map[string]*QueryInstance) map[string]error { - metricErrors := &metricError{ - Errors: map[string]error{}, - Count: 0, - } - wg := sync.WaitGroup{} - limit := newRateLimit(s.parallel) - for _, queryInstance := range queryMetric { - metricName := queryInstance.Name - wg.Add(1) - limit.getToken() - go func(queryInst *QueryInstance) { + + var ( + parallel = s.parallel + metricChan = make(chan *QueryInstance, parallel) + wg = sync.WaitGroup{} + metricErrors = &metricError{ + Errors: map[string]error{}, + Count: 0, + } + ) + go func() { + for _, metric := range queryMetric { + metricChan <- metric + } + close(metricChan) + }() + wg.Add(parallel) + for i := 0; i < parallel; i++ { + go func(workNum int) { defer wg.Done() - defer limit.putToken() - err := s.queryMetric(ch, queryInst) + conn, err := s.db.Conn(context.Background()) if err != nil { - // 存在并发写入问题. 改成结构体加锁 - metricErrors.addError(metricName, err) + return } - }(queryInstance) - + defer conn.Close() + s.startQueryMetricThread(conn, ch, metricChan, metricErrors) + }(i) } wg.Wait() - s.ScrapeErrorCount = metricErrors.Count return metricErrors.Errors } -func (s *Server) queryMetric(ch chan<- prometheus.Metric, queryInstance *QueryInstance) error { +func (s *Server) startQueryMetricThread(conn *sql.Conn, ch chan<- prometheus.Metric, metricChan chan *QueryInstance, metricErrors *metricError) error { + for { + select { + case metric, ok := <-metricChan: + if !ok { + return nil + } + err := s.queryMetric(ch, metric, conn) + if err != nil { + // 存在并发写入问题. 改成结构体加锁 + metricErrors.addError(metric.Name, err) + } + } + } +} + +func (s *Server) queryMetric(ch chan<- prometheus.Metric, queryInstance *QueryInstance, conn *sql.Conn) error { var ( metricName = queryInstance.Name scrapeMetric = false // Whether to collect indicators from the database 是否从数据库里采集指标 @@ -121,7 +146,7 @@ func (s *Server) queryMetric(ch chan<- prometheus.Metric, queryInstance *QueryIn scrapeMetric = true } if scrapeMetric { - metrics, nonFatalErrors, err = s.doCollectMetric(queryInstance) + metrics, nonFatalErrors, err = s.doCollectMetric(queryInstance, conn) } else { log.Debugf("Collect Metric [%s] on %s use cache", metricName, s.dbName) metrics, nonFatalErrors = cachedMetric.metrics, cachedMetric.nonFatalErrors diff --git a/pkg/exporter/server_test.go b/pkg/exporter/server_test.go index d1e785d..84e2f80 100644 --- a/pkg/exporter/server_test.go +++ b/pkg/exporter/server_test.go @@ -3,6 +3,7 @@ package exporter import ( + "context" "database/sql" "fmt" "github.com/DATA-DOG/go-sqlmock" @@ -164,6 +165,20 @@ func Test_dbToString(t *testing.T) { } } +func genMockDB(t *testing.T, s *Server) (*sql.Conn, sqlmock.Sqlmock) { + db, mock, err := sqlmock.New() + if err != nil { + t.Error(err) + } + s.db = db + conn, err := s.db.Conn(context.Background()) + if err != nil { + t.Fatal(err) + } + return conn, mock + +} + func Test_Server(t *testing.T) { var ( db *sql.DB @@ -192,7 +207,7 @@ func Test_Server(t *testing.T) { metricName = "pg_lock" queryInstance = defaultMonList[metricName] ) - + _ = queryInstance.Check() t.Run("ServerOpt", func(t *testing.T) { s := &Server{ @@ -204,7 +219,7 @@ func Test_Server(t *testing.T) { assert.Equal(t, prometheus.Labels{ "server": "localhost:5432", }, s.labels) - + ServerWithNamespace("a1")(s) assert.Equal(t, "a1", s.namespace) ServerWithDisableSettingsMetrics(false)(s) @@ -284,8 +299,8 @@ omm,UTF8,A`)) s.db = db s.UP = true mock.ExpectQuery("SELECT").WillReturnRows( - sqlmock.NewRows([]string{"version", "client_encoding", "pg_is_in_recovery"}).AddRow( - "PostgreSQL 9.2.4 (openGauss 2.0.0 build 78689da9) compiled at 2021-03-31 21:04:03 commit 0 last mr on x86_64-unknown-linux-gnu, compiled by g++ (GCC) 7.3.0, 64-bit", "UTF8", false)) + sqlmock.NewRows([]string{"version", "client_encoding", "pg_is_in_recovery", "Name"}).AddRow( + "PostgreSQL 9.2.4 (openGauss 2.0.0 build 78689da9) compiled at 2021-03-31 21:04:03 commit 0 last mr on x86_64-unknown-linux-gnu, compiled by g++ (GCC) 7.3.0, 64-bit", "UTF8", false, "postgres")) err := s.getBaseInfo() assert.NoError(t, err) assert.Equal(t, "2.0.0", s.lastMapVersion.String()) @@ -293,11 +308,7 @@ omm,UTF8,A`)) assert.Equal(t, true, s.primary) }) t.Run("doCollectMetric", func(t *testing.T) { - db, mock, err = sqlmock.New() - if err != nil { - t.Error(err) - } - s.db = db + conn, mock := genMockDB(t, s) mock.ExpectQuery("SELECT").WillReturnRows( sqlmock.NewRows([]string{"datname", "mode", "count"}).FromCSVString(`postgres,AccessShareLock,4 omm,RowShareLock,0 @@ -315,17 +326,13 @@ postgres,ShareUpdateExclusiveLock,0 omm,AccessExclusiveLock,0 postgres,RowShareLock,0 postgres,AccessExclusiveLock,0`)) - metrics, errs, err := s.doCollectMetric(queryInstance) + metrics, errs, err := s.doCollectMetric(queryInstance, conn) assert.NoError(t, err) assert.ElementsMatch(t, errs, []error{}) assert.NotNil(t, metrics) }) t.Run("doCollectMetric_NoTimeOut", func(t *testing.T) { - db, mock, err = sqlmock.New() - if err != nil { - t.Error(err) - } - s.db = db + conn, mock := genMockDB(t, s) queryInstance.Queries[0].Timeout = 0 mock.ExpectQuery("SELECT").WillReturnRows( sqlmock.NewRows([]string{"datname", "mode", "count"}).FromCSVString(`postgres,AccessShareLock,4 @@ -344,24 +351,21 @@ postgres,ShareUpdateExclusiveLock,0 omm,AccessExclusiveLock,0 postgres,RowShareLock,0 postgres,AccessExclusiveLock,0`)) - metrics, errs, err := s.doCollectMetric(queryInstance) + metrics, errs, err := s.doCollectMetric(queryInstance, conn) assert.NoError(t, err) assert.ElementsMatch(t, errs, []error{}) assert.NotNil(t, metrics) }) t.Run("doCollectMetric_query_nil", func(t *testing.T) { - metrics, errs, err := s.doCollectMetric(&QueryInstance{}) + conn, _ := genMockDB(t, s) + metrics, errs, err := s.doCollectMetric(&QueryInstance{}, conn) assert.NoError(t, err) assert.ElementsMatch(t, []error{}, errs) assert.ElementsMatch(t, []prometheus.Metric{}, metrics) }) t.Run("doCollectMetric_timeout", func(t *testing.T) { queryInstance.Queries[0].Timeout = 0.1 - db, mock, err = sqlmock.New() - if err != nil { - t.Error(err) - } - s.db = db + conn, mock := genMockDB(t, s) mock.ExpectQuery("SELECT").WillDelayFor(1 * time.Second).WillReturnRows( sqlmock.NewRows([]string{"datname", "mode", "count"}).FromCSVString(`postgres,AccessShareLock,4 omm,RowShareLock,0 @@ -379,31 +383,23 @@ postgres,ShareUpdateExclusiveLock,0 omm,AccessExclusiveLock,0 postgres,RowShareLock,0 postgres,AccessExclusiveLock,0`)) - metrics, errs, err := s.doCollectMetric(queryInstance) + metrics, errs, err := s.doCollectMetric(queryInstance, conn) assert.Error(t, err) assert.ElementsMatch(t, []error{}, errs) assert.ElementsMatch(t, []prometheus.Metric{}, metrics) }) t.Run("doCollectMetric_query_err", func(t *testing.T) { - db, mock, err = sqlmock.New() - if err != nil { - t.Error(err) - } - s.db = db + conn, mock := genMockDB(t, s) mock.ExpectQuery("SELECT").WillReturnError(fmt.Errorf("error")) - metrics, errs, err := s.doCollectMetric(queryInstance) + metrics, errs, err := s.doCollectMetric(queryInstance, conn) assert.Error(t, err) assert.ElementsMatch(t, []error{}, errs) assert.ElementsMatch(t, []prometheus.Metric{}, metrics) }) t.Run("doCollectMetric_query_context deadline exceeded", func(t *testing.T) { - db, mock, err = sqlmock.New() - if err != nil { - t.Error(err) - } - s.db = db + conn, mock := genMockDB(t, s) mock.ExpectQuery("SELECT").WillReturnError(fmt.Errorf("context deadline exceeded")) - metrics, errs, err := s.doCollectMetric(queryInstance) + metrics, errs, err := s.doCollectMetric(queryInstance, conn) assert.Error(t, err) assert.ElementsMatch(t, []error{}, errs) assert.ElementsMatch(t, []prometheus.Metric{}, metrics) @@ -421,16 +417,12 @@ postgres,AccessExclusiveLock,0`)) t.Error(err) return } - db, mock, err = sqlmock.New() - if err != nil { - t.Error(err) - } - s.db = db + conn, mock := genMockDB(t, s) mock.ExpectQuery("SELECT").WillDelayFor(1 * time.Second).WillReturnRows( sqlmock.NewRows([]string{"pid", "usesysid", "usename", "application_name", "client_addr", "client_hostname", "client_port", "backend_start", "state", "sender_sent_location", "receiver_write_location", "receiver_flush_location", "receiver_replay_location", "sync_priority", "sync_state", "pg_current_xlog_location", "pg_xlog_location_diff", }).FromCSVString(`140215315789568,10,omm,"WalSender to Standby","192.168.122.92","kvm-yl2",55802,"2021-01-06 14:45:59.944279+08","Streaming","0/331980B8","0/331980B8","0/331980B8","0/331980B8",1,Sync,"0/331980B8",0`)) - metrics, errs, err := s.doCollectMetric(queryInstance) + metrics, errs, err := s.doCollectMetric(queryInstance, conn) assert.NoError(t, err) assert.ElementsMatch(t, []error{}, errs) for _, m := range metrics { @@ -460,14 +452,10 @@ postgres,AccessExclusiveLock,0`)) t.Error(err) return } - db, mock, err = sqlmock.New() - if err != nil { - t.Error(err) - } - s.db = db + conn, mock := genMockDB(t, s) mock.ExpectQuery("select").WillDelayFor(1 * time.Second).WillReturnRows( sqlmock.NewRows([]string{"a1"}).AddRow(16384)) - _, errs, err := s.doCollectMetric(queryInstance) + _, errs, err := s.doCollectMetric(queryInstance, conn) assert.NoError(t, err) assert.Equal(t, []error{}, errs) }) @@ -494,14 +482,10 @@ postgres,AccessExclusiveLock,0`)) t.Error(err) return } - db, mock, err = sqlmock.New() - if err != nil { - t.Error(err) - } - s.db = db + conn, mock := genMockDB(t, s) mock.ExpectQuery("select").WillDelayFor(1 * time.Second).WillReturnRows( sqlmock.NewRows([]string{"a1"}).AddRow("a1")) - _, errs, err := s.doCollectMetric(queryInstance) + _, errs, err := s.doCollectMetric(queryInstance, conn) assert.NoError(t, err) assert.Equal(t, 0, len(errs)) }) @@ -519,7 +503,7 @@ postgres,AccessExclusiveLock,0`)) // Primary: true, } ch := make(chan prometheus.Metric) - err := s.queryMetric(ch, q) + err := s.queryMetric(ch, q, nil) assert.NoError(t, err) }) t.Run("queryMetric_query_nil", func(t *testing.T) { @@ -528,7 +512,7 @@ postgres,AccessExclusiveLock,0`)) q = &QueryInstance{} ) q.Queries = nil - err := s.queryMetric(ch, q) + err := s.queryMetric(ch, q, nil) assert.NoError(t, err) }) t.Run("queryMetric_query_disable", func(t *testing.T) { @@ -538,7 +522,7 @@ postgres,AccessExclusiveLock,0`)) ) _ = q.Check() q.Queries[0].Status = statusDisable - err := s.queryMetric(ch, q) + err := s.queryMetric(ch, q, nil) assert.NoError(t, err) }) t.Run("queryMetric_query_no_cache", func(t *testing.T) { @@ -559,16 +543,12 @@ postgres,AccessExclusiveLock,0`)) }, } ) - db, mock, err = sqlmock.New() - if err != nil { - t.Error(err) - } - s.db = db + conn, mock := genMockDB(t, s) mock.ExpectQuery("SELECT").WillReturnRows( sqlmock.NewRows([]string{"datname", "size_bytes"}).AddRow("postgres", 1)) _ = q.Check() s.disableCache = true - err := s.queryMetric(ch, q) + err = s.queryMetric(ch, q, conn) assert.NoError(t, err) }) t.Run("queryMetric_query_cache", func(t *testing.T) { @@ -590,12 +570,8 @@ postgres,AccessExclusiveLock,0`)) }, } ) - db, mock, err = sqlmock.New() - if err != nil { - t.Error(err) - } - s.db = db s.disableCache = false + conn, mock := genMockDB(t, s) desc := prometheus.NewDesc("datname", fmt.Sprintf("Unknown metric from %s", metricName), queryInstance.LabelNames, s.labels) s.metricCache = map[string]*cachedMetrics{ @@ -607,18 +583,18 @@ postgres,AccessExclusiveLock,0`)) lastScrape: time.Now().Add(-8 * time.Second), }, } - err := s.queryMetric(ch, q) - + err := s.queryMetric(ch, q, conn) + assert.NoError(t, err) - + // cache 过期 time.Sleep(3 * time.Second) - + mock.ExpectQuery("SELECT").WillReturnRows( sqlmock.NewRows([]string{"datname", "size_bytes"}).AddRow("postgres", 1)) _ = q.Check() s.disableCache = true - err = s.queryMetric(ch, q) + err = s.queryMetric(ch, q, conn) assert.NoError(t, err) }) t.Run("queryMetric_standby", func(t *testing.T) { @@ -641,7 +617,8 @@ postgres,AccessExclusiveLock,0`)) }, } ) - err := s.queryMetric(ch, q) + conn, _ := genMockDB(t, s) + err := s.queryMetric(ch, q, conn) assert.NoError(t, err) assert.Equal(t, 0, len(ch)) }) @@ -674,59 +651,27 @@ postgres,AccessExclusiveLock,0`)) t.Error(err) } s.db = db - + queryInstanceMap := map[string]*QueryInstance{ "pg_database": pg_database, } - + mock.ExpectQuery("SELECT").WillReturnRows( sqlmock.NewRows([]string{"datname", "size_bytes"}).AddRow("postgres", 1)) errs := s.queryMetrics(ch, queryInstanceMap) assert.Equal(t, 0, len(errs)) }) - t.Run("doCollectMetric_utf8_gbk", func(t *testing.T) { - oldClientEncoding := s.clientEncoding - oldDBInfoMap := s.dbInfoMap - defer func() { - s.clientEncoding = oldClientEncoding - s.dbInfoMap = oldDBInfoMap - }() - pgActiveSlowsql.Check() - s.dsn = "postgres://mogdb:mtkOP@123@127.0.0.1:5438/postgres" - if err := s.ConnectDatabase(); err != nil { - t.Error(err) - } - if err := s.getBaseInfo(); err != nil { - t.Error(err) - } - if v, err := s.QueryDatabases(); err != nil { - t.Error(err) - } else { - s.SetDBInfoMap(v) - } - - metrics, errs, err := s.doCollectMetric(pgActiveSlowsql) - assert.Error(t, err) - assert.ElementsMatch(t, []error{}, errs) - for _, m := range metrics { - fmt.Printf("%#v\n", m) - } - assert.ElementsMatch(t, []prometheus.Metric{}, metrics) - }) t.Run("timeout", func(t *testing.T) { - db, mock, err = sqlmock.New() - if err != nil { - t.Error(err) - } - s.db = db + conn, mock := genMockDB(t, s) mock.ExpectQuery("SELECT").WillDelayFor(2 * time.Second).WillReturnRows( sqlmock.NewRows([]string{"datname", "size_bytes"}).AddRow("postgres", 1)) + conn, err := s.db.Conn(context.Background()) metric := &QueryInstance{ Name: "pg_database", Desc: "OpenGauss Database size", Queries: []*Query{ { - SQL: `SELECT datname,size_bytes from dual`, + SQL: `SELECT pg_sleep(1)`, Version: ">=0.0.0", TTL: 10, Timeout: 1.0, @@ -738,7 +683,7 @@ postgres,AccessExclusiveLock,0`)) }, } metric.Check() - _, _, err := s.doCollectMetric(metric) + _, _, err = s.doCollectMetric(metric, conn) assert.Error(t, err) }) } -- Gitee