2 Star 0 Fork 0

tf / ZjdxCKDataHandelPyThon

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
step2_8_org1.py 20.15 KB
一键复制 编辑 原始数据 按行查看 历史
tf 提交于 2021-10-11 11:44 . 更新代码
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610
from clickhouse_driver import Client
import pandas as pd
import threading
import time
import constant
import lcs
import re
import pinyin as py
import validation
def createTable(tableName, columns):
    """Build a CREATE TABLE statement from a table name and a column/engine clause."""
    return f"CREATE TABLE {tableName}{columns}"
def deleteTable(tableName):
    """Build a DROP TABLE IF EXISTS statement for the given table."""
    return f"DROP TABLE IF EXISTS {tableName}"
# ClickHouse column/engine clause for the merged-expert tables.
# The field order must stay in sync with ExpertQueryColumns below.
ExpertColumns = (
    " ("
    " `uuid` String,"
    " `name` Nullable(String),"
    " `organization` Nullable(String),"
    " `Second_organization` Nullable(String),"
    " `email` Nullable(String),"
    " `reprintauthor` Nullable(String),"
    " `altname` Nullable(String),"
    " `country` Nullable(String),"
    " `firstauthor` Nullable(String),"
    " `organizationdept` Nullable(String),"
    " `keywords` Nullable(String),"
    " `subjects` Nullable(String),"
    " `journals` Nullable(String),"
    " `source_email` Nullable(String)"
    " )"
    " ENGINE = MergeTree"
    " ORDER BY uuid"
    " SETTINGS index_granularity = 8192"
)
# Clause for the expert-uuid -> guid relationship table.
RelationshipColumns = (
    " ("
    " `uuid` String,"
    " `guid` String"
    " )"
    " ENGINE = MergeTree"
    " ORDER BY uuid"
    " SETTINGS index_granularity = 8192"
)
# Clause for the table of organizations that occur on more than one expert.
RepeatOrgColumns = (
    " ("
    " `organization` String,"
    " `num` Int32"
    " )"
    " ENGINE = MergeTree"
    " ORDER BY organization"
    " SETTINGS index_granularity = 8192"
)
# Clause for the table mapping a surviving uuid to the uuids merged into it.
RepeatOrgUuidColumns = (
    " ("
    " `uuid` String,"
    " `ruuid` String"
    " )"
    " ENGINE = MergeTree"
    " ORDER BY uuid"
    " SETTINGS index_granularity = 8192"
)
# Column list used for SELECT/INSERT of expert rows; index positions of these
# columns are relied on throughout (e.g. mergeRepeatExpert).
ExpertQueryColumns = (
    " uuid,name,organization,Second_organization,email,reprintauthor,altname,country,firstauthor,organizationdept,"
    " keywords,subjects,journals,source_email "
)
# Table names are derived from the project-wide prefixes in `constant`.
China = constant.China
OutPut = constant.OutPut
Expert = OutPut + "_Expert"
ExpertTmp = Expert + "_Tmp"
ExpertTmp1 = ExpertTmp + "1"
ExpertTmp1Delete = deleteTable(ExpertTmp1)
ExpertTmp1Create = createTable(ExpertTmp1, ExpertColumns)
Relationship = OutPut + "_Experts_Relationship"
RelationshipTmp = Relationship + "_Tmp"
RelationshipTmp1 = RelationshipTmp + "1"
RelationshipTmp1Delete = deleteTable(RelationshipTmp1)
RelationshipTmp1Create = createTable(RelationshipTmp1, RelationshipColumns)
RepeatOrg = Expert + "_Repeat_Org"
RepeatOrgDelete = deleteTable(RepeatOrg)
RepeatOrgCreate = createTable(RepeatOrg, RepeatOrgColumns)
RepeatOrgUuid = RepeatOrg + "_Uuid"
RepeatOrgUuidDelete = deleteTable(RepeatOrgUuid)
RepeatOrgUuidCreate = createTable(RepeatOrgUuid, RepeatOrgUuidColumns)
# Table recording per-organization merge progress (see recordNumberExecutions).
RepeatOrgExecute = Expert + "_Repeat_Org_Execute"
RepeatOrgExecuteDelete = deleteTable(RepeatOrgExecute)
RepeatOrgExecuteCreate = createTable(RepeatOrgExecute, RepeatOrgColumns)
def heartbeatQuery(client):
    """Keep-alive query so a long-running job does not lose its database connection."""
    client.execute("select 1")
