3 Star 2 Fork 0

Gitee 极速下载/orchestrator

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/outbrain/orchestrator/
克隆/下载
db.go 29.30 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
/*
Copyright 2014 Outbrain Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package db
import (
"database/sql"
"fmt"
"github.com/go-sql-driver/mysql"
"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils"
"github.com/outbrain/orchestrator/go/config"
"github.com/outbrain/orchestrator/go/ssl"
"strings"
)
var internalDBDeploymentSQL = []string{
`
CREATE TABLE IF NOT EXISTS _orchestrator_db_deployment (
deployment_id int unsigned NOT NULL AUTO_INCREMENT,
deployment_type enum('base', 'patch'),
deploy_timestamp timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
sql_statement TEXT CHARSET ascii NOT NULL,
statement_digest VARCHAR(128) CHARSET ascii NOT NULL,
statement_index INT UNSIGNED NOT NULL,
PRIMARY KEY (deployment_id),
UNIQUE KEY sql_index_uidx (statement_digest, statement_index)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
}
// generateSQLBase & generateSQLPatches are lists of SQL statements required to build the orchestrator backend
var generateSQLBase = []string{
`
CREATE TABLE IF NOT EXISTS database_instance (
hostname varchar(128) CHARACTER SET ascii NOT NULL,
port smallint(5) unsigned NOT NULL,
last_checked timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen timestamp NULL DEFAULT NULL,
server_id int(10) unsigned NOT NULL,
version varchar(128) CHARACTER SET ascii NOT NULL,
binlog_format varchar(16) CHARACTER SET ascii NOT NULL,
log_bin tinyint(3) unsigned NOT NULL,
log_slave_updates tinyint(3) unsigned NOT NULL,
binary_log_file varchar(128) CHARACTER SET ascii NOT NULL,
binary_log_pos bigint(20) unsigned NOT NULL,
master_host varchar(128) CHARACTER SET ascii NOT NULL,
master_port smallint(5) unsigned NOT NULL,
slave_sql_running tinyint(3) unsigned NOT NULL,
slave_io_running tinyint(3) unsigned NOT NULL,
master_log_file varchar(128) CHARACTER SET ascii NOT NULL,
read_master_log_pos bigint(20) unsigned NOT NULL,
relay_master_log_file varchar(128) CHARACTER SET ascii NOT NULL,
exec_master_log_pos bigint(20) unsigned NOT NULL,
seconds_behind_master bigint(20) unsigned DEFAULT NULL,
slave_lag_seconds bigint(20) unsigned DEFAULT NULL,
num_slave_hosts int(10) unsigned NOT NULL,
slave_hosts text CHARACTER SET ascii NOT NULL,
cluster_name tinytext CHARACTER SET ascii NOT NULL,
PRIMARY KEY (hostname,port),
KEY cluster_name_idx (cluster_name(128)),
KEY last_checked_idx (last_checked),
KEY last_seen_idx (last_seen)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS database_instance_maintenance (
database_instance_maintenance_id int(10) unsigned NOT NULL AUTO_INCREMENT,
hostname varchar(128) NOT NULL,
port smallint(5) unsigned NOT NULL,
maintenance_active tinyint(4) DEFAULT NULL,
begin_timestamp timestamp NULL DEFAULT NULL,
end_timestamp timestamp NULL DEFAULT NULL,
owner varchar(128) CHARACTER SET utf8 NOT NULL,
reason text CHARACTER SET utf8 NOT NULL,
PRIMARY KEY (database_instance_maintenance_id),
UNIQUE KEY maintenance_uidx (maintenance_active, hostname, port)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS database_instance_long_running_queries (
hostname varchar(128) NOT NULL,
port smallint(5) unsigned NOT NULL,
process_id bigint(20) NOT NULL,
process_started_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
process_user varchar(16) CHARACTER SET utf8 NOT NULL,
process_host varchar(128) CHARACTER SET utf8 NOT NULL,
process_db varchar(128) CHARACTER SET utf8 NOT NULL,
process_command varchar(16) CHARACTER SET utf8 NOT NULL,
process_time_seconds int(11) NOT NULL,
process_state varchar(128) CHARACTER SET utf8 NOT NULL,
process_info varchar(1024) CHARACTER SET utf8 NOT NULL,
PRIMARY KEY (hostname,port,process_id),
KEY process_started_at_idx (process_started_at)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS audit (
audit_id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
audit_timestamp timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
audit_type varchar(128) CHARACTER SET ascii NOT NULL,
hostname varchar(128) CHARACTER SET ascii NOT NULL DEFAULT '',
port smallint(5) unsigned NOT NULL,
message text CHARACTER SET utf8 NOT NULL,
PRIMARY KEY (audit_id),
KEY audit_timestamp_idx (audit_timestamp),
KEY host_port_idx (hostname, port, audit_timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
`,
`
CREATE TABLE IF NOT EXISTS host_agent (
hostname varchar(128) NOT NULL,
port smallint(5) unsigned NOT NULL,
token varchar(128) NOT NULL,
last_submitted timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_checked timestamp NULL DEFAULT NULL,
last_seen timestamp NULL DEFAULT NULL,
mysql_port smallint(5) unsigned DEFAULT NULL,
count_mysql_snapshots smallint(5) unsigned NOT NULL,
PRIMARY KEY (hostname),
KEY token_idx (token(32)),
KEY last_submitted_idx (last_submitted),
KEY last_checked_idx (last_checked),
KEY last_seen_idx (last_seen)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS agent_seed (
agent_seed_id int(10) unsigned NOT NULL AUTO_INCREMENT,
target_hostname varchar(128) NOT NULL,
source_hostname varchar(128) NOT NULL,
start_timestamp timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
end_timestamp timestamp NOT NULL,
is_complete tinyint(3) unsigned NOT NULL DEFAULT '0',
is_successful tinyint(3) unsigned NOT NULL DEFAULT '0',
PRIMARY KEY (agent_seed_id),
KEY target_hostname_idx (target_hostname,is_complete),
KEY source_hostname_idx (source_hostname,is_complete),
KEY start_timestamp_idx (start_timestamp),
KEY is_complete_idx (is_complete,start_timestamp),
KEY is_successful_idx (is_successful, start_timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS agent_seed_state (
agent_seed_state_id int(10) unsigned NOT NULL AUTO_INCREMENT,
agent_seed_id int(10) unsigned NOT NULL,
state_timestamp timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
state_action varchar(127) NOT NULL,
error_message varchar(255) NOT NULL,
PRIMARY KEY (agent_seed_state_id),
KEY agent_seed_idx (agent_seed_id, state_timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS host_attributes (
hostname varchar(128) NOT NULL,
attribute_name varchar(128) NOT NULL,
attribute_value varchar(128) NOT NULL,
submit_timestamp timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
expire_timestamp timestamp NULL DEFAULT NULL,
PRIMARY KEY (hostname,attribute_name),
KEY attribute_name_idx (attribute_name),
KEY attribute_value_idx (attribute_value),
KEY submit_timestamp_idx (submit_timestamp),
KEY expire_timestamp_idx (expire_timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS hostname_resolve (
hostname varchar(128) NOT NULL,
resolved_hostname varchar(128) NOT NULL,
resolved_timestamp timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (hostname),
KEY resolved_timestamp_idx (resolved_timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS cluster_alias (
cluster_name varchar(128) CHARACTER SET ascii NOT NULL,
alias varchar(128) NOT NULL,
PRIMARY KEY (cluster_name)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS active_node (
anchor tinyint unsigned NOT NULL,
hostname varchar(128) CHARACTER SET ascii NOT NULL,
token varchar(128) NOT NULL,
last_seen_active timestamp NOT NULL,
PRIMARY KEY (anchor)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
INSERT IGNORE INTO active_node (anchor, hostname, token, last_seen_active)
VALUES (1, '', '', NOW())
`,
`
CREATE TABLE IF NOT EXISTS node_health (
hostname varchar(128) CHARACTER SET ascii NOT NULL,
token varchar(128) NOT NULL,
last_seen_active timestamp NOT NULL,
PRIMARY KEY (hostname)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
DROP VIEW IF EXISTS _whats_wrong
`,
`
DROP VIEW IF EXISTS whats_wrong
`,
`
DROP VIEW IF EXISTS whats_wrong_summary
`,
`
CREATE TABLE IF NOT EXISTS topology_recovery (
recovery_id bigint unsigned not null auto_increment,
hostname varchar(128) NOT NULL,
port smallint unsigned NOT NULL,
in_active_period tinyint unsigned NOT NULL DEFAULT 0,
start_active_period timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
end_active_period_unixtime int unsigned,
end_recovery timestamp NULL,
processing_node_hostname varchar(128) CHARACTER SET ascii NOT NULL,
processcing_node_token varchar(128) NOT NULL,
successor_hostname varchar(128) DEFAULT NULL,
successor_port smallint unsigned DEFAULT NULL,
PRIMARY KEY (recovery_id),
UNIQUE KEY hostname_port_active_period_uidx(hostname, port, in_active_period, end_active_period_unixtime),
KEY in_active_start_period_idx (in_active_period, start_active_period),
KEY start_active_period_idx (start_active_period)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS hostname_unresolve (
hostname varchar(128) NOT NULL,
unresolved_hostname varchar(128) NOT NULL,
PRIMARY KEY (hostname),
KEY unresolved_hostname_idx (unresolved_hostname)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS database_instance_pool (
hostname varchar(128) CHARACTER SET ascii NOT NULL,
port smallint(5) unsigned NOT NULL,
pool varchar(128) NOT NULL,
PRIMARY KEY (hostname, port, pool),
KEY pool_idx (pool)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS database_instance_topology_history (
snapshot_unix_timestamp INT UNSIGNED NOT NULL,
hostname varchar(128) CHARACTER SET ascii NOT NULL,
port smallint(5) unsigned NOT NULL,
master_host varchar(128) CHARACTER SET ascii NOT NULL,
master_port smallint(5) unsigned NOT NULL,
cluster_name tinytext CHARACTER SET ascii NOT NULL,
PRIMARY KEY (snapshot_unix_timestamp, hostname, port),
KEY cluster_name_idx (snapshot_unix_timestamp, cluster_name(128))
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS candidate_database_instance (
hostname varchar(128) CHARACTER SET ascii NOT NULL,
port smallint(5) unsigned NOT NULL,
last_suggested TIMESTAMP NOT NULL,
PRIMARY KEY (hostname, port),
KEY last_suggested_idx (last_suggested)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS database_instance_downtime (
hostname varchar(128) NOT NULL,
port smallint(5) unsigned NOT NULL,
downtime_active tinyint(4) DEFAULT NULL,
begin_timestamp timestamp DEFAULT CURRENT_TIMESTAMP,
end_timestamp timestamp,
owner varchar(128) CHARACTER SET utf8 NOT NULL,
reason text CHARACTER SET utf8 NOT NULL,
PRIMARY KEY (hostname, port)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS topology_failure_detection (
detection_id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
hostname varchar(128) NOT NULL,
port smallint unsigned NOT NULL,
in_active_period tinyint unsigned NOT NULL DEFAULT '0',
start_active_period timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
end_active_period_unixtime int unsigned NOT NULL,
processing_node_hostname varchar(128) NOT NULL,
processcing_node_token varchar(128) NOT NULL,
analysis varchar(128) NOT NULL,
cluster_name varchar(128) NOT NULL,
cluster_alias varchar(128) NOT NULL,
count_affected_slaves int unsigned NOT NULL,
slave_hosts text NOT NULL,
PRIMARY KEY (detection_id),
UNIQUE KEY hostname_port_active_period_uidx (hostname, port, in_active_period, end_active_period_unixtime),
KEY in_active_start_period_idx (in_active_period, start_active_period)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS hostname_resolve_history (
resolved_hostname varchar(128) NOT NULL,
hostname varchar(128) NOT NULL,
resolved_timestamp timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (resolved_hostname),
KEY (hostname),
KEY resolved_timestamp_idx (resolved_timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS hostname_unresolve_history (
unresolved_hostname varchar(128) NOT NULL,
hostname varchar(128) NOT NULL,
last_registered TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (unresolved_hostname),
KEY (hostname),
KEY last_registered_idx (last_registered)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS cluster_domain_name (
cluster_name varchar(128) CHARACTER SET ascii NOT NULL,
domain_name varchar(128) NOT NULL,
PRIMARY KEY (cluster_name),
KEY domain_name_idx(domain_name(32))
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS master_position_equivalence (
equivalence_id bigint unsigned not null auto_increment,
master1_hostname varchar(128) CHARACTER SET ascii NOT NULL,
master1_port smallint(5) unsigned NOT NULL,
master1_binary_log_file varchar(128) CHARACTER SET ascii NOT NULL,
master1_binary_log_pos bigint(20) unsigned NOT NULL,
master2_hostname varchar(128) CHARACTER SET ascii NOT NULL,
master2_port smallint(5) unsigned NOT NULL,
master2_binary_log_file varchar(128) CHARACTER SET ascii NOT NULL,
master2_binary_log_pos bigint(20) unsigned NOT NULL,
last_suggested TIMESTAMP NOT NULL,
PRIMARY KEY (equivalence_id),
UNIQUE KEY equivalence_uidx (master1_hostname, master1_port, master1_binary_log_file, master1_binary_log_pos, master2_hostname, master2_port),
KEY master2_idx (master2_hostname, master2_port, master2_binary_log_file, master2_binary_log_pos),
KEY last_suggested_idx(last_suggested)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
`
CREATE TABLE IF NOT EXISTS async_request (
request_id bigint unsigned NOT NULL AUTO_INCREMENT,
command varchar(128) charset ascii not null,
hostname varchar(128) NOT NULL,
port smallint(5) unsigned NOT NULL,
destination_hostname varchar(128) NOT NULL,
destination_port smallint(5) unsigned NOT NULL,
pattern text CHARACTER SET utf8 NOT NULL,
gtid_hint varchar(32) charset ascii not null,
begin_timestamp timestamp NULL DEFAULT NULL,
end_timestamp timestamp NULL DEFAULT NULL,
story text CHARACTER SET utf8 NOT NULL,
PRIMARY KEY (request_id),
KEY begin_timestamp_idx (begin_timestamp),
KEY end_timestamp_idx (end_timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=ascii
`,
}
var generateSQLPatches = []string{
`
ALTER TABLE
database_instance
ADD COLUMN read_only TINYINT UNSIGNED NOT NULL AFTER version
`,
`
ALTER TABLE
database_instance
ADD COLUMN last_sql_error TEXT NOT NULL AFTER exec_master_log_pos
`,
`
ALTER TABLE
database_instance
ADD COLUMN last_io_error TEXT NOT NULL AFTER last_sql_error
`,
`
ALTER TABLE
database_instance
ADD COLUMN last_attempted_check TIMESTAMP AFTER last_checked
`,
`
ALTER TABLE
database_instance
ADD COLUMN oracle_gtid TINYINT UNSIGNED NOT NULL AFTER slave_io_running
`,
`
ALTER TABLE
database_instance
ADD COLUMN mariadb_gtid TINYINT UNSIGNED NOT NULL AFTER oracle_gtid
`,
`
ALTER TABLE
database_instance
ADD COLUMN relay_log_file varchar(128) CHARACTER SET ascii NOT NULL AFTER exec_master_log_pos
`,
`
ALTER TABLE
database_instance
ADD COLUMN relay_log_pos bigint unsigned NOT NULL AFTER relay_log_file
`,
`
ALTER TABLE
database_instance
ADD INDEX master_host_port_idx (master_host, master_port)
`,
`
ALTER TABLE
database_instance
ADD COLUMN pseudo_gtid TINYINT UNSIGNED NOT NULL AFTER mariadb_gtid
`,
`
ALTER TABLE
database_instance
ADD COLUMN replication_depth TINYINT UNSIGNED NOT NULL AFTER cluster_name
`,
`
ALTER TABLE
database_instance
ADD COLUMN has_replication_filters TINYINT UNSIGNED NOT NULL AFTER slave_io_running
`,
`
ALTER TABLE
database_instance
ADD COLUMN data_center varchar(32) CHARACTER SET ascii NOT NULL AFTER cluster_name
`,
`
ALTER TABLE
database_instance
ADD COLUMN physical_environment varchar(32) CHARACTER SET ascii NOT NULL AFTER data_center
`,
`
ALTER TABLE
database_instance_maintenance
ADD KEY active_timestamp_idx (maintenance_active, begin_timestamp)
`,
`
ALTER TABLE
database_instance
ADD COLUMN uptime INT UNSIGNED NOT NULL AFTER last_seen
`,
`
ALTER TABLE
cluster_alias
ADD UNIQUE KEY alias_uidx (alias)
`,
`
ALTER TABLE
database_instance
ADD COLUMN is_co_master TINYINT UNSIGNED NOT NULL AFTER replication_depth
`,
`
ALTER TABLE
database_instance_maintenance
ADD KEY active_end_timestamp_idx (maintenance_active, end_timestamp)
`,
`
ALTER TABLE
database_instance
ADD COLUMN sql_delay INT UNSIGNED NOT NULL AFTER slave_lag_seconds
`,
`
ALTER TABLE
topology_recovery
ADD COLUMN analysis varchar(128) CHARACTER SET ascii NOT NULL,
ADD COLUMN cluster_name varchar(128) CHARACTER SET ascii NOT NULL,
ADD COLUMN cluster_alias varchar(128) CHARACTER SET ascii NOT NULL,
ADD COLUMN count_affected_slaves int unsigned NOT NULL,
ADD COLUMN slave_hosts text CHARACTER SET ascii NOT NULL
`,
`
ALTER TABLE hostname_unresolve
ADD COLUMN last_registered TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
ADD KEY last_registered_idx (last_registered)
`,
`
ALTER TABLE topology_recovery
ADD KEY cluster_name_in_active_idx (cluster_name, in_active_period)
`,
`
ALTER TABLE topology_recovery
ADD KEY end_recovery_idx (end_recovery)
`,
`
ALTER TABLE
database_instance
ADD COLUMN binlog_server TINYINT UNSIGNED NOT NULL AFTER version
`,
`
ALTER TABLE cluster_domain_name
ADD COLUMN last_registered TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
ADD KEY last_registered_idx (last_registered)
`,
`
ALTER TABLE
database_instance
ADD COLUMN supports_oracle_gtid TINYINT UNSIGNED NOT NULL AFTER oracle_gtid
`,
`
ALTER TABLE
database_instance
ADD COLUMN executed_gtid_set text CHARACTER SET ascii NOT NULL AFTER oracle_gtid
`,
`
ALTER TABLE
database_instance
ADD COLUMN server_uuid varchar(64) CHARACTER SET ascii NOT NULL AFTER server_id
`,
`
ALTER TABLE
database_instance
ADD COLUMN suggested_cluster_alias varchar(128) CHARACTER SET ascii NOT NULL AFTER cluster_name
`,
`
ALTER TABLE cluster_alias
ADD COLUMN last_registered TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
ADD KEY last_registered_idx (last_registered)
`,
`
ALTER TABLE
topology_recovery
ADD COLUMN is_successful TINYINT UNSIGNED NOT NULL DEFAULT 0 AFTER processcing_node_token
`,
`
ALTER TABLE
topology_recovery
ADD COLUMN acknowledged TINYINT UNSIGNED NOT NULL DEFAULT 0,
ADD COLUMN acknowledged_by varchar(128) CHARACTER SET utf8 NOT NULL,
ADD COLUMN acknowledge_comment text CHARACTER SET utf8 NOT NULL
`,
`
ALTER TABLE
topology_recovery
ADD COLUMN participating_instances text CHARACTER SET ascii NOT NULL after slave_hosts,
ADD COLUMN lost_slaves text CHARACTER SET ascii NOT NULL after participating_instances,
ADD COLUMN all_errors text CHARACTER SET ascii NOT NULL after lost_slaves
`,
}
// Track if a TLS has already been configured for topology
var topologyTLSConfigured bool = false
// Track if a TLS has already been configured for Orchestrator
var orchestratorTLSConfigured bool = false
// OpenTopology returns a DB instance to access a topology instance
func OpenTopology(host string, port int) (*sql.DB, error) {
mysql_uri := fmt.Sprintf("%s:%s@tcp(%s:%d)/?timeout=%ds", config.Config.MySQLTopologyUser, config.Config.MySQLTopologyPassword, host, port, config.Config.MySQLConnectTimeoutSeconds)
if config.Config.MySQLTopologyUseMutualTLS {
mysql_uri, _ = SetupMySQLTopologyTLS(mysql_uri)
}
db, _, err := sqlutils.GetDB(mysql_uri)
db.SetMaxOpenConns(config.Config.MySQLTopologyMaxPoolConnections)
db.SetMaxIdleConns(config.Config.MySQLTopologyMaxPoolConnections)
return db, err
}
// Create a TLS configuration from the config supplied CA, Certificate, and Private key.
// Register the TLS config with the mysql drivers as the "topology" config
// Modify the supplied URI to call the TLS config
// TODO: Way to have password mixed with TLS for various nodes in the topology. Currently everything is TLS or everything is password
func SetupMySQLTopologyTLS(uri string) (string, error) {
if !topologyTLSConfigured {
tlsConfig, err := ssl.NewTLSConfig(config.Config.MySQLTopologySSLCAFile, !config.Config.MySQLTopologySSLSkipVerify)
if err != nil {
return "", log.Fatalf("Can't create TLS configuration for Topology connection %s: %s", uri, err)
}
tlsConfig.InsecureSkipVerify = config.Config.MySQLTopologySSLSkipVerify
if err = ssl.AppendKeyPair(tlsConfig, config.Config.MySQLTopologySSLCertFile, config.Config.MySQLTopologySSLPrivateKeyFile); err != nil {
return "", log.Fatalf("Can't setup TLS key pairs for %s: %s", uri, err)
}
if err = mysql.RegisterTLSConfig("topology", tlsConfig); err != nil {
return "", log.Fatalf("Can't register mysql TLS config for topology: %s", err)
}
topologyTLSConfigured = true
}
return fmt.Sprintf("%s&tls=topology", uri), nil
}
// OpenTopology returns the DB instance for the orchestrator backed database
func OpenOrchestrator() (*sql.DB, error) {
mysql_uri := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?timeout=%ds", config.Config.MySQLOrchestratorUser, config.Config.MySQLOrchestratorPassword,
config.Config.MySQLOrchestratorHost, config.Config.MySQLOrchestratorPort, config.Config.MySQLOrchestratorDatabase, config.Config.MySQLConnectTimeoutSeconds)
if config.Config.MySQLOrchestratorUseMutualTLS {
mysql_uri, _ = SetupMySQLOrchestratorTLS(mysql_uri)
}
db, fromCache, err := sqlutils.GetDB(mysql_uri)
if err == nil && !fromCache {
if !config.Config.SkipOrchestratorDatabaseUpdate {
initOrchestratorDB(db)
}
db.SetMaxIdleConns(10)
}
return db, err
}
// readInternalDeployments reads orchestrator db deployment statements that are known to have been executed
func readInternalDeployments() (baseDeployments []string, patchDeployments []string, err error) {
if !config.Config.SmartOrchestratorDatabaseUpdate {
return baseDeployments, patchDeployments, nil
}
query := fmt.Sprintf(`
select
deployment_type,
sql_statement
from
_orchestrator_db_deployment
order by
deployment_id
`)
db, err := OpenOrchestrator()
if err != nil {
log.Fatalf("Cannot initiate orchestrator internal deployment: %+v", err)
}
err = sqlutils.QueryRowsMap(db, query, func(m sqlutils.RowMap) error {
deploymentType := m.GetString("deployment_type")
sqlStatement := m.GetString("sql_statement")
if deploymentType == "base" {
baseDeployments = append(baseDeployments, sqlStatement)
} else if deploymentType == "patch" {
patchDeployments = append(patchDeployments, sqlStatement)
} else {
log.Fatalf("Unknown deployment type (%+v) encountered in _orchestrator_db_deployment", deploymentType)
}
return nil
})
if err != nil {
log.Debugf("Deploying internal orchestrator tables to fix the above; this is a one time operation")
// Table does not exist? Create it for first time
for _, query := range internalDBDeploymentSQL {
if _, err = execInternal(db, query); err != nil {
log.Fatalf("Cannot initiate orchestrator internal deployment: %+v", err)
}
}
}
return baseDeployments, patchDeployments, nil
}
// writeInternalDeployment will persist a successful deployment
func writeInternalDeployment(db *sql.DB, deploymentType string, sqlStatement string, statementIndex int) error {
if !config.Config.SmartOrchestratorDatabaseUpdate {
return nil
}
query := `
insert ignore into _orchestrator_db_deployment (
deployment_type, sql_statement, statement_digest, statement_index) VALUES (
?, ?, CONCAT(SHA1(?), ':', LEFT(REPLACE(REPLACE(REPLACE(?, ' ', ''), '\n', ' '), '\t', ''), 60)), ?)
`
if _, err := execInternal(db, query, deploymentType, sqlStatement, sqlStatement, sqlStatement, statementIndex); err != nil {
log.Fatalf("Unable to write to _orchestrator_db_deployment: %+v", err)
}
return nil
}
// ResetInternalDeployment will clear existing deployment history (and the effect will be a complete re-deployment
// the next run). This is made available for possible operational errors, a red button
func ResetInternalDeployment() error {
db, err := OpenOrchestrator()
if err != nil {
log.Fatalf("Cannot reset orchestrator internal deployment: %+v", err)
}
if _, err := execInternal(db, `delete from _orchestrator_db_deployment`); err != nil {
log.Fatalf("Unable to clear _orchestrator_db_deployment: %+v", err)
}
return nil
}
// Create a TLS configuration from the config supplied CA, Certificate, and Private key.
// Register the TLS config with the mysql drivers as the "orchestrator" config
// Modify the supplied URI to call the TLS config
func SetupMySQLOrchestratorTLS(uri string) (string, error) {
if !orchestratorTLSConfigured {
tlsConfig, err := ssl.NewTLSConfig(config.Config.MySQLOrchestratorSSLCAFile, true)
if err != nil {
return "", log.Fatalf("Can't create TLS configuration for Orchestrator connection %s: %s", uri, err)
}
tlsConfig.InsecureSkipVerify = config.Config.MySQLOrchestratorSSLSkipVerify
if err = ssl.AppendKeyPair(tlsConfig, config.Config.MySQLOrchestratorSSLCertFile, config.Config.MySQLOrchestratorSSLPrivateKeyFile); err != nil {
return "", log.Fatalf("Can't setup TLS key pairs for %s: %s", uri, err)
}
if err = mysql.RegisterTLSConfig("orchestrator", tlsConfig); err != nil {
return "", log.Fatalf("Can't register mysql TLS config for orchestrator: %s", err)
}
orchestratorTLSConfigured = true
}
return fmt.Sprintf("%s&tls=orchestrator", uri), nil
}
// deployIfNotAlreadyDeployed will issue given sql queries that are not already known to be deployed.
// This iterates both lists (to-run and already-deployed) and also verifies no contraditions.
func deployIfNotAlreadyDeployed(db *sql.DB, queries []string, deployedQueries []string, deploymentType string, fatalOnError bool) error {
for i, query := range queries {
queryAlreadyExecuted := false
// While iterating 'queries', also iterate 'deployedQueries'. Expect identity
if len(deployedQueries) > i {
if deployedQueries[i] != query {
log.Fatalf("initOrchestratorDB() PANIC: non matching %s queries between deployment requests and _orchestrator_db_deployment. Please execute 'orchestrator -c reset-internal-db-deployment'", deploymentType)
}
queryAlreadyExecuted = true
}
if queryAlreadyExecuted {
continue
}
if config.Config.SmartOrchestratorDatabaseUpdate {
log.Debugf("initOrchestratorDB executing: %.80s", strings.TrimSpace(strings.Replace(query, "\n", "", -1)))
}
if fatalOnError {
if _, err := execInternal(db, query); err != nil {
return log.Fatalf("Cannot initiate orchestrator: %+v", err)
}
} else {
execInternalSilently(db, query)
}
writeInternalDeployment(db, deploymentType, query, i)
}
return nil
}
// initOrchestratorDB attempts to create/upgrade the orchestrator backend database. It is created once in the
// application's lifetime.
func initOrchestratorDB(db *sql.DB) error {
log.Debug("Initializing orchestrator")
baseDeployments, patchDeployments, _ := readInternalDeployments()
deployIfNotAlreadyDeployed(db, generateSQLBase, baseDeployments, "base", true)
deployIfNotAlreadyDeployed(db, generateSQLPatches, patchDeployments, "patch", false)
return nil
}
// ExecOrchestrator will execute given query on the orchestrator backend database.
func execInternalSilently(db *sql.DB, query string, args ...interface{}) (sql.Result, error) {
res, err := sqlutils.ExecSilently(db, query, args...)
return res, err
}
// ExecOrchestrator will execute given query on the orchestrator backend database.
func execInternal(db *sql.DB, query string, args ...interface{}) (sql.Result, error) {
res, err := sqlutils.ExecSilently(db, query, args...)
return res, err
}
// ExecOrchestrator will execute given query on the orchestrator backend database.
func ExecOrchestrator(query string, args ...interface{}) (sql.Result, error) {
db, err := OpenOrchestrator()
if err != nil {
return nil, err
}
res, err := sqlutils.Exec(db, query, args...)
return res, err
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors/orchestrator.git
git@gitee.com:mirrors/orchestrator.git
mirrors
orchestrator
orchestrator
v1.4.448

搜索帮助