2 Star 0 Fork 0

tf/ZjdxCKDataHandelPyThon

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
main1-4.py 14.37 KB
一键复制 编辑 原始数据 按行查看 历史
Muzi 提交于 2021-03-31 17:24 . 作者表新加字段提取
import asyncio
from aiochclient import ChClient
from aiohttp import ClientSession
import lcs
import time
import uuid as UUID
tempTableDelete = "DROP TABLE IF EXISTS datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp"
tempTableCreate = ("CREATE TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp ("
" `uuid` String,"
" `guid` String,"
" `name` Nullable(String),"
" `organization` Nullable(String),"
" `Second_organization` Nullable(String),"
" `email` Nullable(String),"
" `reprintauthor` Nullable(String),"
" `altname` Nullable(String),"
" `country` Nullable(String),"
" `firstauthor` Nullable(String),"
" `organizationdept` Nullable(String)"
" )"
" ENGINE = MergeTree"
" ORDER BY guid"
" SETTINGS index_granularity = 8192"
)
RelationDelete = "DROP TABLE IF EXISTS datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Experts_Relationship"
RelationCreate = ("CREATE TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Experts_Relationship ("
" `uuid` String,"
" `guid` String"
" )"
" ENGINE = MergeTree"
" ORDER BY uuid"
" SETTINGS index_granularity = 8192"
)
ExpertNewTmpDelete = "DROP TABLE IF EXISTS datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew_Tmp;"
ExpertNewTmpCreate = ("CREATE TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew_Tmp ("
" `uuid` String,"
" `guid` String,"
" `name` Nullable(String),"
" `organization` Nullable(String),"
" `Second_organization` Nullable(String),"
" `email` Nullable(String),"
" `reprintauthor` Nullable(String),"
" `altname` Nullable(String),"
" `country` Nullable(String),"
" `firstauthor` Nullable(String),"
" `organizationdept` Nullable(String)"
" )"
" ENGINE = MergeTree"
" ORDER BY uuid"
" SETTINGS index_granularity = 8192"
)
# 正式专家表
ExpertNewDelete = "DROP TABLE IF EXISTS datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew;"
ExpertNewCreate = ("CREATE TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew ("
" `uuid` String,"
" `name` Nullable(String),"
" `organization` Nullable(String),"
" `Second_organization` Nullable(String),"
" `email` Nullable(String),"
" `reprintauthor` Nullable(String),"
" `altname` Nullable(String),"
" `country` Nullable(String),"
" `firstauthor` Nullable(String),"
" `organizationdept` Nullable(String)"
" )"
" ENGINE = MergeTree"
" ORDER BY uuid"
" SETTINGS index_granularity = 8192"
)
async def main():
async with ClientSession() as s:
client = ChClient(s, url='http://115.29.55.141:8123/', user='zxcs', password='123123', database='datahouse')
alive = await client.is_alive() # returns True if connection is Ok
# print(f"Is ClickHouse alive? -> {alive}")
# step 1
# 创建数据临时表
await client.execute(tempTableDelete)
await client.execute(tempTableCreate)
sql1 = (
" insert into datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp" +
" select " +
" generateUUIDv4() as randomUUID,`guid`,`name`,`organization`,`Second_organization`,`email`,`reprintauthor`,`altname`,`country`,`firstauthor`,`organizationdept` " +
" from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut "
"where guid in (select guid from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut where email='shibojiang@fudan.edu.cn')"
)
await client.execute(sql1)
print(f"sql1 -> {sql1}")
# step 2 分组
sql2 = ("select name,organization from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp"
" GROUP by name ,organization"
" ORDER BY LENGTH(name) DESC"
)
sql2_rows = await client.fetch(sql2)
# print(f"sql2 -> {sql2}")
sql2_name_list = []
sql2_organization_list = []
for row in sql2_rows:
sql2_name_list.append(row["name"])
sql2_organization_list.append(row["organization"])
# step3 if 条件1 name organization 完全一致
# 创建正式专家表
await client.execute(ExpertNewDelete)
await client.execute(ExpertNewCreate)
# 创建正式关联表
await client.execute(RelationDelete)
await client.execute(RelationCreate)
# 遍历 分组数据,插入临时表
for i in range(len(sql2_name_list)):
name = sql2_name_list[i]
organization = sql2_organization_list[i]
print(f"name -> {name}")
print(f"organization -> {organization}")
# 创建临时专家表
await client.execute(ExpertNewTmpDelete)
await client.execute(ExpertNewTmpCreate)
# name and organization 相同insert 临时表
sql3 = (
" insert into datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew_Tmp " +
" select " +
" `uuid`,`guid`,`name`,`organization`,`Second_organization`,`email`,`reprintauthor`,`altname`,`country`,`firstauthor`,`organizationdept` " +
" from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp " +
" where name = '" + name + "' and organization = '" + organization + "';"
)
# print(f"sql3 -> {sql3}")
await client.execute(sql3)
# 移除插入临时表数据
# time.sleep(1)#测试环境数据库性能差,休眠1秒
sql4 = "ALTER TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp DELETE where name = '" + name + "' and organization = '" + organization + "';"
# print(f"sql4 -> {sql4}")
await client.execute(sql4)
# 根据机构相同遍历
sql5 = "select uuid,name from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp where organization = '" + organization + "'"
sql5_uuid_list = []
# print(f"sql5 -> {sql5}")
sql5_rows = await client.fetch(sql5)
# 姓名是子串加入临时表
for row in sql5_rows:
_uuid = row["uuid"]
_name = row["name"]
# 如果姓名是子串,插入临时表,并且删除当前数据
if lcs.isLCS(name, _name):
sql5_1 = (
" insert into datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew_Tmp " +
" select "
" `uuid`,`guid`,`name`,`organization`,`Second_organization`,`email`,`reprintauthor`,`altname`,`country`,`firstauthor`,`organizationdept` " +
" from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp " +
" where uuid = '" + _uuid + "'"
)
sql5_uuid_list.append(_uuid)
await client.execute(sql5_1)
# 移除移入name子串数据
for uid in sql5_uuid_list:
sql5_2 = "ALTER TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp DELETE where uuid = '" + uid + "'"
await client.execute(sql5_2)
# 根据姓名相同遍历
sql6 = " select uuid,organization from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp where name ='" + name + "'"
sql6_uuid_list = []
sql6_rows = await client.fetch(sql6)
# 机构如是子串加入临时表
for row in sql6_rows:
_uuid = row["uuid"]
_organization = row["organization"]
# 如果机构是子串,插入临时表,并且删除当前数据
if lcs.isLCS(organization, _organization):
sql6_1 = (
" insert into datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew_Tmp " +
" select " +
" `uuid`,`guid`,`name`,`organization`,`Second_organization`,`email`,`reprintauthor`,`altname`,`country`,`firstauthor`,`organizationdept` " +
" from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp " +
" where uuid = '" + _uuid + "'"
)
sql6_uuid_list.append(_uuid)
await client.execute(sql6_1)
# 移除移入机构子串数据
for uid in sql6_uuid_list:
sql6_2 = "ALTER TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp DELETE where uuid ='" + uid + "'"
await client.execute(sql6_2)
# 操作专家临时表 ,数据合并
# 遍历临时专家表
expertIdsql = "SELECT generateUUIDv4() as randomUUID"
expertId = await client.fetchval(expertIdsql)
sql7_rows_sorganization = {}
sql7_rows_email = {}
sql7_rows_reprintauthor = {}
sql7_rows_altname = {}
sql7_rows_country = {}
sql7_rows_firstauthor = {}
sql7_rows_organizationdept = {}
sql7 = (
" select " +
" `uuid`,`guid`,`name`,`organization`,`Second_organization`,`email`,`reprintauthor`,`altname`,`country`,`firstauthor`,`organizationdept` " +
" from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew_Tmp "
)
sql7_rows = await client.fetch(sql7)
# 插入关系表
for row in sql7_rows:
_guid = row["guid"]
await client.execute(
"INSERT INTO datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Experts_Relationship VALUES",
(expertId, _guid)
)
second_organization = row["Second_organization"]
if second_organization is not None:
if sql7_rows_sorganization.get(second_organization) is None:
sql7_rows_sorganization[second_organization] = 1
else:
sql7_rows_sorganization[second_organization] = sql7_rows_sorganization[second_organization] + 1
email = row["email"]
if email is not None:
if sql7_rows_email.get(email) is None:
sql7_rows_email[email] = 1
else:
sql7_rows_email[email] = sql7_rows_email[email] + 1
reprintauthor = row["reprintauthor"]
if reprintauthor is not None:
if sql7_rows_reprintauthor.get(reprintauthor) is None:
sql7_rows_reprintauthor[reprintauthor] = 1
else:
sql7_rows_reprintauthor[reprintauthor] = sql7_rows_reprintauthor[reprintauthor] + 1
altname = row["altname"]
if altname is not None:
if sql7_rows_altname.get(altname) is None:
sql7_rows_altname[altname] = 1
else:
sql7_rows_altname[altname] = sql7_rows_altname[altname] + 1
country = row["country"]
if country is not None:
if sql7_rows_country.get(country) is None:
sql7_rows_country[country] = 1
else:
sql7_rows_country[country] = sql7_rows_country[country] + 1
firstauthor = row["firstauthor"]
if firstauthor is not None:
if sql7_rows_firstauthor.get(firstauthor) is None:
sql7_rows_firstauthor[firstauthor] = 1
else:
sql7_rows_firstauthor[firstauthor] = sql7_rows_firstauthor[firstauthor] + 1
organizationdept = row["organizationdept"]
if organizationdept is not None:
if sql7_rows_organizationdept.get(organizationdept) is None:
sql7_rows_organizationdept[organizationdept] = 1
else:
sql7_rows_organizationdept[organizationdept] = sql7_rows_organizationdept[organizationdept] + 1
print(len(sql7_rows))
# 合并专家表
# email 排序处理
if len(sql7_rows) > 0:
sorted(sql7_rows_email.items(), key=lambda item: item[1], reverse=True)
emailnew = ""
for e in sql7_rows_email:
if emailnew != "":
# 拆邮箱
if sql7_rows_email[emailnew] == sql7_rows_email[e]:
emailHead = "" if e is None else e[0:e.find('@')]
emailOrg = "" if e is None else e[e.find('@') + 1:e.find('.')]
if lcs.isLCS(name, emailHead) and lcs.isLCS(organization, emailOrg):
emailnew = e
else:
emailnew = e
await client.execute("INSERT INTO datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew VALUES",
(expertId, name, organization, ",".join(sql7_rows_sorganization.keys()), emailnew,
",".join(sql7_rows_reprintauthor.keys()), ",".join(sql7_rows_altname.keys()),
",".join(sql7_rows_country.keys()), ",".join(sql7_rows_firstauthor.keys()),
",".join(sql7_rows_organizationdept.keys()))
)
# 删除临时表
await client.execute(ExpertNewTmpDelete)
await client.execute(tempTableDelete)
print('step 1~4 finish')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/tf001/ZjdxCKDataHandelPyThon.git
git@gitee.com:tf001/ZjdxCKDataHandelPyThon.git
tf001
ZjdxCKDataHandelPyThon
ZjdxCKDataHandelPyThon
master

搜索帮助

D67c1975 1850385 1daf7b77 1850385