def escapeCharacter(text):
    """Escape a value for embedding in a SQL string literal.

    Replacements:
        '  becomes ''   (SQL single-quote escaping)
        {  becomes {{
        }  becomes }}

    NOTE(review): the brace doubling suggests the SQL text is later run
    through str.format somewhere — confirm; ClickHouse itself does not
    need it.

    :param text: string to escape (renamed from `str`, which shadowed the builtin)
    :return: escaped string
    """
    return text.replace("'", "''").replace("{", "{{").replace("}", "}}")
def getDbStr(strParam):
    """Normalize a value into a single-quoted SQL literal.

    Repeatedly strips surrounding whitespace, leading/trailing single quotes
    and trailing backslashes (which would escape the closing quote), then
    wraps the cleaned value in single quotes. The loop is bounded by the
    original length so malformed input cannot spin forever.

    :param strParam: any value; converted with str() first
    :return: "'<cleaned value>'"
    """
    strParam = str(strParam)
    limit = len(strParam)  # renamed from `max`, which shadowed the builtin
    attempts = 0
    while True:
        strParam = strParam.strip()
        if strParam.startswith("'"):
            strParam = strParam[1:]
        if strParam.endswith("'"):
            strParam = strParam[:-1]
        if strParam.endswith("\\"):
            strParam = strParam[:-1]
        if not strParam.startswith("'") and not strParam.endswith("'") and not strParam.endswith("\\"):
            # No surrounding quotes / trailing backslashes remain: done.
            break
        attempts += 1
        if attempts > limit:
            # Safety valve against an infinite loop.
            break
    return "'" + strParam + "'"
def arrUnique(arr):
    """Return the unique elements of *arr*, keeping first-seen order."""
    uniqueValues = pd.unique(arr)
    return list(uniqueValues)
def strUniqueAndsplitBySemicolon(value):
    """Split on ';' and ',', trim, de-duplicate (first-seen order), rejoin with ';'.

    :param value: raw delimited string, or None (param renamed from `str`,
        which shadowed the builtin)
    :return: ';'-joined unique tokens, or '' for None/blank input
    """
    if value is None or not value.strip():
        return ''
    tokens = []
    for part in value.split(";"):
        for token in part.split(","):
            token = token.strip()
            if token:
                tokens.append(token)
    # dict.fromkeys preserves insertion order — stdlib ordered de-dup,
    # no pandas round-trip needed.
    return ';'.join(dict.fromkeys(tokens))
def noneToEmptyStr(value):
    """Map None (or the literal string 'None') to '', pass anything else through."""
    if value is None or value == 'None':
        return ''
    return value
def noneToEmptyArr(arr):
    """Stringify every element of *arr*, mapping None / 'None' entries to ''."""
    return ['' if item is None or item == 'None' else str(item) for item in arr]
def getUuidMap(uuidArr, uuidMap, uuidKey):
    """Merge the uuid groups of *uuidArr* into a single group keyed by *uuidKey*.

    Any uuid in *uuidArr* that already heads a group in *uuidMap* has its
    members absorbed and its own entry removed, keeping the groups disjoint.

    :param uuidArr: uuids to fold into one group
    :param uuidMap: mapping of representative uuid -> list of member uuids (mutated)
    :param uuidKey: representative uuid of the merged group
    :return: the (mutated) uuidMap
    """
    merged = list(uuidArr)
    for uuid in uuidArr:
        if uuid in uuidMap:
            # Absorb the existing group and drop its old representative entry.
            merged.extend(uuidMap.pop(uuid))
    # dict.fromkeys: order-preserving de-dup without a pandas round-trip.
    uuidMap[uuidKey] = list(dict.fromkeys(merged))
    return uuidMap
def insertRUuid(client, uuidMap):
    """Persist the uuid -> member-uuid groups into the RepeatOrgUuid table.

    Rows are batched: the INSERT is flushed every 1500 VALUES tuples and once
    more at the end for any remainder.

    :param client: clickhouse_driver Client
    :param uuidMap: mapping of representative uuid -> list of member uuids
    """
    for key in uuidMap.keys():
        uuidArr = uuidMap[key]
        sql = (
            " insert into " + RepeatOrgUuid + "(uuid,ruuid)VALUES"
        )
        insertNum = 0
        for i in range(len(uuidArr)):
            insertNum += 1
            itemRUuid = uuidArr[i]
            sql += "(" + getDbStr(escapeCharacter(str(key))) + "," + getDbStr(escapeCharacter(str(itemRUuid))) + "),"
            # Flush a full batch of 1500 rows, then restart the statement.
            if insertNum >= 1500:
                if (sql.endswith(',')):
                    # Drop the trailing comma after the last VALUES tuple.
                    sql = sql[0:len(sql) - 1]
                if (sql.endswith(')')):
                    # Only execute when at least one VALUES tuple is present.
                    insertNum = 0
                    client.execute(sql)
                    sql = (
                        " insert into " + RepeatOrgUuid + "(uuid,ruuid)VALUES"
                    )
        # Flush whatever partial batch remains.
        if insertNum > 0:
            if (sql.endswith(',')):
                sql = sql[0:len(sql) - 1]
            if (sql.endswith(')')):
                client.execute(sql)
