2 Star 2 Fork 1

cockroachdb / cockroach

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
crdb_internal.go 39.16 KB
一键复制 编辑 原始数据 按行查看 历史
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package sql
import (
"bytes"
"fmt"
"net"
"net/url"
"sort"
"strings"
"time"
"github.com/pkg/errors"
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/jobs"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
const crdbInternalName = "crdb_internal"
var crdbInternal = virtualSchema{
name: crdbInternalName,
tables: []virtualSchemaTable{
crdbInternalBuildInfoTable,
crdbInternalRuntimeInfoTable,
crdbInternalTablesTable,
crdbInternalLeasesTable,
crdbInternalSchemaChangesTable,
crdbInternalStmtStatsTable,
crdbInternalJobsTable,
crdbInternalSessionTraceTable,
crdbInternalClusterSettingsTable,
crdbInternalSessionVariablesTable,
crdbInternalLocalQueriesTable,
crdbInternalClusterQueriesTable,
crdbInternalLocalSessionsTable,
crdbInternalClusterSessionsTable,
crdbInternalBuiltinFunctionsTable,
crdbInternalCreateStmtsTable,
crdbInternalTableColumnsTable,
crdbInternalTableIndexesTable,
crdbInternalIndexColumnsTable,
crdbInternalBackwardDependenciesTable,
crdbInternalForwardDependenciesTable,
},
}
var crdbInternalBuildInfoTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.node_build_info (
node_id INT NOT NULL,
field STRING NOT NULL,
value STRING NOT NULL
);
`,
populate: func(_ context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
execCfg := p.ExecCfg()
nodeID := parser.NewDInt(parser.DInt(int64(execCfg.NodeID.Get())))
info := build.GetInfo()
for k, v := range map[string]string{
"Name": "CockroachDB",
"ClusterID": execCfg.ClusterID().String(),
"Organization": execCfg.Organization(),
"Build": info.Short(),
"Version": info.Tag,
} {
if err := addRow(
nodeID,
parser.NewDString(k),
parser.NewDString(v),
); err != nil {
return err
}
}
return nil
},
}
var crdbInternalRuntimeInfoTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.node_runtime_info (
node_id INT NOT NULL,
component STRING NOT NULL,
field STRING NOT NULL,
value STRING NOT NULL
);
`,
populate: func(_ context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
if p.session.User != security.RootUser {
return errors.New("only root can access the node runtime information")
}
node := p.ExecCfg().NodeInfo
nodeID := parser.NewDInt(parser.DInt(int64(node.NodeID.Get())))
dbURL, err := node.PGURL(url.User(security.RootUser))
if err != nil {
return err
}
for _, item := range []struct {
component string
url *url.URL
}{
{"DB", dbURL}, {"UI", node.AdminURL()},
} {
var user string
if item.url.User != nil {
user = item.url.User.String()
}
host, port, err := net.SplitHostPort(item.url.Host)
if err != nil {
return err
}
for _, kv := range [][2]string{
{"URL", item.url.String()},
{"Scheme", item.url.Scheme},
{"User", user},
{"Host", host},
{"Port", port},
{"URI", item.url.RequestURI()},
} {
k, v := kv[0], kv[1]
if err := addRow(
nodeID,
parser.NewDString(item.component),
parser.NewDString(k),
parser.NewDString(v),
); err != nil {
return err
}
}
}
return nil
},
}
var crdbInternalTablesTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.tables (
table_id INT NOT NULL,
parent_id INT NOT NULL,
name STRING NOT NULL,
database_name STRING NOT NULL,
version INT NOT NULL,
mod_time TIMESTAMP NOT NULL,
mod_time_logical DECIMAL NOT NULL,
format_version STRING NOT NULL,
state STRING NOT NULL,
sc_lease_node_id INT,
sc_lease_expiration_time TIMESTAMP
);
`,
populate: func(ctx context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
descs, err := getAllDescriptors(ctx, p.txn)
if err != nil {
return err
}
dbNames := make(map[sqlbase.ID]string)
// Record database descriptors for name lookups.
for _, desc := range descs {
db, ok := desc.(*sqlbase.DatabaseDescriptor)
if ok {
dbNames[db.ID] = db.Name
}
}
// Note: we do not use forEachTableDesc() here because we want to
// include added and dropped descriptors.
for _, desc := range descs {
table, ok := desc.(*sqlbase.TableDescriptor)
if !ok || !userCanSeeDescriptor(table, p.session.User) {
continue
}
dbName := dbNames[table.GetParentID()]
if dbName == "" {
// The parent database was deleted. This is possible e.g. when
// a database is dropped with CASCADE, and someone queries
// this virtual table before the dropped table descriptors are
// effectively deleted.
dbName = fmt.Sprintf("[%d]", table.GetParentID())
}
leaseNodeDatum := parser.DNull
leaseExpDatum := parser.DNull
if table.Lease != nil {
leaseNodeDatum = parser.NewDInt(parser.DInt(int64(table.Lease.NodeID)))
leaseExpDatum = parser.MakeDTimestamp(
timeutil.Unix(0, table.Lease.ExpirationTime), time.Nanosecond,
)
}
if err := addRow(
parser.NewDInt(parser.DInt(int64(table.ID))),
parser.NewDInt(parser.DInt(int64(table.GetParentID()))),
parser.NewDString(table.Name),
parser.NewDString(dbName),
parser.NewDInt(parser.DInt(int64(table.Version))),
parser.MakeDTimestamp(timeutil.Unix(0, table.ModificationTime.WallTime), time.Microsecond),
parser.TimestampToDecimal(table.ModificationTime),
parser.NewDString(table.FormatVersion.String()),
parser.NewDString(table.State.String()),
leaseNodeDatum,
leaseExpDatum,
); err != nil {
return err
}
}
return nil
},
}
var crdbInternalSchemaChangesTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.schema_changes (
table_id INT NOT NULL,
parent_id INT NOT NULL,
name STRING NOT NULL,
type STRING NOT NULL,
target_id INT,
target_name STRING,
state STRING NOT NULL,
direction STRING NOT NULL
);
`,
populate: func(ctx context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
descs, err := getAllDescriptors(ctx, p.txn)
if err != nil {
return err
}
// Note: we do not use forEachTableDesc() here because we want to
// include added and dropped descriptors.
for _, desc := range descs {
table, ok := desc.(*sqlbase.TableDescriptor)
if !ok || !userCanSeeDescriptor(table, p.session.User) {
continue
}
tableID := parser.NewDInt(parser.DInt(int64(table.ID)))
parentID := parser.NewDInt(parser.DInt(int64(table.GetParentID())))
tableName := parser.NewDString(table.Name)
for _, mut := range table.Mutations {
mutType := "UNKNOWN"
targetID := parser.DNull
targetName := parser.DNull
switch d := mut.Descriptor_.(type) {
case *sqlbase.DescriptorMutation_Column:
mutType = "COLUMN"
targetID = parser.NewDInt(parser.DInt(int64(d.Column.ID)))
targetName = parser.NewDString(d.Column.Name)
case *sqlbase.DescriptorMutation_Index:
mutType = "INDEX"
targetID = parser.NewDInt(parser.DInt(int64(d.Index.ID)))
targetName = parser.NewDString(d.Index.Name)
}
if err := addRow(
tableID,
parentID,
tableName,
parser.NewDString(mutType),
targetID,
targetName,
parser.NewDString(mut.State.String()),
parser.NewDString(mut.Direction.String()),
); err != nil {
return err
}
}
}
return nil
},
}
var crdbInternalLeasesTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.leases (
node_id INT NOT NULL,
table_id INT NOT NULL,
name STRING NOT NULL,
parent_id INT NOT NULL,
expiration TIMESTAMP NOT NULL,
deleted BOOL NOT NULL
);
`,
populate: func(_ context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
leaseMgr := p.LeaseMgr()
nodeID := parser.NewDInt(parser.DInt(int64(leaseMgr.nodeID.Get())))
leaseMgr.mu.Lock()
defer leaseMgr.mu.Unlock()
for tid, ts := range leaseMgr.mu.tables {
tableID := parser.NewDInt(parser.DInt(int64(tid)))
adder := func() error {
ts.mu.Lock()
defer ts.mu.Unlock()
dropped := parser.MakeDBool(parser.DBool(ts.mu.dropped))
for _, state := range ts.mu.active.data {
if !userCanSeeDescriptor(&state.TableDescriptor, p.session.User) {
continue
}
if !state.leased {
continue
}
expCopy := state.leaseExpiration()
if err := addRow(
nodeID,
tableID,
parser.NewDString(state.Name),
parser.NewDInt(parser.DInt(int64(state.GetParentID()))),
&expCopy,
dropped,
); err != nil {
return err
}
}
return nil
}
if err := adder(); err != nil {
return err
}
}
return nil
},
}
var crdbInternalJobsTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.jobs (
id INT,
type STRING,
description STRING,
username STRING,
descriptor_ids INT[],
status STRING,
created TIMESTAMP,
started TIMESTAMP,
finished TIMESTAMP,
modified TIMESTAMP,
fraction_completed FLOAT,
error STRING,
coordinator_id INT
);
`,
populate: func(ctx context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
rows, err := p.queryRows(ctx, `SELECT id, status, created, payload FROM system.jobs`)
if err != nil {
return err
}
for _, r := range rows {
id, status, created, bytes := r[0], r[1], r[2], r[3]
payload, err := jobs.UnmarshalPayload(bytes)
if err != nil {
return err
}
tsOrNull := func(micros int64) parser.Datum {
if micros == 0 {
return parser.DNull
}
ts := timeutil.Unix(0, micros*time.Microsecond.Nanoseconds())
return parser.MakeDTimestamp(ts, time.Microsecond)
}
descriptorIDs := parser.NewDArray(parser.TypeInt)
for _, descID := range payload.DescriptorIDs {
if err := descriptorIDs.Append(parser.NewDInt(parser.DInt(int(descID)))); err != nil {
return err
}
}
leaseNode := parser.DNull
if payload.Lease != nil {
leaseNode = parser.NewDInt(parser.DInt(payload.Lease.NodeID))
}
if err := addRow(
id,
parser.NewDString(payload.Type().String()),
parser.NewDString(payload.Description),
parser.NewDString(payload.Username),
descriptorIDs,
status,
created,
tsOrNull(payload.StartedMicros),
tsOrNull(payload.FinishedMicros),
tsOrNull(payload.ModifiedMicros),
parser.NewDFloat(parser.DFloat(payload.FractionCompleted)),
parser.NewDString(payload.Error),
leaseNode,
); err != nil {
return err
}
}
return nil
},
}
type stmtList []stmtKey
func (s stmtList) Len() int {
return len(s)
}
func (s stmtList) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s stmtList) Less(i, j int) bool {
return s[i].stmt < s[j].stmt
}
var crdbInternalStmtStatsTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.node_statement_statistics (
node_id INT NOT NULL,
application_name STRING NOT NULL,
flags STRING NOT NULL,
key STRING NOT NULL,
anonymized STRING,
count INT NOT NULL,
first_attempt_count INT NOT NULL,
max_retries INT NOT NULL,
last_error STRING,
rows_avg FLOAT NOT NULL,
rows_var FLOAT NOT NULL,
parse_lat_avg FLOAT NOT NULL,
parse_lat_var FLOAT NOT NULL,
plan_lat_avg FLOAT NOT NULL,
plan_lat_var FLOAT NOT NULL,
run_lat_avg FLOAT NOT NULL,
run_lat_var FLOAT NOT NULL,
service_lat_avg FLOAT NOT NULL,
service_lat_var FLOAT NOT NULL,
overhead_lat_avg FLOAT NOT NULL,
overhead_lat_var FLOAT NOT NULL
);
`,
populate: func(_ context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
if p.session.User != security.RootUser {
return errors.New("only root can access application statistics")
}
sqlStats := p.session.sqlStats
if sqlStats == nil {
return errors.New("cannot access sql statistics from this context")
}
leaseMgr := p.LeaseMgr()
nodeID := parser.NewDInt(parser.DInt(int64(leaseMgr.nodeID.Get())))
// Retrieve the application names and sort them to ensure the
// output is deterministic.
var appNames []string
sqlStats.Lock()
for n := range sqlStats.apps {
appNames = append(appNames, n)
}
sqlStats.Unlock()
sort.Strings(appNames)
// Now retrieve the application stats proper.
for _, appName := range appNames {
appStats := sqlStats.getStatsForApplication(appName)
// Retrieve the statement keys and sort them to ensure the
// output is deterministic.
var stmtKeys stmtList
appStats.Lock()
for k := range appStats.stmts {
stmtKeys = append(stmtKeys, k)
}
appStats.Unlock()
// Now retrieve the per-stmt stats proper.
for _, stmtKey := range stmtKeys {
anonymized := parser.DNull
anonStr, ok := scrubStmtStatKey(p.session.virtualSchemas, stmtKey.stmt)
if ok {
anonymized = parser.NewDString(anonStr)
}
s := appStats.getStatsForStmt(stmtKey)
s.Lock()
errString := parser.DNull
if s.data.LastErr != "" {
errString = parser.NewDString(s.data.LastErr)
}
err := addRow(
nodeID,
parser.NewDString(appName),
parser.NewDString(stmtKey.flags()),
parser.NewDString(stmtKey.stmt),
anonymized,
parser.NewDInt(parser.DInt(s.data.Count)),
parser.NewDInt(parser.DInt(s.data.FirstAttemptCount)),
parser.NewDInt(parser.DInt(s.data.MaxRetries)),
errString,
parser.NewDFloat(parser.DFloat(s.data.NumRows.Mean)),
parser.NewDFloat(parser.DFloat(s.data.NumRows.GetVariance(s.data.Count))),
parser.NewDFloat(parser.DFloat(s.data.ParseLat.Mean)),
parser.NewDFloat(parser.DFloat(s.data.ParseLat.GetVariance(s.data.Count))),
parser.NewDFloat(parser.DFloat(s.data.PlanLat.Mean)),
parser.NewDFloat(parser.DFloat(s.data.PlanLat.GetVariance(s.data.Count))),
parser.NewDFloat(parser.DFloat(s.data.RunLat.Mean)),
parser.NewDFloat(parser.DFloat(s.data.RunLat.GetVariance(s.data.Count))),
parser.NewDFloat(parser.DFloat(s.data.ServiceLat.Mean)),
parser.NewDFloat(parser.DFloat(s.data.ServiceLat.GetVariance(s.data.Count))),
parser.NewDFloat(parser.DFloat(s.data.OverheadLat.Mean)),
parser.NewDFloat(parser.DFloat(s.data.OverheadLat.GetVariance(s.data.Count))),
)
s.Unlock()
if err != nil {
return err
}
}
}
return nil
},
}
// crdbInternalSessionTraceTable exposes the latest trace collected on this
// session (via SET TRACING={ON/OFF})
var crdbInternalSessionTraceTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.session_trace (
txn_idx INT NOT NULL, -- The transaction's 0-based index, among all
-- transactions that have been traced.
-- Only filled for the first log message in a
-- transaction's top-level span.
span_idx INT NOT NULL, -- The span's index.
message_idx INT NOT NULL, -- The message's index within its span.
timestamp TIMESTAMPTZ NOT NULL,-- The message's timestamp.
duration INTERVAL, -- The span's duration. Set only on the first
-- (dummy) message on a span.
-- NULL if the span was not finished at the time
-- the trace has been collected.
operation STRING NULL, -- The span's operation. Set only on
-- the first (dummy) message in a span.
message STRING NOT NULL -- The logged message.
);
`,
populate: func(ctx context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
rows, err := p.session.Tracing.generateSessionTraceVTable()
if err != nil {
return err
}
for _, r := range rows {
if err := addRow(r[:]...); err != nil {
return err
}
}
return nil
},
}
// crdbInternalClusterSettingsTable exposes the list of current
// cluster settings.
var crdbInternalClusterSettingsTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.cluster_settings (
name STRING NOT NULL,
current_value STRING NOT NULL,
type STRING NOT NULL,
description STRING NOT NULL
);
`,
populate: func(ctx context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
for _, k := range settings.Keys() {
setting, _ := settings.Lookup(k)
if err := addRow(
parser.NewDString(k),
parser.NewDString(setting.String(&p.session.execCfg.Settings.SV)),
parser.NewDString(setting.Typ()),
parser.NewDString(setting.Description()),
); err != nil {
return err
}
}
return nil
},
}
// crdbInternalSessionVariablesTable exposes the session variables.
var crdbInternalSessionVariablesTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.session_variables (
variable STRING NOT NULL,
value STRING NOT NULL
);
`,
populate: func(ctx context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
for _, vName := range varNames {
gen := varGen[vName]
value := gen.Get(p.session)
if err := addRow(
parser.NewDString(vName),
parser.NewDString(value),
); err != nil {
return err
}
}
return nil
},
}
const queriesSchemaPattern = `
CREATE TABLE crdb_internal.%s (
query_id STRING, -- the cluster-unique ID of the query
node_id INT NOT NULL, -- the node on which the query is running
username STRING, -- the user running the query
start TIMESTAMP, -- the start time of the query
query STRING, -- the SQL code of the query
client_address STRING, -- the address of the client that issued the query
application_name STRING, -- the name of the application as per SET application_name
distributed BOOL, -- whether the query is running distributed
phase STRING -- the current execution phase
);
`
// crdbInternalLocalQueriesTable exposes the list of running queries
// on the current node. The results are dependent on the current user.
var crdbInternalLocalQueriesTable = virtualSchemaTable{
schema: fmt.Sprintf(queriesSchemaPattern, "node_queries"),
populate: func(ctx context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
req := serverpb.ListSessionsRequest{Username: p.session.User}
response, err := p.session.execCfg.StatusServer.ListLocalSessions(ctx, &req)
if err != nil {
return err
}
return populateQueriesTable(ctx, addRow, response)
},
}
// crdbInternalClusterQueriesTable exposes the list of running queries
// on the entire cluster. The result is dependent on the current user.
var crdbInternalClusterQueriesTable = virtualSchemaTable{
schema: fmt.Sprintf(queriesSchemaPattern, "cluster_queries"),
populate: func(ctx context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
req := serverpb.ListSessionsRequest{Username: p.session.User}
response, err := p.session.execCfg.StatusServer.ListSessions(ctx, &req)
if err != nil {
return err
}
return populateQueriesTable(ctx, addRow, response)
},
}
func populateQueriesTable(
ctx context.Context, addRow func(...parser.Datum) error, response *serverpb.ListSessionsResponse,
) error {
for _, session := range response.Sessions {
for _, query := range session.ActiveQueries {
isDistributedDatum := parser.DNull
phase := strings.ToLower(query.Phase.String())
if phase == "executing" {
isDistributedDatum = parser.DBoolFalse
if query.IsDistributed {
isDistributedDatum = parser.DBoolTrue
}
}
if err := addRow(
parser.NewDString(query.ID),
parser.NewDInt(parser.DInt(session.NodeID)),
parser.NewDString(session.Username),
parser.MakeDTimestamp(query.Start, time.Microsecond),
parser.NewDString(query.Sql),
parser.NewDString(session.ClientAddress),
parser.NewDString(session.ApplicationName),
isDistributedDatum,
parser.NewDString(phase),
); err != nil {
return err
}
}
}
for _, rpcErr := range response.Errors {
log.Warning(ctx, rpcErr.Message)
if rpcErr.NodeID != 0 {
// Add a row with this node ID, and nulls for all other columns
if err := addRow(
parser.DNull,
parser.NewDInt(parser.DInt(rpcErr.NodeID)),
parser.DNull,
parser.DNull,
parser.DNull,
parser.DNull,
parser.DNull,
parser.DNull,
parser.DNull,
); err != nil {
return err
}
}
}
return nil
}
const sessionsSchemaPattern = `
CREATE TABLE crdb_internal.%s (
node_id INT NOT NULL, -- the node on which the query is running
username STRING, -- the user running the query
client_address STRING, -- the address of the client that issued the query
application_name STRING, -- the name of the application as per SET application_name
active_queries STRING, -- the currently running queries as SQL
last_active_query STRING, -- the query that finished last on this session as SQL
session_start TIMESTAMP, -- the time when the session was opened
oldest_query_start TIMESTAMP, -- the time when the oldest query in the session was started
kv_txn STRING -- the ID of the current KV transaction
);
`
// crdbInternalLocalSessionsTable exposes the list of running sessions
// on the current node. The results are dependent on the current user.
var crdbInternalLocalSessionsTable = virtualSchemaTable{
schema: fmt.Sprintf(sessionsSchemaPattern, "node_sessions"),
populate: func(ctx context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
req := serverpb.ListSessionsRequest{Username: p.session.User}
response, err := p.session.execCfg.StatusServer.ListLocalSessions(ctx, &req)
if err != nil {
return err
}
return populateSessionsTable(ctx, addRow, response)
},
}
// crdbInternalClusterSessionsTable exposes the list of running sessions
// on the entire cluster. The result is dependent on the current user.
var crdbInternalClusterSessionsTable = virtualSchemaTable{
schema: fmt.Sprintf(sessionsSchemaPattern, "cluster_sessions"),
populate: func(ctx context.Context, p *planner, _ string, addRow func(...parser.Datum) error) error {
req := serverpb.ListSessionsRequest{Username: p.session.User}
response, err := p.session.execCfg.StatusServer.ListSessions(ctx, &req)
if err != nil {
return err
}
return populateSessionsTable(ctx, addRow, response)
},
}
func populateSessionsTable(
ctx context.Context, addRow func(...parser.Datum) error, response *serverpb.ListSessionsResponse,
) error {
for _, session := range response.Sessions {
// Generate active_queries and oldest_query_start
var activeQueries bytes.Buffer
var oldestStart time.Time
var oldestStartDatum parser.Datum
for idx, query := range session.ActiveQueries {
if idx > 0 {
activeQueries.WriteString("; ")
}
activeQueries.WriteString(query.Sql)
if oldestStart.IsZero() || query.Start.Before(oldestStart) {
oldestStart = query.Start
}
}
if oldestStart.IsZero() {
oldestStartDatum = parser.DNull
} else {
oldestStartDatum = parser.MakeDTimestamp(oldestStart, time.Microsecond)
}
kvTxnIDDatum := parser.DNull
if session.KvTxnID != nil {
kvTxnIDDatum = parser.NewDString(session.KvTxnID.String())
}
if err := addRow(
parser.NewDInt(parser.DInt(session.NodeID)),
parser.NewDString(session.Username),
parser.NewDString(session.ClientAddress),
parser.NewDString(session.ApplicationName),
parser.NewDString(activeQueries.String()),
parser.NewDString(session.LastActiveQuery),
parser.MakeDTimestamp(session.Start, time.Microsecond),
oldestStartDatum,
kvTxnIDDatum,
); err != nil {
return err
}
}
for _, rpcErr := range response.Errors {
log.Warning(ctx, rpcErr.Message)
if rpcErr.NodeID != 0 {
// Add a row with this node ID, and nulls for all other columns
if err := addRow(
parser.NewDInt(parser.DInt(rpcErr.NodeID)),
parser.DNull,
parser.DNull,
parser.DNull,
parser.DNull,
parser.DNull,
parser.DNull,
parser.DNull,
parser.DNull,
); err != nil {
return err
}
}
}
return nil
}
// crdbInternalBuiltinFunctionsTable exposes the built-in function
// metadata.
var crdbInternalBuiltinFunctionsTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.builtin_functions (
function STRING NOT NULL,
signature STRING NOT NULL,
category STRING NOT NULL,
details STRING NOT NULL
);
`,
populate: func(ctx context.Context, _ *planner, _ string, addRow func(...parser.Datum) error) error {
for _, name := range parser.AllBuiltinNames {
overloads := parser.Builtins[name]
for _, f := range overloads {
if err := addRow(
parser.NewDString(name),
parser.NewDString(f.Signature()),
parser.NewDString(f.Category()),
parser.NewDString(f.Info),
); err != nil {
return err
}
}
}
return nil
},
}
// crdbInternalCreateStmtsTable exposes the CREATE TABLE/CREATE VIEW
// statements.
var crdbInternalCreateStmtsTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.create_statements (
database_id INT,
database_name STRING NOT NULL,
descriptor_id INT,
descriptor_type STRING NOT NULL,
descriptor_name STRING NOT NULL,
create_statement STRING NOT NULL,
state STRING NOT NULL
)
`,
populate: func(ctx context.Context, p *planner, prefix string, addRow func(...parser.Datum) error) error {
return forEachTableDescAll(ctx, p, prefix,
func(db *sqlbase.DatabaseDescriptor, table *sqlbase.TableDescriptor) error {
var descType parser.Datum
var stmt string
var err error
var typeView = parser.DString("view")
var typeTable = parser.DString("table")
if table.IsView() {
descType = &typeView
stmt, err = p.showCreateView(ctx, parser.Name(table.Name), table)
} else {
descType = &typeTable
stmt, err = p.showCreateTable(ctx, parser.Name(table.Name), prefix, table)
}
if err != nil {
return err
}
descID := parser.DNull
if table.ID != keys.VirtualDescriptorID {
descID = parser.NewDInt(parser.DInt(table.ID))
}
dbDescID := parser.DNull
if db.ID != keys.VirtualDescriptorID {
dbDescID = parser.NewDInt(parser.DInt(db.ID))
}
return addRow(
dbDescID,
parser.NewDString(db.Name),
descID,
descType,
parser.NewDString(table.Name),
parser.NewDString(stmt),
parser.NewDString(table.State.String()),
)
})
},
}
// crdbInternalTableColumnsTable exposes the column descriptors.
var crdbInternalTableColumnsTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.table_columns (
descriptor_id INT,
descriptor_name STRING NOT NULL,
column_id INT NOT NULL,
column_name STRING NOT NULL,
column_type STRING NOT NULL,
nullable BOOL NOT NULL,
default_expr STRING,
hidden BOOL NOT NULL
)
`,
populate: func(ctx context.Context, p *planner, prefix string, addRow func(...parser.Datum) error) error {
return forEachTableDescAll(ctx, p, prefix,
func(_ *sqlbase.DatabaseDescriptor, table *sqlbase.TableDescriptor) error {
tableID := parser.DNull
if table.ID != keys.VirtualDescriptorID {
tableID = parser.NewDInt(parser.DInt(table.ID))
}
tableName := parser.NewDString(table.Name)
for _, col := range table.Columns {
defStr := parser.DNull
if col.DefaultExpr != nil {
defStr = parser.NewDString(*col.DefaultExpr)
}
if err := addRow(
tableID,
tableName,
parser.NewDInt(parser.DInt(col.ID)),
parser.NewDString(col.Name),
parser.NewDString(col.Type.String()),
parser.MakeDBool(parser.DBool(col.Nullable)),
defStr,
parser.MakeDBool(parser.DBool(col.Hidden)),
); err != nil {
return err
}
}
return nil
})
},
}
// crdbInternalTableIndexesTable exposes the index descriptors.
var crdbInternalTableIndexesTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.table_indexes (
descriptor_id INT,
descriptor_name STRING NOT NULL,
index_id INT NOT NULL,
index_name STRING NOT NULL,
index_type STRING NOT NULL,
is_unique BOOL NOT NULL
)
`,
populate: func(ctx context.Context, p *planner, prefix string, addRow func(...parser.Datum) error) error {
primary := parser.NewDString("primary")
secondary := parser.NewDString("secondary")
return forEachTableDescAll(ctx, p, prefix,
func(_ *sqlbase.DatabaseDescriptor, table *sqlbase.TableDescriptor) error {
tableID := parser.DNull
if table.ID != keys.VirtualDescriptorID {
tableID = parser.NewDInt(parser.DInt(table.ID))
}
tableName := parser.NewDString(table.Name)
if err := addRow(
tableID,
tableName,
parser.NewDInt(parser.DInt(table.PrimaryIndex.ID)),
parser.NewDString(table.PrimaryIndex.Name),
primary,
parser.MakeDBool(parser.DBool(table.PrimaryIndex.Unique)),
); err != nil {
return err
}
for _, idx := range table.Indexes {
if err := addRow(
tableID,
tableName,
parser.NewDInt(parser.DInt(idx.ID)),
parser.NewDString(idx.Name),
secondary,
parser.MakeDBool(parser.DBool(idx.Unique)),
); err != nil {
return err
}
}
return nil
})
},
}
// crdbInternalIndexColumnsTable exposes the index columns.
var crdbInternalIndexColumnsTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.index_columns (
descriptor_id INT,
descriptor_name STRING NOT NULL,
index_id INT NOT NULL,
index_name STRING NOT NULL,
column_type STRING NOT NULL,
column_id INT NOT NULL,
column_name STRING,
column_direction STRING
)
`,
populate: func(ctx context.Context, p *planner, prefix string, addRow func(...parser.Datum) error) error {
key := parser.NewDString("key")
storing := parser.NewDString("storing")
extra := parser.NewDString("extra")
composite := parser.NewDString("composite")
idxDirMap := map[sqlbase.IndexDescriptor_Direction]parser.Datum{
sqlbase.IndexDescriptor_ASC: parser.NewDString(sqlbase.IndexDescriptor_ASC.String()),
sqlbase.IndexDescriptor_DESC: parser.NewDString(sqlbase.IndexDescriptor_DESC.String()),
}
return forEachTableDescAll(ctx, p, prefix,
func(db *sqlbase.DatabaseDescriptor, table *sqlbase.TableDescriptor) error {
tableID := parser.DNull
if table.ID != keys.VirtualDescriptorID {
tableID = parser.NewDInt(parser.DInt(table.ID))
}
tableName := parser.NewDString(table.Name)
reportIndex := func(idx *sqlbase.IndexDescriptor) error {
idxID := parser.NewDInt(parser.DInt(idx.ID))
idxName := parser.NewDString(idx.Name)
// Report the main (key) columns.
for i, c := range idx.ColumnIDs {
colName := parser.DNull
colDir := parser.DNull
if i >= len(idx.ColumnNames) {
// We log an error here, instead of reporting an error
// to the user, because we really want to see the
// erroneous data in the virtual table.
log.Errorf(ctx, "index descriptor for [%d@%d] (%s.%s@%s) has more key column IDs (%d) than names (%d) (corrupted schema?)",
table.ID, idx.ID, db.Name, table.Name, idx.Name,
len(idx.ColumnIDs), len(idx.ColumnNames))
} else {
colName = parser.NewDString(idx.ColumnNames[i])
}
if i >= len(idx.ColumnDirections) {
// See comment above.
log.Errorf(ctx, "index descriptor for [%d@%d] (%s.%s@%s) has more key column IDs (%d) than directions (%d) (corrupted schema?)",
table.ID, idx.ID, db.Name, table.Name, idx.Name,
len(idx.ColumnIDs), len(idx.ColumnDirections))
} else {
colDir = idxDirMap[idx.ColumnDirections[i]]
}
if err := addRow(
tableID, tableName, idxID, idxName,
key, parser.NewDInt(parser.DInt(c)), colName, colDir,
); err != nil {
return err
}
}
// Report the stored columns.
for _, c := range idx.StoreColumnIDs {
if err := addRow(
tableID, tableName, idxID, idxName,
storing, parser.NewDInt(parser.DInt(c)), parser.DNull, parser.DNull,
); err != nil {
return err
}
}
// Report the extra columns.
for _, c := range idx.ExtraColumnIDs {
if err := addRow(
tableID, tableName, idxID, idxName,
extra, parser.NewDInt(parser.DInt(c)), parser.DNull, parser.DNull,
); err != nil {
return err
}
}
// Report the composite columns
for _, c := range idx.CompositeColumnIDs {
if err := addRow(
tableID, tableName, idxID, idxName,
composite, parser.NewDInt(parser.DInt(c)), parser.DNull, parser.DNull,
); err != nil {
return err
}
}
return nil
}
if err := reportIndex(&table.PrimaryIndex); err != nil {
return err
}
for i := range table.Indexes {
if err := reportIndex(&table.Indexes[i]); err != nil {
return err
}
}
return nil
})
},
}
// crdbInternalBackwardDependenciesTable exposes the backward
// inter-descriptor dependencies.
var crdbInternalBackwardDependenciesTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.backward_dependencies (
descriptor_id INT,
descriptor_name STRING NOT NULL,
index_id INT,
dependson_id INT NOT NULL,
dependson_type STRING NOT NULL,
dependson_index_id INT,
dependson_name STRING,
dependson_details STRING
)
`,
populate: func(ctx context.Context, p *planner, prefix string, addRow func(...parser.Datum) error) error {
fkDep := parser.NewDString("fk")
viewDep := parser.NewDString("view")
interleaveDep := parser.NewDString("interleave")
return forEachTableDescAll(ctx, p, prefix,
func(_ *sqlbase.DatabaseDescriptor, table *sqlbase.TableDescriptor) error {
tableID := parser.DNull
if table.ID != keys.VirtualDescriptorID {
tableID = parser.NewDInt(parser.DInt(table.ID))
}
tableName := parser.NewDString(table.Name)
reportIdxDeps := func(idx *sqlbase.IndexDescriptor) error {
idxID := parser.NewDInt(parser.DInt(idx.ID))
if idx.ForeignKey.Table != 0 {
fkRef := &idx.ForeignKey
if err := addRow(
tableID, tableName,
idxID,
parser.NewDInt(parser.DInt(fkRef.Table)),
fkDep,
parser.NewDInt(parser.DInt(fkRef.Index)),
parser.NewDString(fkRef.Name),
parser.NewDString(fmt.Sprintf("SharedPrefixLen: %d", fkRef.SharedPrefixLen)),
); err != nil {
return err
}
}
for _, interleaveParent := range idx.Interleave.Ancestors {
if err := addRow(
tableID, tableName,
idxID,
parser.NewDInt(parser.DInt(interleaveParent.TableID)),
interleaveDep,
parser.NewDInt(parser.DInt(interleaveParent.IndexID)),
parser.DNull,
parser.NewDString(fmt.Sprintf("SharedPrefixLen: %d",
interleaveParent.SharedPrefixLen)),
); err != nil {
return err
}
}
return nil
}
// Record the backward references of the primary index.
if err := reportIdxDeps(&table.PrimaryIndex); err != nil {
return err
}
// Record the backward references of secondary indexes.
for i := range table.Indexes {
if err := reportIdxDeps(&table.Indexes[i]); err != nil {
return err
}
}
// Record the view dependencies.
for _, tIdx := range table.DependsOn {
if err := addRow(
tableID, tableName,
parser.DNull,
parser.NewDInt(parser.DInt(tIdx)),
viewDep,
parser.DNull,
parser.DNull,
parser.DNull,
); err != nil {
return err
}
}
return nil
})
},
}
// crdbInternalForwardDependenciesTable exposes the forward
// inter-descriptor dependencies.
var crdbInternalForwardDependenciesTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.forward_dependencies (
descriptor_id INT,
descriptor_name STRING NOT NULL,
index_id INT,
dependedonby_id INT NOT NULL,
dependedonby_type STRING NOT NULL,
dependedonby_index_id INT,
dependedonby_name STRING,
dependedonby_details STRING
)
`,
populate: func(ctx context.Context, p *planner, prefix string, addRow func(...parser.Datum) error) error {
fkDep := parser.NewDString("fk")
viewDep := parser.NewDString("view")
interleaveDep := parser.NewDString("interleave")
return forEachTableDescAll(ctx, p, prefix,
func(_ *sqlbase.DatabaseDescriptor, table *sqlbase.TableDescriptor) error {
tableID := parser.DNull
if table.ID != keys.VirtualDescriptorID {
tableID = parser.NewDInt(parser.DInt(table.ID))
}
tableName := parser.NewDString(table.Name)
reportIdxDeps := func(idx *sqlbase.IndexDescriptor) error {
idxID := parser.NewDInt(parser.DInt(idx.ID))
for _, fkRef := range idx.ReferencedBy {
if err := addRow(
tableID, tableName,
idxID,
parser.NewDInt(parser.DInt(fkRef.Table)),
fkDep,
parser.NewDInt(parser.DInt(fkRef.Index)),
parser.NewDString(fkRef.Name),
parser.NewDString(fmt.Sprintf("SharedPrefixLen: %d", fkRef.SharedPrefixLen)),
); err != nil {
return err
}
}
for _, interleaveRef := range idx.InterleavedBy {
if err := addRow(
tableID, tableName,
idxID,
parser.NewDInt(parser.DInt(interleaveRef.Table)),
interleaveDep,
parser.NewDInt(parser.DInt(interleaveRef.Index)),
parser.DNull,
parser.NewDString(fmt.Sprintf("SharedPrefixLen: %d",
interleaveRef.SharedPrefixLen)),
); err != nil {
return err
}
}
return nil
}
// Record the backward references of the primary index.
if err := reportIdxDeps(&table.PrimaryIndex); err != nil {
return err
}
// Record the backward references of secondary indexes.
for i := range table.Indexes {
if err := reportIdxDeps(&table.Indexes[i]); err != nil {
return err
}
}
// Record the view dependencies.
for _, dep := range table.DependedOnBy {
if err := addRow(
tableID, tableName,
parser.DNull,
parser.NewDInt(parser.DInt(dep.ID)),
viewDep,
parser.NewDInt(parser.DInt(dep.IndexID)),
parser.DNull,
parser.NewDString(fmt.Sprintf("Columns: %v", dep.ColumnIDs)),
); err != nil {
return err
}
}
return nil
})
},
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors_cockroachdb/cockroach.git
git@gitee.com:mirrors_cockroachdb/cockroach.git
mirrors_cockroachdb
cockroach
cockroach
v1.1.1

搜索帮助

344bd9b3 5694891 D2dac590 5694891