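# (Web-page banner captured with this source, not code) "代码拉取完成,页面将自动刷新" — "Code pull complete; the page will refresh automatically."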
import re
# encoding=utf8
import sys
from MysqlHelper import MysqlHelper
class MysqlToDoris(object):
    """Recreate the table schemas of a MySQL database as Doris UNIQUE KEY tables.

    Column metadata is read from the source server's ``information_schema``,
    translated to Doris column types, and turned into ``CREATE DATABASE`` /
    ``CREATE TABLE`` statements that are executed on the Doris side.
    Only DDL is issued; no rows are copied.
    """

    # MySQL COLUMN_TYPE values that are stored as Doris STRING.
    _STRING_COLUMN_TYPES = frozenset(
        ('text', 'tinytext', 'mediumtext', 'longtext', 'timestamp', 'json'))

    # Doris integer types are signed only, so an unsigned MySQL integer is
    # widened to the next type that can hold its full range.
    _UNSIGNED_WIDENING = {
        'tinyint': 'smallint',
        'smallint': 'int',
        'mediumint': 'int',
        'int': 'bigint',
        'bigint': 'largeint',
    }

    # (DATA_TYPE prefix, Doris type) rules applied after the STRING/VARCHAR
    # rules. The prefixes do not overlap, so their order does not matter.
    _PREFIX_TYPES = (
        ('float', 'float'),
        ('varbinary', 'string'),
        ('mediumint', 'int'),
        ('timestamp', 'datetime'),
        ('enum', 'tinyint'),
        ('bit', 'tinyint'),
        ('smallint', 'smallint'),
    )

    # Largest VARCHAR length (bytes) accepted by Doris; longer columns become STRING.
    _MAX_VARCHAR_LEN = 65533

    def __init__(self, mysql_config, doris_config, regexps):
        """Open helpers for the source MySQL server and the target Doris cluster.

        :param mysql_config: connection settings passed to ``MysqlHelper`` for MySQL.
        :param doris_config: connection settings passed to ``MysqlHelper`` for Doris.
        :param regexps: comma-separated string, split and stored as ``self.regexps``
            (not used by the methods of this class).
        """
        self.mysql_db = MysqlHelper(mysql_config)
        self.doris_db = MysqlHelper(doris_config)
        self.regexps = str(regexps).split(",")

    def get_col_type(self, COLUMN_TYPE, CHARACTER_MAXIMUM_LENGTH, DATA_TYPE, unsigned=False):
        """Map a MySQL column type to a Doris column type.

        :param COLUMN_TYPE: MySQL ``COLUMN_TYPE`` with ``unsigned`` removed, e.g. ``'varchar(32)'``.
        :param CHARACTER_MAXIMUM_LENGTH: character length of string columns (0 when not applicable).
        :param DATA_TYPE: MySQL ``DATA_TYPE`` (bare type name), e.g. ``'varchar'``.
        :param unsigned: True when the MySQL column is declared unsigned; unsigned
            integer types are widened because Doris integers are signed.
        :return: the Doris type; types without a rule are returned unchanged as ``COLUMN_TYPE``.
        """
        if unsigned and DATA_TYPE in self._UNSIGNED_WIDENING:
            return self._UNSIGNED_WIDENING[DATA_TYPE]
        if COLUMN_TYPE in self._STRING_COLUMN_TYPES:
            return 'string'
        if DATA_TYPE == 'varchar':
            # Doris VARCHAR lengths are bytes, MySQL's are characters.
            # NOTE(review): the length is multiplied by 9 (3 * 3), as in the
            # original code; 3 (utf8) or 4 (utf8mb4) bytes per character would
            # suffice — confirm before shrinking, since it changes the schema.
            byte_len = int(CHARACTER_MAXIMUM_LENGTH) * 3 * 3
            if byte_len > self._MAX_VARCHAR_LEN:
                return 'string'
            return 'varchar({})'.format(byte_len)
        for prefix, doris_type in self._PREFIX_TYPES:
            if DATA_TYPE.startswith(prefix):
                return doris_type
        return COLUMN_TYPE

    def mysql_doris(self, member_id):
        """Recreate MySQL schema ``member_id`` and all of its base tables in Doris.

        WARNING: the Doris database of the same name is dropped first, so any
        data already loaded into it is lost.

        Tables without a primary key are skipped, because the generated Doris
        UNIQUE KEY table needs key columns. A table whose CREATE TABLE fails is
        reported, and the remaining tables are still processed.

        :param member_id: name of the MySQL schema (database) to replicate.
        """
        # NOTE(review): names are interpolated into the SQL text; switch to
        # parameter binding if MysqlHelper.select supports it.
        # Views have no primary key and cannot become UNIQUE KEY tables, so
        # only base tables are selected.
        tables_sql = ("select TABLE_SCHEMA,TABLE_NAME from information_schema.tables "
                      "where TABLE_SCHEMA='{}' and TABLE_TYPE='BASE TABLE'".format(member_id))
        print(tables_sql)
        sync_tables = list(self.mysql_db.select(tables_sql) or [])

        database_names = {row[0] for row in sync_tables}
        self._recreate_databases(database_names)
        print("-" * 20)
        print("需同步企业库: {} 个".format(len(database_names)))
        print("需同步企业表: {} 个".format(len(sync_tables)))
        print("-" * 20)

        for row in sync_tables:
            database_name, table_name = row[0], row[1]
            # ORDER BY keeps the Doris column order identical to MySQL's.
            columns_sql = ("select COLUMN_NAME,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH,COLUMN_TYPE,"
                           "COLUMN_KEY,COLUMN_COMMENT,COLUMN_DEFAULT from information_schema.columns "
                           "where table_schema ='{}' and table_name ='{}' "
                           "order by ORDINAL_POSITION".format(database_name, table_name))
            create_sql = self._build_create_table_sql(
                database_name, table_name, self.mysql_db.select(columns_sql))
            if create_sql is None:
                print("skip {}.{}: no primary key, cannot build a UNIQUE KEY table".format(
                    database_name, table_name))
                continue
            try:
                self.doris_db.execute_sql(create_sql)
            except Exception as e:  # best effort: report and continue with the next table
                print("-" * 10)
                print(create_sql)
                print(e)
                print("-" * 10)
        print("CREATE TABLE TO DORIS FINISH")

    def _recreate_databases(self, database_names):
        """Drop and re-create each named database in Doris (existing data is lost)."""
        for database_name in database_names:
            print(database_name)
            self.doris_db.execute_sql('''drop database if exists `{}` '''.format(database_name))
            create_sql = '''CREATE DATABASE IF NOT EXISTS `{}`'''.format(database_name)
            print(create_sql)
            self.doris_db.execute_sql(create_sql)

    def _column_definition(self, col):
        """Translate one row of the columns query into a Doris column definition.

        ``col`` follows the select order used in ``mysql_doris``:
        (COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, COLUMN_TYPE,
        COLUMN_KEY, COLUMN_COMMENT, COLUMN_DEFAULT).

        :return: ``(quoted_name, definition, is_primary)``.
        """
        column_name = str(col[0])
        data_type = str(col[1])
        raw_column_type = str(col[3])
        column_default = col[6]

        # MySQL reports bit defaults as b'1'; keep only the digits. A bit column
        # without a default has COLUMN_DEFAULT = NULL and must not be parsed.
        if data_type == 'bit' and column_default is not None:
            digits = re.findall(r'[0-9]+', str(column_default))
            column_default = digits[0] if digits else None
        # CURRENT_TIMESTAMP (also CURRENT_TIMESTAMP(n) / current_timestamp())
        # is an expression, not a literal; it would otherwise be emitted as a
        # quoted string, so it is replaced by '0' (datetime columns drop it below).
        if column_default is not None and str(column_default).upper().startswith('CURRENT_TIMESTAMP'):
            column_default = '0'

        try:
            max_len = int(col[2])
        except (TypeError, ValueError):
            max_len = 0  # NULL for non-string types

        doris_type = self.get_col_type(
            raw_column_type.replace('unsigned', '').strip(), max_len, data_type,
            unsigned='unsigned' in raw_column_type)
        if doris_type.startswith('datetime'):
            column_default = None

        quoted_name = '`{}`'.format(column_name)
        if column_default:
            definition = ''' {} {} DEFAULT '{}' '''.format(quoted_name, doris_type, column_default)
        else:
            definition = ''' {} {} '''.format(quoted_name, doris_type)
        return quoted_name, definition, col[4] == 'PRI'

    def _build_create_table_sql(self, database_name, table_name, table_desc):
        """Build the Doris CREATE TABLE statement, or None if the table has no primary key.

        Primary-key columns are moved to the front (in column order) because
        Doris requires key columns to come first; other columns keep their order.
        """
        definitions = []   # (quoted_name, definition) in source column order
        primary_names = []
        for col in table_desc or ():
            quoted_name, definition, is_primary = self._column_definition(col)
            definitions.append((quoted_name, definition))
            if is_primary:
                primary_names.append(quoted_name)
        if not primary_names:
            return None

        by_name = dict(definitions)
        primary_set = set(primary_names)
        ordered = [by_name[name] for name in primary_names]
        ordered += [definition for name, definition in definitions if name not in primary_set]

        pri_keys = ','.join(primary_names)
        head = 'CREATE TABLE IF NOT EXISTS `{}`.`{}` ('.format(database_name, table_name)
        tail = ''' ) UNIQUE KEY({}) DISTRIBUTED BY HASH({}) BUCKETS 2 PROPERTIES ("replication_num" = "2","light_schema_change"="true") '''.format(
            pri_keys, pri_keys)
        return head + ','.join(ordered) + tail
if __name__ == '__main__':
    # Usage: python <this script> <member>
    # Recreates the MySQL schema ttk_member_<member> in Doris.
    if len(sys.argv) < 2:
        sys.exit("usage: {} <member>".format(sys.argv[0]))
    member = sys.argv[1]
    # NOTE(review): connection settings are hard-coded (and partly blank) in
    # source; consider reading them from the environment or a config file.
    mysql_config = {
        "host": '',
        "port": 3306,
        "user": '',
        "password": 'z',
        "db": '',
        "charset": 'utf8'
    }
    doris_config = {
        "host": '',
        "port": 9030,
        "user": 'root',
        "password": '',
        "db": 'information_schema',
        "charset": 'utf8'
    }
    # The third argument becomes MysqlToDoris.regexps.
    v = MysqlToDoris(mysql_config, doris_config, 'nn')
    database = 'ttk_member_{}'.format(member)
    v.mysql_doris(database)
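# (Web-page notice captured with this source, not code) The hosting site states that
# content at this point was withheld from display, so the remainder of the original
# file may be missing here. Original notice:
# "此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。"
# — "Content here may be unsuitable for display, so the page does not show it. You can review and edit it with the editing tools."
# "如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。"
# — "If you confirm the content involves no inappropriate language, pure ad traffic, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or content violating national laws and regulations, click submit to appeal and it will be handled as soon as possible."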