def getGuidMapOrg(client, uuidMap):
    """Collect, per merged uuid group, the distinct guids linked to any member.

    First persists the groups via insertRUuid, then queries RelationshipTmp
    for the guids of each group's member uuids.

    :param client: clickhouse_driver Client
    :param uuidMap: mapping of representative uuid -> list of member uuids
    :return: dict of representative uuid -> list of unique guids
    """
    guidMap = {}
    if not uuidMap:
        return guidMap
    insertRUuid(client, uuidMap)
    for key in uuidMap:
        sql = (
            " select guid from " + RelationshipTmp +
            " where uuid in (select ruuid from " + RepeatOrgUuid +
            " where uuid = " + getDbStr(escapeCharacter(key)) + ")"
        )
        rows = client.execute(sql)
        if rows:
            guidMap[key] = arrUnique([str(row[0]) for row in rows])
    return guidMap
def mergeRepeatExpert(expert1, expert2):
    """Merge duplicate expert *expert2* into *expert1* and return the merged row.

    Field indexes follow ExpertQueryColumns:
    0 uuid, 1 name, 2 organization, 3 Second_organization, 4 email,
    5 reprintauthor, 6 altname, 7 country, 8 firstauthor, 9 organizationdept,
    10 keywords, 11 subjects, 12 journals, 13 source_email.

    Identity fields (uuid, name, etc.) are kept from *expert1*; the list-like
    fields are unioned and re-joined with ';'. The email is chosen by
    validation.checkAndGetSingleEmail rather than by simple union.
    """
    organizationArr = arrUnique([noneToEmptyStr(expert1[2]), noneToEmptyStr(expert2[2])])
    secondOrganizationArr = arrUnique([noneToEmptyStr(expert1[3]), noneToEmptyStr(expert2[3])])
    # emailArr = [noneToEmptyStr(expert1[4]), noneToEmptyStr(expert2[4])]
    organizationdeptArr = arrUnique([noneToEmptyStr(expert1[9]), noneToEmptyStr(expert2[9])])
    keywordsArr = [noneToEmptyStr(expert1[10]), noneToEmptyStr(expert2[10])]
    subjectsArr = [noneToEmptyStr(expert1[11]), noneToEmptyStr(expert2[11])]
    journalsArr = [noneToEmptyStr(expert1[12]), noneToEmptyStr(expert2[12])]
    organization = strUniqueAndsplitBySemicolon(";".join(organizationArr))
    secondOrganization = strUniqueAndsplitBySemicolon(";".join(secondOrganizationArr))
    # email = strUniqueAndsplitBySemicolon(";".join(emailArr))
    email = validation.checkAndGetSingleEmail(noneToEmptyStr(expert1[4]), noneToEmptyStr(expert2[4]),
                                              noneToEmptyStr(expert1[1]), organization)
    organizationdept = strUniqueAndsplitBySemicolon(";".join(organizationdeptArr))
    keywords = strUniqueAndsplitBySemicolon(";".join(keywordsArr))
    subjects = strUniqueAndsplitBySemicolon(";".join(subjectsArr))
    journals = strUniqueAndsplitBySemicolon(";".join(journalsArr))
    expert = noneToEmptyArr(
        [expert1[0], noneToEmptyStr(expert1[1]), organization, secondOrganization,
         email, noneToEmptyStr(expert1[5]), noneToEmptyStr(expert1[6]), noneToEmptyStr(expert1[7]),
         noneToEmptyStr(expert1[8]), organizationdept, keywords, subjects, journals, noneToEmptyStr(expert1[13])])
    return expert
def _flushValuesBatch(client, sql):
    """Trim the trailing comma and execute *sql* iff it carries at least one VALUES row."""
    if sql.endswith(','):
        sql = sql[:-1]
    if sql.endswith(')'):
        client.execute(sql)

