1 Star 0 Fork 1

KleeScrawl/change_schema

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
table.py 6.22 KB
一键复制 编辑 原始数据 按行查看 历史
KleeScrawl 提交于 2年前 . add tool/table.py.
import re
# encoding=utf8
import sys
from MysqlHelper import MysqlHelper
class MysqlToDoris(object):
    """Copy MySQL table definitions into Doris as UNIQUE KEY tables.

    :meth:`mysql_doris` reads ``information_schema`` through ``mysql_db``,
    drops and recreates the same-named database through ``doris_db`` and
    issues one ``CREATE TABLE IF NOT EXISTS`` per MySQL table, translating
    column types with :meth:`get_col_type`.
    """

    # MySQL COLUMN_TYPE values that are emitted as Doris STRING.
    _STRING_COLUMN_TYPES = frozenset([
        'text', 'tinytext', 'mediumtext', 'longtext', 'json', 'timestamp',
        'blob', 'tinyblob', 'mediumblob', 'longblob',
    ])
    # Doris VARCHAR length = MySQL character length * this factor.
    # NOTE(review): 9 is generous; 4 bytes per character already covers
    # utf8mb4 — confirm before lowering it.
    _VARCHAR_LEN_FACTOR = 9
    # VARCHAR lengths above this are emitted as STRING instead.
    _MAX_VARCHAR_LEN = 65533
    # Class-level defaults for the DDL tail; __init__ may override per instance.
    buckets = 2
    replication_num = 2

    def __init__(self, mysql_config, doris_config, regexps, buckets=2, replication_num=2):
        """Open both connections.

        :param mysql_config: connection settings for the MySQL source.
        :param doris_config: connection settings for the Doris target
            (reached through the same MysqlHelper client).
        :param regexps: comma-separated string; stored but not used by any
            method in this class.
        :param buckets: ``BUCKETS`` value for every created table.
        :param replication_num: ``replication_num`` table property.
        """
        self.mysql_db = MysqlHelper(mysql_config)
        self.doris_db = MysqlHelper(doris_config)
        self.regexps = str(regexps).split(",")
        self.buckets = buckets
        self.replication_num = replication_num

    @staticmethod
    def _sql_literal(value):
        """Escape ``value`` for use inside a single-quoted MySQL string literal."""
        return str(value).replace('\\', '\\\\').replace("'", "\\'")

    @staticmethod
    def _quote_ident(name):
        """Return ``name`` as a backtick-quoted identifier."""
        return '`{}`'.format(str(name).replace('`', '``'))

    def get_col_type(self, COLUMN_TYPE, CHARACTER_MAXIMUM_LENGTH, DATA_TYPE):
        """Translate one MySQL column type into a Doris column type.

        :param COLUMN_TYPE: MySQL ``COLUMN_TYPE`` (e.g. ``int(11)``); returned
            unchanged when no rule below applies.
        :param CHARACTER_MAXIMUM_LENGTH: MySQL length in characters for
            character types (0 when unknown).
        :param DATA_TYPE: MySQL ``DATA_TYPE`` (bare type name, e.g. ``varchar``).
        :return: the Doris type string.
        """
        if COLUMN_TYPE in self._STRING_COLUMN_TYPES:
            return 'string'
        if DATA_TYPE == 'varchar':
            doris_len = int(CHARACTER_MAXIMUM_LENGTH) * self._VARCHAR_LEN_FACTOR
            if doris_len > self._MAX_VARCHAR_LEN:
                return 'string'
            return 'varchar({})'.format(doris_len)
        if DATA_TYPE.startswith('float'):
            return 'float'
        if DATA_TYPE.startswith('varbinary'):
            return 'string'
        if DATA_TYPE.startswith('mediumint'):
            # MEDIUMINT (even unsigned) fits in a 32-bit INT; no width is
            # emitted because CHARACTER_MAXIMUM_LENGTH is not set for integers.
            return 'int'
        if DATA_TYPE.startswith('timestamp'):
            # Only reached for e.g. timestamp(3); plain 'timestamp' is STRING above.
            return 'datetime'
        if DATA_TYPE.startswith(('enum', 'bit')):
            # NOTE(review): ENUM values are strings in MySQL and BIT(n > 8) does
            # not fit TINYINT — confirm the data loader converts them.
            return 'tinyint'
        if DATA_TYPE.startswith('smallint'):
            return 'smallint'
        return COLUMN_TYPE

    def _column_definition(self, col):
        """Translate one ``information_schema.columns`` row into a DDL fragment.

        :param col: row of (COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH,
            COLUMN_TYPE, COLUMN_KEY, COLUMN_COMMENT, COLUMN_DEFAULT), in the
            order selected by :meth:`mysql_doris`.
        :return: ``(is_primary_key, column_name, ddl_fragment)``.
        """
        column_name = str(col[0])
        data_type = str(col[1])
        # NOTE(review): dropping UNSIGNED can overflow Doris signed types
        # (e.g. INT UNSIGNED above 2**31 - 1) — confirm source value ranges.
        column_type = str(col[3]).replace('unsigned', '').strip()
        default = col[6]
        if data_type == 'bit' and default is not None:
            # BIT defaults are assumed to look like b'1'; keep only the digits.
            # NOTE(review): the digits are binary, so BIT(n > 1) defaults are
            # emitted unconverted.
            digits = re.findall('[0-9]+', str(default))
            default = digits[0] if digits else None
        if default == 'CURRENT_TIMESTAMP':
            default = '0'
        try:
            char_len = int(col[2])
        except (TypeError, ValueError):
            # CHARACTER_MAXIMUM_LENGTH is NULL for non-character types.
            char_len = 0
        doris_type = self.get_col_type(column_type, char_len, data_type)
        if doris_type.startswith('datetime'):
            # Datetime columns (including datetime(N)) never get a DEFAULT,
            # which also drops CURRENT_TIMESTAMP(N) defaults.
            default = None
        quoted_name = self._quote_ident(column_name)
        if default:
            fragment = " {} {} DEFAULT '{}' ".format(
                quoted_name, doris_type, self._sql_literal(default))
        else:
            fragment = " {} {} ".format(quoted_name, doris_type)
        return col[4] == 'PRI', column_name, fragment

    def _build_create_table_sql(self, database_name, table_name, table_desc):
        """Build the Doris CREATE TABLE statement, or return None without a primary key.

        Doris requires the UNIQUE KEY columns to be the leading columns, so
        primary-key columns are moved to the front (keeping their relative
        order) and the remaining columns follow in their original order.
        """
        key_names, key_fragments, value_fragments = [], [], []
        for col in table_desc:
            is_pk, column_name, fragment = self._column_definition(col)
            if is_pk:
                key_names.append(self._quote_ident(column_name))
                key_fragments.append(fragment)
            else:
                value_fragments.append(fragment)
        if not key_names:
            # UNIQUE KEY() / HASH() with no columns is not valid DDL.
            return None
        pri_keys = ','.join(key_names)
        head = 'CREATE TABLE IF NOT EXISTS {}.{} ('.format(
            self._quote_ident(database_name), self._quote_ident(table_name))
        tail = (' ) UNIQUE KEY({0}) DISTRIBUTED BY HASH({0}) BUCKETS {1} '
                'PROPERTIES ("replication_num" = "{2}","light_schema_change"="true") ').format(
            pri_keys, self.buckets, self.replication_num)
        return head + ','.join(key_fragments + value_fragments) + tail

    def mysql_doris(self, member_id):
        """Recreate every table of MySQL schema ``member_id`` in Doris.

        Destructive: the Doris database of the same name is dropped first.
        Tables without a primary key are skipped. CREATE TABLE statements
        that raise are printed together with the error and skipped, so one
        bad table does not stop the run.
        """
        tables_sql = ("select TABLE_SCHEMA,TABLE_NAME from information_schema.tables "
                      "where TABLE_SCHEMA='{}'".format(self._sql_literal(member_id)))
        print(tables_sql)
        sync_tables = self.mysql_db.select(tables_sql) or ()
        database_names = {row[0] for row in sync_tables}
        for database_name in database_names:
            print(database_name)
            quoted_db = self._quote_ident(database_name)
            self.doris_db.execute_sql('drop database if exists {} '.format(quoted_db))
            create_db_sql = 'CREATE DATABASE IF NOT EXISTS {}'.format(quoted_db)
            print(create_db_sql)
            self.doris_db.execute_sql(create_db_sql)
        print("-" * 20)
        print("需同步企业库: {} 个".format(len(database_names)))
        print("需同步企业表: {} 个".format(len(sync_tables)))
        print("-" * 20)
        for row in sync_tables:
            database_name, table_name = row[0], row[1]
            # Column order decides the Doris column order, so make it explicit.
            columns_sql = (
                "select COLUMN_NAME,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH,COLUMN_TYPE,"
                "COLUMN_KEY,COLUMN_COMMENT,COLUMN_DEFAULT from information_schema.columns "
                "where table_schema ='{}' and table_name ='{}' "
                "order by ORDINAL_POSITION".format(
                    self._sql_literal(database_name), self._sql_literal(table_name)))
            table_desc = self.mysql_db.select(columns_sql) or ()
            create_sql = self._build_create_table_sql(database_name, table_name, table_desc)
            if create_sql is None:
                print("-" * 10)
                print("SKIP {}.{}: no primary key".format(database_name, table_name))
                print("-" * 10)
                continue
            try:
                self.doris_db.execute_sql(create_sql)
            except Exception as e:
                # Best effort: report the failing DDL and its error, keep going.
                print("-" * 10)
                print(create_sql)
                print(e)
                print("-" * 10)
        print("CREATE TABLE TO DORIS FINISH")
if __name__ == '__main__':
    # Usage: python table.py <member_id>
    # Creates Doris tables for the MySQL schema ttk_member_<member_id>.
    if len(sys.argv) < 2:
        sys.exit('usage: {} <member_id>'.format(sys.argv[0]))
    member = sys.argv[1]
    # NOTE(review): host/user/password look like redacted placeholders —
    # fill them in (ideally from the environment) before running.
    mysql_config = {
        "host": '',
        "port": 3306,
        "user": '',
        "password": 'z',
        "db": '',
        "charset": 'utf8'
    }
    doris_config = {
        "host": '',
        "port": 9030,
        "user": 'root',
        "password": '',
        "db": 'information_schema',
        "charset": 'utf8'
    }
    v = MysqlToDoris(mysql_config, doris_config, 'nn')
    database = 'ttk_member_{}'.format(member)
    v.mysql_doris(database)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/KleeScrawl/change_schema.git
git@gitee.com:KleeScrawl/change_schema.git
KleeScrawl
change_schema
change_schema
public

搜索帮助