2 Star 0 Fork 0

tf / ZjdxCKDataHandelPyThon

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
main7.py 25.17 KB
一键复制 编辑 原始数据 按行查看 历史
Muzi 提交于 2021-04-01 18:29 . 第七步 修改email获取
import asyncio
import time
from aiochclient import ChClient
from aiohttp import ClientSession
import lcs
# DDL for the three scratch (temporary) MergeTree tables used while
# clustering author rows into experts.  They are dropped and recreated on
# every run; all live in the `datahouse` ClickHouse database.

# Column list shared by the per-email author scratch table and the
# per-cluster expert scratch table.
_AUTHOR_COLUMNS = (
    " `uuid` String,"
    " `guid` String,"
    " `name` Nullable(String),"
    " `organization` Nullable(String),"
    " `Second_organization` Nullable(String),"
    " `email` Nullable(String),"
    " `reprintauthor` Nullable(String),"
    " `altname` Nullable(String),"
    " `country` Nullable(String),"
    " `firstauthor` Nullable(String),"
    " `organizationdept` Nullable(String)"
)


def _drop_stmt(table):
    """Return a DROP TABLE IF EXISTS statement for *table*."""
    return "DROP TABLE IF EXISTS " + table


def _create_stmt(table, columns, order_key):
    """Return a CREATE TABLE statement for a MergeTree *table* sorted by *order_key*."""
    return ("CREATE TABLE " + table + " ("
            + columns +
            " )"
            " ENGINE = MergeTree"
            " ORDER BY " + order_key +
            " SETTINGS index_granularity = 8192")


# Per-email scratch copy of the author rows.
tempTableDelete = _drop_stmt("datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp")
tempTableCreate = _create_stmt(
    "datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp", _AUTHOR_COLUMNS, "guid")
# Per-cluster scratch copy of the author rows belonging to one expert.
ExpertNewTmpDelete = _drop_stmt("datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew_Tmp")
ExpertNewTmpCreate = _create_stmt(
    "datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew_Tmp", _AUTHOR_COLUMNS, "uuid")
# Scratch expert -> article (uuid -> guid) relationship table.
ExpertsRelationshipTmpDelete = _drop_stmt(
    "datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Experts_Relationship_Tmp")
ExpertsRelationshipTmpCreate = _create_stmt(
    "datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Experts_Relationship_Tmp",
    " `uuid` String,"
    " `guid` String",
    "uuid",
)
def _esc(value):
    """Escape backslashes and single quotes for a single-quoted ClickHouse literal.

    BUG FIX: the original concatenated raw values into SQL, so any name or
    organization containing a quote (e.g. O'Brien) produced broken statements.
    """
    return value.replace("\\", "\\\\").replace("'", "\\'")


def _bump(counter, key):
    """Count one occurrence of *key* in dict *counter*; None keys are ignored."""
    if key is not None:
        counter[key] = counter.get(key, 0) + 1