def insertExpertTmpOrg(client, expertArr, relationshipMap):
    """Write merged expert rows and their uuid->guid links into the *_Tmp1 tables.

    Inserts are batched 1500 VALUES tuples at a time.

    BUGFIX: the original never reset sql1_InsertNum / sql2_InsertNum after a
    batch flush (unlike insertRUuid), so once 1500 rows had accumulated every
    further row was flushed in its own single-row INSERT.

    :param client: clickhouse_driver Client
    :param expertArr: list of 14-field expert rows (see ExpertQueryColumns)
    :param relationshipMap: dict of uuid -> list of guids
    """
    if expertArr is None or len(expertArr) == 0 or relationshipMap is None or len(relationshipMap.keys()) == 0:
        return
    # --- expert rows ---
    sql1Prefix = " insert into " + ExpertTmp1 + "(" + ExpertQueryColumns + ")VALUES"
    sql1 = sql1Prefix
    sql1_InsertNum = 0
    for item in expertArr:
        sql1_InsertNum += 1
        sql1 += "(" + ",".join(getDbStr(escapeCharacter(item[col])) for col in range(14)) + "),"
        if sql1_InsertNum >= 1500:
            _flushValuesBatch(client, sql1)
            sql1 = sql1Prefix
            sql1_InsertNum = 0  # reset so the next flush is again a full batch
    if sql1_InsertNum > 0:
        _flushValuesBatch(client, sql1)
    # --- uuid -> guid relationships ---
    sql2Prefix = " insert into " + RelationshipTmp1 + "(uuid,guid)VALUES"
    sql2 = sql2Prefix
    sql2_InsertNum = 0
    for uuidItem in relationshipMap.keys():
        for guidItem in relationshipMap[uuidItem]:
            sql2_InsertNum += 1
            sql2 += "(" + getDbStr(escapeCharacter(uuidItem)) + "," + getDbStr(escapeCharacter(guidItem)) + "),"
            if sql2_InsertNum >= 1500:
                _flushValuesBatch(client, sql2)
                sql2 = sql2Prefix
                sql2_InsertNum = 0
    if sql2_InsertNum > 0:
        _flushValuesBatch(client, sql2)
def getKey(name):
    """Bucket key for a name: the text before the first comma, else the first two characters."""
    if not name:
        return ""
    commaPos = name.find(",")
    if commaPos > -1:
        return name[:commaPos]
    return name if len(name) <= 2 else name[:2]
def recordNumberExecutions(client, organization, num):
    """Record merge progress for *organization* into the RepeatOrgExecute table."""
    orgLiteral = getDbStr(escapeCharacter(organization))
    sql = (
        " insert into " + RepeatOrgExecute + "(organization,num)VALUES(" +
        orgLiteral + "," + str(num) + ")"
    )
    client.execute(sql)
def executeMergeBySameOrg(client, organization):
    """Merge duplicate experts that share the same *organization*.

    Rows are bucketed by a name key (getKey); within a bucket, two experts
    are merged when their names pass the longest-common-subsequence check
    (pinyin-initial LCS when both are from China, plain LCS otherwise), their
    emails overlap, and validation confirms they are the same person.
    Survivors plus merged records are written to the *_Tmp1 tables along with
    their uuid->guid relationships.

    BUGFIX: countryJ was read as df.values[j][7], but j indexes the per-key
    bucket (dataArr), not the DataFrame — the candidate's country must come
    from itemData[7], consistent with nameJ/emailJ.

    :param client: clickhouse_driver Client
    :param organization: organization whose experts should be de-duplicated
    """
    sql1 = (
        " select " + ExpertQueryColumns +
        " from " + ExpertTmp + " where organization = " + getDbStr(escapeCharacter(organization)) +
        " group by " + ExpertQueryColumns +
        " order by length(name) desc"
    )
    sql1_rows = client.execute(sql1)
    print(organization, len(sql1_rows))
    if sql1_rows is not None and len(sql1_rows) > 0:
        columns = ExpertQueryColumns.split(",")
        df = pd.DataFrame(sql1_rows, columns=columns)
        # Bucket rows by name key so candidate pairs are only compared within a bucket.
        dataMap = {}
        for i in range(len(df)):
            item = df.values[i]
            itemMap = {
                "index": i,
                "data": item
            }
            itemKey = getKey(str(item[1]))
            if itemKey == "":
                continue
            dataMap.setdefault(itemKey, []).append(itemMap)
        arr = []
        uuidMap = {}
        keepMap = {}
        repeatIndex = []
        delUuidArr = []
        for i in range(len(df)):
            if i % 100 == 0:
                # Progress marker every 100 rows.
                recordNumberExecutions(client, organization, i + 1)
            uuidArr = []
            uuidI = str(df.values[i][0])
            if uuidI in delUuidArr:
                continue
            nameI = df.values[i][1]
            itemKey = getKey(nameI)
            nameI = re.sub("[^A-Za-z]", "", nameI)
            emailI = df.values[i][4]
            countryI = df.values[i][7]
            dataArr = dataMap[itemKey]
            for j in range(len(dataArr)):
                index = dataArr[j]["index"]
                itemData = dataArr[j]["data"]
                uuidJ = str(itemData[0])
                if uuidI == uuidJ:
                    continue
                if uuidJ in delUuidArr:
                    continue
                nameJ = re.sub("[^A-Za-z]", "", itemData[1])
                emailJ = itemData[4]
                # BUGFIX: take the candidate's country from the bucketed row.
                countryJ = itemData[7]
                # Chinese experts: compare names via pinyin initials; otherwise plain LCS.
                if (countryI is not None and countryJ is not None
                        and countryI.find("China") > -1 and countryJ.find("China") > -1):
                    check = py.isLCS(nameI, nameJ)
                else:
                    check = lcs.isLCS(nameI, nameJ)
                if check:
                    # Do not merge unless the two email sets intersect.
                    checkEmails, email = validation.isCommonPartByEmail(emailI, emailJ)
                    if not checkEmails:
                        continue
                    # Final confirmation that both records describe the same expert.
                    sameExpert = validation.isSameExpert_org(client, uuidI, uuidJ)
                    if not sameExpert:
                        continue
                    repeatIndex.append(i)
                    repeatIndex.append(index)
                    uuidArr.append(uuidI)
                    uuidArr.append(uuidJ)
                    # Merge J into I: I's uuid survives as the group key.
                    key = uuidI
                    uuidMap = getUuidMap(uuidArr, uuidMap, key)
                    delKey = uuidJ
                    delUuidArr.append(delKey)
                    if key not in keepMap.keys():
                        keepExpert = mergeRepeatExpert(df.values[i], itemData)
                    else:
                        oldExpert = keepMap[key]
                        keepExpert = mergeRepeatExpert(oldExpert, itemData)
                    if delKey in keepMap.keys():
                        # J had itself absorbed earlier rows: fold its merged record in too.
                        delExpert = keepMap[delKey]
                        keepExpert = mergeRepeatExpert(keepExpert, delExpert)
                        del keepMap[delKey]
                    keepExpert[4] = email
                    keepMap[key] = keepExpert
        if len(keepMap.keys()) > 0:
            for key in keepMap.keys():
                arr.append(keepMap[key])
        # Rows never flagged as duplicates survive unchanged, each as its own group.
        for i in range(len(df)):
            if i not in repeatIndex:
                arr.append(noneToEmptyArr(list(df.values[i])))
                key = str(df.values[i][0])
                uuidArr = [key]
                uuidMap = getUuidMap(uuidArr, uuidMap, key)
        guidMap = getGuidMapOrg(client, uuidMap)
        insertExpertTmpOrg(client, arr, guidMap)
def doMergeBySameOrgThread(organization):
    """Thread target: open a fresh client and de-duplicate one organization."""
    if organization:
        executeMergeBySameOrg(getClient(), organization)
def initTable_step2_8_org(client):
    """(Machine 1 only) drop and recreate every working table for this step."""
    if machine == 1:
        ddlStatements = (
            ExpertTmp1Delete, ExpertTmp1Create,
            RelationshipTmp1Delete, RelationshipTmp1Create,
            RepeatOrgDelete, RepeatOrgCreate,
            RepeatOrgUuidDelete, RepeatOrgUuidCreate,
            RepeatOrgExecuteDelete, RepeatOrgExecuteCreate,
        )
        for statement in ddlStatements:
            client.execute(statement)
        # Brief pause after the DDL statements before the tables are used.
        time.sleep(2)
def clearTable_step2_8_org(client):
    """Drop the helper tables that are only needed while this step runs."""
    for dropStatement in (RepeatOrgDelete, RepeatOrgUuidDelete, RepeatOrgExecuteDelete):
        client.execute(dropStatement)