async def main():
    """Cluster WOS author rows that share a source e-mail into merged experts.

    Pipeline (all tables in the `datahouse` ClickHouse database):
      step 1-4: for every distinct e-mail in ..._OutPut_1, copy its articles'
                author rows into a scratch table, cluster them by exact
                (name, organization) plus LCS-subsequence matches on name or
                organization, and insert one merged expert per cluster into
                ..._ExpertNew (expert->article links go to a scratch
                relationship table).
      step 5:   merge ExpertNew records that ended up sharing an e-mail and
                whose names are LCS-compatible.
      step 6:   aggregate keywords (DE), subjects (SC) and journals (SO) per
                expert from the article table, then flush the deduplicated
                scratch relationships into the permanent relationship table.

    NOTE(review): `lcs.isLCS(a, b)` is assumed to test whether one string is a
    subsequence of the other — confirm against the project's lcs module.
    """
    async with ClientSession() as s:
        # NOTE(review): credentials are hard-coded; consider moving them to
        # environment variables or a config file.
        client = ChClient(s, url='http://115.29.55.141:8123/', user='zxcs',
                          password='123123', database='datahouse')
        alive = await client.is_alive()  # returns True if connection is Ok
        # Add the output columns to ExpertNew when they do not exist yet.
        await client.execute(
            "alter table datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew add column if not exists keywords Nullable(String)"
        )
        await client.execute(
            "alter table datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew add column if not exists subjects Nullable(String)"
        )
        await client.execute(
            "alter table datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew add column if not exists journals Nullable(String)"
        )
        await client.execute(
            "alter table datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew add column if not exists source_email Nullable(String)"
        )
        # Recreate the data scratch table.
        await client.execute(tempTableDelete)
        await client.execute(tempTableCreate)
        # Recreate the expert-relationship scratch table.
        await client.execute(ExpertsRelationshipTmpDelete)
        await client.execute(ExpertsRelationshipTmpCreate)
        # Recreate the expert scratch table.
        await client.execute(ExpertNewTmpDelete)
        await client.execute(ExpertNewTmpCreate)
        # Longest e-mails first; each distinct e-mail drives one clustering pass.
        sql0 = (
            "SELECT email from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_1 "
            "where email is not null and email != '' "
            "group by email order by LENGTH(email) desc")
        sql0_rows = await client.fetch(sql0)
        for item in sql0_rows:
            sourceEmail = item['email']
            # Clear both per-email scratch tables for this pass.
            await client.execute(
                "ALTER TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp DELETE WHERE 1=1")
            await client.execute(
                "ALTER TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Experts_Relationship_Tmp DELETE WHERE 1=1")
            # step 1: pull every author row of every article carrying this e-mail.
            sql1 = (
                " insert into datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp"
                " select "
                " generateUUIDv4() as randomUUID,`guid`,`name`,`organization`,`Second_organization`,`email`,`reprintauthor`,`altname`,`country`,`firstauthor`,`organizationdept` "
                " from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_1 "
                " where guid in (select guid from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_1 where email='"
                + _esc(sourceEmail) + "')"
            )
            await client.execute(sql1)
            # step 2: distinct (name, organization) groups, longest name first.
            sql2 = ("select `name`,organization from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp"
                    " GROUP by `name` ,organization"
                    " ORDER BY LENGTH(`name`) DESC")
            sql2_rows = await client.fetch(sql2)
            sql2_name_list = [row["name"] for row in sql2_rows]
            sql2_organization_list = [row["organization"] for row in sql2_rows]
            # step 3: each (name, organization) group seeds one cluster/expert.
            for i in range(len(sql2_name_list)):
                # Clear the expert scratch table for this cluster.
                await client.execute(
                    "ALTER TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew_Tmp DELETE WHERE 1=1")
                # Both columns are Nullable; coerce None to '' so the SQL string
                # building below cannot raise TypeError.
                name = sql2_name_list[i] or ''
                organization = sql2_organization_list[i] or ''
                print(name, ' ', organization)
                # Rows with the exact same name and organization seed the cluster.
                sql3 = (
                    " insert into datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew_Tmp "
                    " select "
                    " `uuid`,`guid`,`name`,`organization`,`Second_organization`,`email`,`reprintauthor`,`altname`,`country`,`firstauthor`,`organizationdept` "
                    " from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp "
                    " where name = '" + _esc(name) + "' and organization = '" + _esc(organization) + "';"
                )
                await client.execute(sql3)
                # Remove the moved rows so later groups cannot re-claim them.
                sql4 = ("ALTER TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp DELETE where `name` = '"
                        + _esc(name) + "' and organization = '" + _esc(organization) + "';")
                await client.execute(sql4)
                # Same organization and the name is an LCS match -> same cluster.
                sql5 = ("select uuid,`name` from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp where organization = '"
                        + _esc(organization) + "'")
                sql5_uuid_list = []
                sql5_rows = await client.fetch(sql5)
                for row in sql5_rows:
                    _uuid = row["uuid"]
                    _name = row["name"]
                    if lcs.isLCS(name, _name):
                        sql5_1 = (
                            " insert into datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew_Tmp "
                            " select "
                            " `uuid`,`guid`,`name`,`organization`,`Second_organization`,`email`,`reprintauthor`,`altname`,`country`,`firstauthor`,`organizationdept` "
                            " from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp "
                            " where uuid = '" + _uuid + "'"
                        )
                        sql5_uuid_list.append(_uuid)
                        await client.execute(sql5_1)
                # Remove the name-subsequence rows that were moved.
                for uid in sql5_uuid_list:
                    await client.execute(
                        "ALTER TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp DELETE where uuid = '"
                        + uid + "'")
                # Same name and the organization is an LCS match -> same cluster.
                sql6 = (" select uuid,organization from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp where name ='"
                        + _esc(name) + "'")
                sql6_uuid_list = []
                sql6_rows = await client.fetch(sql6)
                for row in sql6_rows:
                    _uuid = row["uuid"]
                    _organization = row["organization"]
                    if lcs.isLCS(organization, _organization):
                        sql6_1 = (
                            " insert into datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew_Tmp "
                            " select "
                            " `uuid`,`guid`,`name`,`organization`,`Second_organization`,`email`,`reprintauthor`,`altname`,`country`,`firstauthor`,`organizationdept` "
                            " from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp "
                            " where uuid = '" + _uuid + "'"
                        )
                        sql6_uuid_list.append(_uuid)
                        await client.execute(sql6_1)
                # Remove the organization-subsequence rows that were moved.
                for uid in sql6_uuid_list:
                    await client.execute(
                        "ALTER TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Tmp DELETE where uuid ='"
                        + uid + "'")
                # step 4: merge the cluster into one expert record.
                expertId = await client.fetchval("SELECT generateUUIDv4() as randomUUID")
                sql7_rows_sorganization = {}
                sql7_rows_email = {}
                sql7_rows_reprintauthor = {}
                sql7_rows_altname = {}
                sql7_rows_country = {}
                sql7_rows_firstauthor = {}
                sql7_rows_organizationdept = {}
                sql7 = (
                    " select "
                    " `uuid`,`guid`,`name`,`organization`,`Second_organization`,`email`,`reprintauthor`,`altname`,`country`,`firstauthor`,`organizationdept` "
                    " from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew_Tmp "
                )
                sql7_rows = await client.fetch(sql7)
                # Record expert->article links and tally every non-null attribute.
                values = []
                for row in sql7_rows:
                    values.append("('" + str(expertId) + "','" + row["guid"] + "')")
                    _bump(sql7_rows_sorganization, row["Second_organization"])
                    _bump(sql7_rows_email, row["email"])
                    _bump(sql7_rows_reprintauthor, row["reprintauthor"])
                    _bump(sql7_rows_altname, row["altname"])
                    _bump(sql7_rows_country, row["country"])
                    _bump(sql7_rows_firstauthor, row["firstauthor"])
                    _bump(sql7_rows_organizationdept, row["organizationdept"])
                # BUG FIX: "INSERT ... VALUES" with an empty value list is
                # invalid SQL, so only insert when the cluster is non-empty.
                if values:
                    await client.execute(
                        "INSERT INTO datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Experts_Relationship_Tmp VALUES"
                        + ','.join(values)
                    )
                print(len(sql7_rows))
                if len(sql7_rows) > 0:
                    # BUG FIX: the original called sorted() and discarded the
                    # result, so e-mails were scanned in insertion order and the
                    # frequency ranking never happened.  Rank by count, descending.
                    ranked = sorted(sql7_rows_email.items(), key=lambda kv: kv[1], reverse=True)
                    emailnew = ""
                    for e, count in ranked:
                        if emailnew == "":
                            emailnew = e
                        elif sql7_rows_email[emailnew] == count:
                            # On a frequency tie, prefer the e-mail whose local
                            # part matches the name and whose domain label
                            # matches the organization (LCS test).
                            emailHead = e[0:e.find('@')]
                            # BUG FIX: the original used e[e.find('@')+1:e.find('.')],
                            # which returns ''/garbage whenever the local part
                            # contains a dot (e.g. john.doe@zju.edu.cn); take the
                            # first domain label after the '@' instead.
                            emailOrg = e[e.find('@') + 1:].split('.', 1)[0]
                            if lcs.isLCS(name, emailHead) and lcs.isLCS(organization, emailOrg):
                                emailnew = e
                    # One merged expert row; keywords/subjects/journals are
                    # filled later in step 6.
                    await client.execute(
                        "INSERT INTO datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew VALUES",
                        (expertId, name, organization, ",".join(sql7_rows_sorganization.keys()),
                         emailnew,
                         ",".join(sql7_rows_reprintauthor.keys()), ",".join(sql7_rows_altname.keys()),
                         ",".join(sql7_rows_country.keys()), ",".join(sql7_rows_firstauthor.keys()),
                         ",".join(sql7_rows_organizationdept.keys()), '', '', '', sourceEmail)
                    )
            print('step 1~4 finish')
            # step 5: merge ExpertNew records that now share an e-mail.
            sql8 = (
                " SELECT email from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew "
                " where email is not null and email !='' "
                " group by email "
                " HAVING count(*) > 1 "
            )
            sql8_rows = await client.fetch(sql8)
            if sql8_rows is not None and len(sql8_rows) > 0:
                # (The original also compared each record to '' here, which is
                # always true for a row object, so that check was dropped.)
                for sql8_rows_item in sql8_rows:
                    sql8_1 = (
                        " SELECT "
                        " `uuid`,`name`,`organization`,`Second_organization`,`email`,`reprintauthor`,`altname`,`country`,`firstauthor`,`organizationdept`,`keywords`,`subjects`,`journals` "
                        " from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew "
                        " where email ='" + _esc(str(sql8_rows_item['email'])) + "' ORDER BY LENGTH(name) DESC"
                    )
                    sql8_1_rows = await client.fetch(sql8_1)
                    if sql8_1_rows is not None and len(sql8_1_rows) > 0:
                        # Collect every pair whose names match exactly or by LCS.
                        sameArr = []
                        for j in range(len(sql8_1_rows)):
                            itemj = sql8_1_rows[j]
                            namej = itemj['name']
                            for k in range(j + 1, len(sql8_1_rows)):
                                itemk = sql8_1_rows[k]
                                namek = itemk['name']
                                if namej == namek or lcs.isLCS(namej, namek):
                                    if itemj not in sameArr:
                                        sameArr.append(itemj)
                                    if itemk not in sameArr:
                                        sameArr.append(itemk)
                        if len(sameArr) > 0:
                            randomUUID = await client.fetchval("SELECT generateUUIDv4() as randomUUID")
                            uuid = str(randomUUID)
                            uuidArr = []
                            orgParts = []
                            for sameItem in sameArr:
                                # Coerce None organizations to '' (Nullable column).
                                orgParts.append(sameItem['organization'] or '')
                                uuidArr.append("'" + str(sameItem['uuid']) + "'")
                            newOrganization = ','.join(orgParts)
                            # Delete the duplicate experts ...
                            sql8_2 = (
                                " ALTER TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew "
                                " DELETE WHERE uuid in (" + ','.join(uuidArr) + ")"
                            )
                            await client.execute(sql8_2)
                            # ... and insert one merged expert; non-organization
                            # attributes come from the longest-named record.
                            first = sameArr[0]
                            sql8_3 = (
                                "insert into datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew values("
                                "'" + uuid + "',"
                                "'" + _esc(str(first['name'] or '')) + "',"
                                "'" + _esc(newOrganization) + "',"
                                "'" + _esc(str(first['Second_organization'] or '')) + "',"
                                "'" + _esc(str(first['email'] or '')) + "',"
                                "'" + _esc(str(first['reprintauthor'] or '')) + "',"
                                "'" + _esc(str(first['altname'] or '')) + "',"
                                "'" + _esc(str(first['country'] or '')) + "',"
                                "'" + _esc(str(first['firstauthor'] or '')) + "',"
                                "'" + _esc(str(first['organizationdept'] or '')) + "',"
                                "'',"
                                "'',"
                                "'',"
                                "'" + _esc(sourceEmail) + "'"
                                ")"
                            )
                            await client.execute(sql8_3)
                            # Re-point the scratch relationships at the merged expert.
                            sql8_4 = (
                                " SELECT "
                                " `uuid`,`guid`"
                                " FROM datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Experts_Relationship_Tmp "
                                " WHERE uuid in (" + ','.join(uuidArr) + ")"
                            )
                            sql8_4_rows = await client.fetch(sql8_4)
                            # BUG FIX: guard on non-empty, not just non-None — an
                            # empty result made "insert ... values" invalid SQL.
                            if sql8_4_rows:
                                sql8_5 = (
                                    " ALTER TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Experts_Relationship_Tmp "
                                    " DELETE WHERE uuid in (" + ','.join(uuidArr) + ")"
                                )
                                await client.execute(sql8_5)
                                rel_values = ["('" + uuid + "','" + str(r['guid']) + "')"
                                              for r in sql8_4_rows]
                                await client.execute(
                                    "insert into datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Experts_Relationship_Tmp values"
                                    + ','.join(rel_values))
            print('step 5 finish')
            # step 6: per expert, tally keywords (DE), subjects (SC) and
            # journals (SO) over its linked articles and write them back.
            sql9_1 = "SELECT uuid FROM datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew"
            sql9_1_rows = await client.fetch(sql9_1)
            for sql9_1_row in sql9_1_rows:
                _uuid = sql9_1_row["uuid"]
                sql9_2 = ("SELECT guid,DE,SC,SO FROM datahouse.T_SCI_WOS_2020_ISSN_China WHERE guid in ("
                          "SELECT guid FROM datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Experts_Relationship_Tmp "
                          "WHERE uuid = '" + _uuid + "')")
                sql9_2_rows = await client.fetch(sql9_2)
                keywords = {}
                subjects = {}
                journals = {}
                for sql9_2_row in sql9_2_rows:
                    # Truthiness covers both the None and the '' exclusions.
                    if sql9_2_row["DE"]:
                        _bump(keywords, sql9_2_row["DE"])
                    if sql9_2_row["SC"]:
                        _bump(subjects, sql9_2_row["SC"])
                    if sql9_2_row["SO"]:
                        _bump(journals, sql9_2_row["SO"])
                sql9_3 = (
                    " ALTER TABLE datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_ExpertNew "
                    " UPDATE "
                    " keywords='" + _esc(",".join(keywords.keys())) + "' ,"
                    " subjects = '" + _esc(",".join(subjects.keys())) + "' ,"
                    " journals='" + _esc(",".join(journals.keys())) + "' "
                    " where uuid = '" + _uuid + "' "
                )
                await client.execute(sql9_3)
            # Deduplicate the scratch relationships into the permanent table.
            sql10 = (
                " INSERT INTO datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Experts_Relationship "
                " SELECT uuid,guid from datahouse.T_SCI_WOS_2020_ISSN_China_OutPut_Experts_Relationship_Tmp group by uuid,guid "
            )
            await client.execute(sql10)
            # BUG FIX: time.sleep(2) blocked the whole event loop inside an
            # async function; asyncio.sleep yields control instead.
            await asyncio.sleep(2)
            print('step 6 finish')
        # Drop the scratch tables.
        await client.execute(ExpertNewTmpDelete)
        await client.execute(tempTableDelete)
        await client.execute(ExpertsRelationshipTmpDelete)
if __name__ == '__main__':
    # asyncio.run() replaces the deprecated get_event_loop()/run_until_complete
    # pattern (DeprecationWarning since Python 3.10) and also closes the loop.
    asyncio.run(main())
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/tf001/ZjdxCKDataHandelPyThon.git
git@gitee.com:tf001/ZjdxCKDataHandelPyThon.git
tf001
ZjdxCKDataHandelPyThon
ZjdxCKDataHandelPyThon
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891