def extractRepeatOrganization(client):
    """(Machine 1 only) record every organization that appears on more than one expert."""
    if machine != 1:
        return
    sql = (
        " insert into " + RepeatOrg +
        " select organization,count() as num from " + ExpertTmp + " group by organization HAVING count() > 1 "
    )
    client.execute(sql)
def mergeBySameOrg(client):
    """Fan out duplicate-merging across worker threads, one organization each.

    Organizations are processed in descending duplicate-count order and
    partitioned across machines via the module-level `machine` flag: copy
    this file with a different `machine` value to run another partition.
    At most constant.threadMaxSize threads run at once; the function blocks
    until every worker has finished.

    BUGFIX: the dispatch loop previously spun at full speed (no sleep) while
    the thread pool was saturated, pegging a CPU core; it now yields briefly.
    """
    sql0_1 = (
        " SELECT organization from " + RepeatOrg +
        " order by num desc "
    )
    # Partitioned by machine: duplicate step2_8_org1.py, change `machine`,
    # and the matching branch is taken.
    if machine == 1:
        # Machine 1 handles rank 201 onward (currently: everything).
        # sql0_1 += " limit 200,10000000000 "
        sql0_1 += " limit 10000000000 "
    elif machine == 2:
        # Machine 2 handles ranks 51-200.
        sql0_1 += " limit 50,150 "
    elif machine == 3:
        # Machine 3 handles ranks 21-50.
        sql0_1 += " limit 20,30 "
    elif machine == 4:
        # Machine 4 handles ranks 2-20.
        sql0_1 += " limit 1,19 "
    else:
        # Machine 5 handles rank 1 alone — the single largest organization.
        sql0_1 += " limit 0,1 "
    sql0_1_rows = client.execute(sql0_1)
    if sql0_1_rows is not None and len(sql0_1_rows) > 0:
        index = 0
        while True:
            threadSize = len(threading.enumerate())
            if threadSize < constant.threadMaxSize:
                organization = str(sql0_1_rows[index][0])
                t = threading.Thread(target=doMergeBySameOrgThread, args=(organization,))
                t.start()
                index += 1
                if index >= len(sql0_1_rows):
                    break
            else:
                # Pool saturated: yield instead of busy-waiting.
                time.sleep(0.1)
        # Wait until only the main thread remains alive.
        while True:
            time.sleep(10)
            aliveThreads = threading.enumerate()
            print('threading alive count', len(aliveThreads))
            if len(aliveThreads) == 1:
                break
def extractSingleOrganization(client):
    """(Machine 1 only) copy experts whose organization has no duplicates
    straight into the output tables, along with their relationships."""
    if machine != 1:
        return
    organizations = (
        " select organization from " + RepeatOrg
    )
    sql1 = (
        " insert into " + ExpertTmp1 +
        " select " + ExpertQueryColumns + " from " + ExpertTmp +
        " where organization not in(" + organizations + ")"
    )
    sql2 = (
        " insert into " + RelationshipTmp1 +
        " select uuid,guid from " + RelationshipTmp +
        " where uuid in (select uuid from " + ExpertTmp + " where organization not in(" + organizations + "))"
    )
    client.execute(sql1)
    client.execute(sql2)
machine = 1  # machine number: selects this process's partition of the work (see mergeBySameOrg)
def getClient():
    """Create a new ClickHouse client from the shared constant.dbConfig settings."""
    cfg = constant.dbConfig
    return Client(
        host=cfg['host'],
        send_receive_timeout=cfg['timeout'],
        database=cfg['db'],
        user=cfg['user'],
        password=cfg['pwd'],
    )
def main():
    """Run the full step2_8 pipeline: init tables, find duplicate orgs, merge, save singles."""
    client = getClient()
    # Initialize the working tables
    initTable_step2_8_org(client)
    # Extract organizations that occur on more than one expert
    extractRepeatOrganization(client)
    # Merge experts with the same organization whose names are substrings of each other
    mergeBySameOrg(client)
    # Save experts whose organization is unique
    extractSingleOrganization(client)
    # Temp-table cleanup is deferred: runs are split across machines with no fixed
    # finish order, so the tables are cleared later in one place.
    # clearTable_step2_8_org(client)
# Script entry point.
if __name__ == '__main__':
    main()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/tf001/ZjdxCKDataHandelPyThon.git
git@gitee.com:tf001/ZjdxCKDataHandelPyThon.git
tf001
ZjdxCKDataHandelPyThon
ZjdxCKDataHandelPyThon
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891