tf / ZjdxCKDataHandelPyThon / step2_1~7.py
https://gitee.com/tf001/ZjdxCKDataHandelPyThon.git
Note: this repository declares no open-source license (no LICENSE file); check the project description and its upstream dependencies before reuse.
from clickhouse_driver import Client
import pandas as pd
import threading
import time
import constant
import lcs
import re
import pinyin as py
import validation
def createTable(tableName, columns):
    return "CREATE TABLE " + tableName + columns
def deleteTable(tableName):
    return "DROP TABLE IF EXISTS " + tableName
ExpertColumns = (
    " ("
    " `uuid` String,"
    " `name` Nullable(String),"
    " `organization` Nullable(String),"
    " `Second_organization` Nullable(String),"
    " `email` Nullable(String),"
    " `reprintauthor` Nullable(String),"
    " `altname` Nullable(String),"
    " `country` Nullable(String),"
    " `firstauthor` Nullable(String),"
    " `organizationdept` Nullable(String),"
    " `keywords` Nullable(String),"
    " `subjects` Nullable(String),"
    " `journals` Nullable(String),"
    " `source_email` Nullable(String)"
    " )"
    " ENGINE = MergeTree"
    " ORDER BY uuid"
    " SETTINGS index_granularity = 8192"
)
RelationshipColumns = (
    " ("
    " `uuid` String,"
    " `guid` String"
    " )"
    " ENGINE = MergeTree"
    " ORDER BY uuid"
    " SETTINGS index_granularity = 8192"
)
ExecuteGuidColumns = (
    " ("
    " `guid` String"
    " )"
    " ENGINE = MergeTree"
    " ORDER BY guid"
    " SETTINGS index_granularity = 8192"
)
ExecuteEmailColumns = (
    " ("
    " `email` String,"
    " `num` Int32"
    " )"
    " ENGINE = MergeTree"
    " ORDER BY num"
    " SETTINGS index_granularity = 8192"
)
OutPutQueryColumns = (
    "guid,name,organization,Second_organization,email,reprintauthor,altname,country,firstauthor,organizationdept,keywords,subjects,journals"
)
ExpertQueryColumns = (
    " uuid,name,organization,Second_organization,email,reprintauthor,altname,country,firstauthor,organizationdept,"
    " keywords,subjects,journals,source_email "
)
China = constant.China
OutPut = constant.OutPut
ExecuteEmail = OutPut + "_Execute_Email"
ExecuteEmailDelete = deleteTable(ExecuteEmail)
ExecuteEmailCreate = createTable(ExecuteEmail, ExecuteEmailColumns)
ExecuteGuid = OutPut + "_Execute_Guid"
ExecuteGuidDelete = deleteTable(ExecuteGuid)
ExecuteGuidCreate = createTable(ExecuteGuid, ExecuteGuidColumns)
Expert = OutPut + "_Expert"
ExpertTmp = Expert + "_Tmp"
ExpertTmpDelete = deleteTable(ExpertTmp)
ExpertTmpCreate = createTable(ExpertTmp, ExpertColumns)
Relationship = OutPut + "_Experts_Relationship"
RelationshipTmp = Relationship + "_Tmp"
RelationshipTmpDelete = deleteTable(RelationshipTmp)
RelationshipTmpCreate = createTable(RelationshipTmp, RelationshipColumns)
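# For illustration only: assuming constant.OutPut were "ck_authors" (a hypothetical value; the
# real one lives in the local constant module), the ExecuteGuidCreate string built above would
# expand to the ClickHouse DDL
#   CREATE TABLE ck_authors_Execute_Guid ( `guid` String ) ENGINE = MergeTree ORDER BY guid SETTINGS index_granularity = 8192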
def heartbeatQuery(client):
    """
    Heartbeat query: keeps the database connection from being dropped when the program runs for a long time.
    """
    client.execute("select 1")
def escapeCharacter(str):
    """
    Escape a string for use in a SQL statement:
    \n ' is escaped to ''
    \n { is escaped to {{
    \n } is escaped to }}
    :param str:
    :return:
    """
    return str.replace("'", "''").replace("{", "{{").replace("}", "}}")
def getDbStr(strParam):
    strParam = str(strParam)
    max = len(strParam)
    i = 0
    while True:
        strParam = strParam.strip()
        if strParam.startswith("'"):
            strParam = strParam[1:]
        if strParam.endswith("'"):
            strParam = strParam[:len(strParam) - 1]
        if strParam.endswith("\\"):
            strParam = strParam[:len(strParam) - 1]
        if not strParam.startswith("'") and not strParam.endswith("'") and not strParam.endswith("\\"):
            # All leading/trailing single quotes and backslashes have been stripped; stop here
            break
        i += 1
        if i > max:
            # Guard against an infinite loop
            break
    return "'" + strParam + "'"
def arrUnique(arr):
    return list(pd.unique(arr))
def strUniqueAndsplitBySemicolon(str):
    """
    De-duplicate the items and join them with semicolons.
    """
    arr = []
    if str is not None and len(str.strip()) > 0:
        arr1 = str.split(";")
        for item1 in arr1:
            if len(item1.strip()) > 0:
                arr2 = item1.strip().split(",")
                for item2 in arr2:
                    if len(item2.strip()) > 0:
                        arr.append(item2.strip())
        # De-duplicate and join with semicolons
        singleArr = arrUnique(arr)
        return ';'.join(singleArr)
    else:
        return ''
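# For illustration (hypothetical input): both ';' and ',' act as separators, duplicates are
# dropped in first-seen order, and the result is re-joined with ';':
#   strUniqueAndsplitBySemicolon("AI;machine learning, AI; deep learning")
#   -> "AI;machine learning;deep learning"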
def noneToEmptyStr(value):
    res = ''
    if value is not None and value != 'None':
        res = value
    return res
def noneToEmptyArr(arr):
    res = []
    if len(arr) > 0:
        for i in range(len(arr)):
            item = arr[i]
            if item is not None and item != 'None':
                res.append(str(item))
            else:
                res.append('')
    return res
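# Note on the row layout used below: expert rows are built from "uuid," + OutPutQueryColumns,
# so the indices are 0 uuid, 1 guid, 2 name, 3 organization, 4 Second_organization, 5 email,
# 6 reprintauthor, 7 altname, 8 country, 9 firstauthor, 10 organizationdept, 11 keywords,
# 12 subjects, 13 journals.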
def mergeRepeatExpert(expert1, expert2):
    organizationArr = arrUnique([noneToEmptyStr(expert1[3]), noneToEmptyStr(expert2[3])])
    secondOrganizationArr = arrUnique([noneToEmptyStr(expert1[4]), noneToEmptyStr(expert2[4])])
    # emailArr = [noneToEmptyStr(expert1[5]), noneToEmptyStr(expert2[5])]
    organizationdeptArr = arrUnique([noneToEmptyStr(expert1[10]), noneToEmptyStr(expert2[10])])
    keywordsArr = [noneToEmptyStr(expert1[11]), noneToEmptyStr(expert2[11])]
    subjectsArr = [noneToEmptyStr(expert1[12]), noneToEmptyStr(expert2[12])]
    journalsArr = [noneToEmptyStr(expert1[13]), noneToEmptyStr(expert2[13])]
    organization = strUniqueAndsplitBySemicolon(";".join(organizationArr))
    secondOrganization = strUniqueAndsplitBySemicolon(";".join(secondOrganizationArr))
    # email = strUniqueAndsplitBySemicolon(";".join(emailArr))
    email = validation.checkAndGetSingleEmail(noneToEmptyStr(expert1[5]), noneToEmptyStr(expert2[5]),
                                              noneToEmptyStr(expert1[2]), organization)
    organizationdept = strUniqueAndsplitBySemicolon(";".join(organizationdeptArr))
    keywords = strUniqueAndsplitBySemicolon(";".join(keywordsArr))
    subjects = strUniqueAndsplitBySemicolon(";".join(subjectsArr))
    journals = strUniqueAndsplitBySemicolon(";".join(journalsArr))
    expert = noneToEmptyArr(
        [expert1[0], noneToEmptyStr(expert1[1]), noneToEmptyStr(expert1[2]), organization,
         secondOrganization, email, noneToEmptyStr(expert1[6]), noneToEmptyStr(expert1[7]),
         noneToEmptyStr(expert1[8]), noneToEmptyStr(expert1[9]), organizationdept, keywords, subjects, journals])
    return expert
def insertExecuteGuid(client, guidArr):
    if guidArr is None or len(guidArr) == 0:
        return
    guidArr = arrUnique(guidArr)
    sql = (
        " insert into " + ExecuteGuid + "(guid)VALUES"
    )
    insertNum = 0
    for guid in guidArr:
        insertNum += 1
        itemSql = (
            "(" + getDbStr(escapeCharacter(guid)) + "),"
        )
        sql = sql + itemSql
        # Insert in batches of 1500 rows
        if insertNum >= 1500:
            if (sql.endswith(',')):
                sql = sql[0:len(sql) - 1]
            if (sql.endswith(')')):
                client.execute(sql)
            sql = (
                " insert into " + ExecuteGuid + "(guid)VALUES"
            )
    if insertNum > 0:
        if (sql.endswith(',')):
            sql = sql[0:len(sql) - 1]
        if (sql.endswith(')')):
            client.execute(sql)
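# For illustration only: with guidArr = ["g1", "g2"] (hypothetical values) the statement built
# above is " insert into <ExecuteGuid>(guid)VALUES('g1'),('g2')", where <ExecuteGuid> stands for
# the generated table name; the trailing ',' is trimmed before execution, and a flush happens
# once 1500 rows have accumulated.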
def insertExpertTmp(client, expertArr, relationshipMap, sourceEmail):
    if expertArr is None or len(expertArr) == 0 or relationshipMap is None or len(relationshipMap) == 0:
        return
    executeGuidArr = []
    sql1 = (
        " insert into " + ExpertTmp + "(" + ExpertQueryColumns + ")VALUES"
    )
    sql1_InsertNum = 0
    for item in expertArr:
        # Build this row and append it to the batch
        sql1_InsertNum += 1
        itemSql1 = (
            "(" +
            getDbStr(escapeCharacter(item[0])) + "," +
            getDbStr(escapeCharacter(item[2])) + "," +
            getDbStr(escapeCharacter(item[3])) + "," +
            getDbStr(escapeCharacter(item[4])) + "," +
            getDbStr(escapeCharacter(item[5])) + "," +
            getDbStr(escapeCharacter(item[6])) + "," +
            getDbStr(escapeCharacter(item[7])) + "," +
            getDbStr(escapeCharacter(item[8])) + "," +
            getDbStr(escapeCharacter(item[9])) + "," +
            getDbStr(escapeCharacter(item[10])) + "," +
            getDbStr(escapeCharacter(item[11])) + "," +
            getDbStr(escapeCharacter(item[12])) + "," +
            getDbStr(escapeCharacter(item[13])) + "," +
            getDbStr(escapeCharacter(sourceEmail)) +
            "),"
        )
        sql1 = sql1 + itemSql1
        # Insert in batches of 1500 rows
        if sql1_InsertNum >= 1500:
            if (sql1.endswith(',')):
                sql1 = sql1[0:len(sql1) - 1]
            if (sql1.endswith(')')):
                client.execute(sql1)
            sql1 = (
                " insert into " + ExpertTmp + "(" + ExpertQueryColumns + ")VALUES"
            )
    if sql1_InsertNum > 0:
        if (sql1.endswith(',')):
            sql1 = sql1[0:len(sql1) - 1]
        if (sql1.endswith(')')):
            client.execute(sql1)
    sql2 = (
        " insert into " + RelationshipTmp + "(uuid,guid)VALUES"
    )
    sql2_InsertNum = 0
    for uuidItem in relationshipMap.keys():
        guidArr = relationshipMap[uuidItem]
        executeGuidArr.extend(guidArr)
        for guidItem in guidArr:
            sql2_InsertNum += 1
            itemSql2 = (
                "(" +
                getDbStr(escapeCharacter(uuidItem)) + "," +
                getDbStr(escapeCharacter(guidItem)) +
                "),"
            )
            sql2 = sql2 + itemSql2
            # Insert in batches of 1500 rows
            if sql2_InsertNum >= 1500:
                if (sql2.endswith(',')):
                    sql2 = sql2[0:len(sql2) - 1]
                if (sql2.endswith(')')):
                    client.execute(sql2)
                sql2 = (
                    " insert into " + RelationshipTmp + "(uuid,guid)VALUES"
                )
    if sql2_InsertNum > 0:
        if (sql2.endswith(',')):
            sql2 = sql2[0:len(sql2) - 1]
        if (sql2.endswith(')')):
            client.execute(sql2)
    insertExecuteGuid(client, executeGuidArr)
def insertExpertNullEmailTmp(client, expertArr):
    if expertArr is None or len(expertArr) == 0:
        return
    sql1 = (
        " insert into " + ExpertTmp + "(" + ExpertQueryColumns + ")VALUES"
    )
    sql2 = (
        " insert into " + RelationshipTmp + "(uuid,guid)VALUES"
    )
    insertNum = 0
    for item in expertArr:
        # Build this row and append it to the batch
        insertNum += 1
        itemSql1 = (
            "(" +
            getDbStr(escapeCharacter(item[0])) + "," +
            getDbStr(escapeCharacter(item[2])) + "," +
            getDbStr(escapeCharacter(item[3])) + "," +
            getDbStr(escapeCharacter(item[4])) + "," +
            getDbStr(escapeCharacter(item[5])) + "," +
            getDbStr(escapeCharacter(item[6])) + "," +
            getDbStr(escapeCharacter(item[7])) + "," +
            getDbStr(escapeCharacter(item[8])) + "," +
            getDbStr(escapeCharacter(item[9])) + "," +
            getDbStr(escapeCharacter(item[10])) + "," +
            getDbStr(escapeCharacter(item[11])) + "," +
            getDbStr(escapeCharacter(item[12])) + "," +
            getDbStr(escapeCharacter(item[13])) + "," +
            "null" +
            "),"
        )
        sql1 = sql1 + itemSql1
        itemSql2 = (
            "(" +
            getDbStr(escapeCharacter(item[0])) + "," +
            getDbStr(escapeCharacter(item[1])) +
            "),"
        )
        sql2 = sql2 + itemSql2
        # Insert in batches of 1500 rows
        if insertNum >= 1500:
            if (sql1.endswith(',')):
                sql1 = sql1[0:len(sql1) - 1]
            if (sql1.endswith(')')):
                client.execute(sql1)
            sql1 = (
                " insert into " + ExpertTmp + "(" + ExpertQueryColumns + ")VALUES"
            )
            if (sql2.endswith(',')):
                sql2 = sql2[0:len(sql2) - 1]
            if (sql2.endswith(')')):
                client.execute(sql2)
            sql2 = (
                " insert into " + RelationshipTmp + "(uuid,guid)VALUES"
            )
            insertNum = 0
    if insertNum > 0:
        if (sql1.endswith(',')):
            sql1 = sql1[0:len(sql1) - 1]
        if (sql1.endswith(')')):
            print('insert ExpertTmp last', insertNum, 'rows')
            client.execute(sql1)
        if (sql2.endswith(',')):
            sql2 = sql2[0:len(sql2) - 1]
        if (sql2.endswith(')')):
            print('insert RelationshipTmp last', insertNum, 'rows')
            client.execute(sql2)
def executeSourceEmail(client, sourceEmail):
    sqlGuids = (
        " select guid from " + OutPut +
        " where email = " + getDbStr(escapeCharacter(sourceEmail))
    )
    sqlNotInGuids = (
        " select guid from " + ExecuteGuid
    )
    sql1 = (
        " select generateUUIDv4() as uuid," + OutPutQueryColumns +
        " from " + OutPut +
        " where guid in (" + sqlGuids + ")" +
        " and guid not in (" + sqlNotInGuids + ")" +
        " order by length(name) desc "
    )
    sql1_rows = client.execute(sql1)
    print(sourceEmail, len(sql1_rows))
    if sql1_rows is not None and len(sql1_rows) > 0:
        columnsStr = "uuid," + OutPutQueryColumns
        columns = columnsStr.split(",")
        df = pd.DataFrame(sql1_rows, columns=columns)
        if len(df) == 1:
            # Only one author; save it directly
            item = df.values[0]
            arr = [noneToEmptyArr(list(item))]
            guidMap = {
                str(item[0]): [item[1]]
            }
            insertExpertTmp(client, arr, guidMap, sourceEmail)
        else:
            # print("**************************************** 1 ****************************************")
            # Step 1: merge records whose name and organization are exactly the same
            df1 = df.groupby(['name', 'organization'])
            arr1 = []
            guidMap1 = {}
            for item1 in df1:
                if len(item1[1]) == 1:
                    # Unique, no duplicates
                    arr1.append(noneToEmptyArr(list(item1[1].values[0])))
                    guidKey = str(item1[1].values[0][0])
                    guidValue = item1[1].values[0][1]
                    if guidKey not in guidMap1.keys():
                        guidValueArr = [guidValue]
                        guidMap1[guidKey] = noneToEmptyArr(guidValueArr)
                    else:
                        guidValueArr = guidMap1[guidKey]
                        guidValueArr.append(guidValue)
                        guidMap1[guidKey] = noneToEmptyArr(arrUnique(guidValueArr))
                else:
                    # Duplicates exist: merge, deciding whether they are the same person
                    item1Df = item1[1]
                    keepMap = {}
                    repeatIndex = []
                    delUuidArr = []
                    for i in range(len(item1Df)):
                        same = False
                        guidValueArr = []
                        iExpert = item1Df.values[i]
                        uuidI = str(iExpert[0])
                        if uuidI in delUuidArr:
                            continue
                        key = uuidI
                        guidI = iExpert[1]
                        nameI = iExpert[2]
                        for j in range(i + 1, len(item1Df)):
                            jExpert = item1Df.values[j]
                            uuidJ = str(jExpert[0])
                            if uuidJ in delUuidArr:
                                continue
                            guidJ = jExpert[1]
                            nameJ = jExpert[2]
                            if key not in keepMap.keys():
                                xExpert = iExpert
                            else:
                                xExpert = keepMap[key]
                            yExpert = jExpert
                            sameExpert = validation.isSameExpert_2(xExpert[11], xExpert[12], xExpert[13], xExpert[4],
                                                                   yExpert[11], yExpert[12], yExpert[13], yExpert[4])
                            # If their research fields do not overlap, skip
                            if not sameExpert:
                                continue
                            same = True
                            repeatIndex.append(i)
                            repeatIndex.append(j)
                            guidValueArr.append(guidI)
                            guidValueArr.append(guidJ)
                            if len(nameI) >= len(nameJ):
                                keepName = nameI
                            else:
                                keepName = nameJ
                            # Merge J into I
                            delKey = uuidJ
                            delUuidArr.append(delKey)
                            if key not in keepMap.keys():
                                xExpert = iExpert
                            else:
                                xExpert = keepMap[key]
                            xExpert[2] = keepName
                            keepExpert = mergeRepeatExpert(xExpert, jExpert)
                            if delKey in keepMap.keys():
                                delExpert = keepMap[delKey]
                                keepExpert = mergeRepeatExpert(keepExpert, delExpert)
                                del keepMap[delKey]
                            keepMap[key] = keepExpert
                            if key in guidMap1.keys():
                                guidValueArr1 = guidMap1[key]
                                guidValueArr.extend(guidValueArr1)
                            if delKey in guidMap1.keys():
                                oldDelGuidValueArr1 = guidMap1[delKey]
                                guidValueArr.extend(oldDelGuidValueArr1)
                            guidMap1[key] = noneToEmptyArr(arrUnique(guidValueArr))
                        if not same:
                            # Element i did not match anyone else; save it directly
                            arr1.append(
                                noneToEmptyArr(
                                    [
                                        noneToEmptyStr(iExpert[0]), noneToEmptyStr(iExpert[1]),
                                        noneToEmptyStr(iExpert[2]), noneToEmptyStr(iExpert[3]),
                                        noneToEmptyStr(iExpert[4]), noneToEmptyStr(iExpert[5]),
                                        noneToEmptyStr(iExpert[6]), noneToEmptyStr(iExpert[7]),
                                        noneToEmptyStr(iExpert[8]), noneToEmptyStr(iExpert[9]),
                                        noneToEmptyStr(iExpert[10]), noneToEmptyStr(iExpert[11]),
                                        noneToEmptyStr(iExpert[12]), noneToEmptyStr(iExpert[13])
                                    ]
                                )
                            )
                            key = noneToEmptyStr(iExpert[0])
                            value = noneToEmptyStr(iExpert[1])
                            if key not in guidMap1.keys():
                                guidMap1[key] = noneToEmptyArr([value])
                            else:
                                old = guidMap1[key]
                                old.append(value)
                                guidMap1[key] = noneToEmptyArr(arrUnique(old))
                    if len(keepMap.keys()) > 0:
                        for key in keepMap.keys():
                            arr1.append(keepMap[key])
                    for i in range(len(item1Df)):
                        if i not in repeatIndex:
                            arr1.append(noneToEmptyArr(list(item1Df.values[i])))
                            guidKey = str(item1Df.values[i][0])
                            guidValue = item1Df.values[i][1]
                            guidValueArr = [guidValue]
                            if guidKey in guidMap1.keys():
                                oldGuidValueArr1 = guidMap1[guidKey]
                                guidValueArr.extend(oldGuidValueArr1)
                            guidMap1[guidKey] = noneToEmptyArr(arrUnique(guidValueArr))
            # print(arr1)
            # print(guidMap1)
            # print("**************************************** 2 ****************************************")
            # Step 2: merge records with the same organization where one name is a substring of the other
            df2 = pd.DataFrame(arr1, columns=columns)
            arr2 = []
            guidMap2 = {}
            df2_1 = df2.groupby(['organization'])
            for item2 in df2_1:
                item2Df = item2[1]
                if len(item2Df) == 1:
                    # Unique, no duplicates
                    arr2.append(noneToEmptyArr(list(item2Df.values[0])))
                    guidKey = str(item2Df.values[0][0])
                    guidValue = item2Df.values[0][1]
                    guidValueArr = [guidValue]
                    if guidKey in guidMap1.keys():
                        oldGuidValueArr1 = guidMap1[guidKey]
                        guidValueArr.extend(oldGuidValueArr1)
                    guidMap2[guidKey] = noneToEmptyArr(arrUnique(guidValueArr))
                else:
                    # Duplicates exist: compare name substrings
                    keepMap = {}
                    repeatIndex = []
                    delUuidArr = []
                    for i in range(len(item2Df)):
                        guidValueArr = []
                        uuidI = str(item2Df.values[i][0])
                        if uuidI in delUuidArr:
                            continue
                        guidI = item2Df.values[i][1]
                        nameI = item2Df.values[i][2]
                        countryI = item2Df.values[i][8]
                        nameI = re.sub("[^A-Za-z]", "", nameI)
                        for j in range(i + 1, len(item2Df)):
                            uuidJ = str(item2Df.values[j][0])
                            if uuidJ in delUuidArr:
                                continue
                            guidJ = item2Df.values[j][1]
                            nameJ = item2Df.values[j][2]
                            countryJ = item2Df.values[j][8]
                            nameJ = re.sub("[^A-Za-z]", "", nameJ)
                            # For Chinese authors, use the pinyin-initial algorithm
                            if (countryI != None and countryJ != None
                                    and countryI.find("China") > -1 and countryJ.find("China") > -1):
                                check = py.isLCS(nameI, nameJ)
                            else:
                                check = lcs.isLCS(nameI, nameJ)
                            if check:
                                key = uuidI
                                if key not in keepMap.keys():
                                    xExpert = item2Df.values[i]
                                else:
                                    xExpert = keepMap[key]
                                yExpert = item2Df.values[j]
                                sameExpert = validation.isSameExpert_2(xExpert[11], xExpert[12], xExpert[13],
                                                                       xExpert[4], yExpert[11], yExpert[12],
                                                                       yExpert[13], yExpert[4])
                                if not sameExpert:
                                    continue
                                repeatIndex.append(i)
                                repeatIndex.append(j)
                                guidValueArr.append(guidI)
                                guidValueArr.append(guidJ)
                                if len(nameI) >= len(nameJ):
                                    keepName = nameI
                                else:
                                    keepName = nameJ
                                # Merge J into I
                                delKey = uuidJ
                                delUuidArr.append(delKey)
                                if key not in keepMap.keys():
                                    xExpert = item2Df.values[i]
                                else:
                                    xExpert = keepMap[key]
                                xExpert[2] = keepName
                                keepExpert = mergeRepeatExpert(xExpert, item2Df.values[j])
                                if delKey in keepMap.keys():
                                    delExpert = keepMap[delKey]
                                    keepExpert = mergeRepeatExpert(keepExpert, delExpert)
                                    del keepMap[delKey]
                                keepMap[key] = keepExpert
                                if key in guidMap1.keys():
                                    guidValueArr1 = guidMap1[key]
                                    guidValueArr.extend(guidValueArr1)
                                if delKey in guidMap1.keys():
                                    oldDelGuidValueArr1 = guidMap1[delKey]
                                    guidValueArr.extend(oldDelGuidValueArr1)
                                guidMap2[key] = noneToEmptyArr(arrUnique(guidValueArr))
                    if len(keepMap.keys()) > 0:
                        for key in keepMap.keys():
                            arr2.append(keepMap[key])
                    for i in range(len(item2Df)):
                        if i not in repeatIndex:
                            arr2.append(noneToEmptyArr(list(item2Df.values[i])))
                            guidKey = str(item2Df.values[i][0])
                            guidValue = item2Df.values[i][1]
                            guidValueArr = [guidValue]
                            if guidKey in guidMap1.keys():
                                oldGuidValueArr1 = guidMap1[guidKey]
                                guidValueArr.extend(oldGuidValueArr1)
                            guidMap2[guidKey] = noneToEmptyArr(arrUnique(guidValueArr))
            # print(arr2)
            # print(guidMap2)
            # print("**************************************** 3 ****************************************")
            # Step 3: merge records with the same name where one organization is a substring of the other
            df3 = pd.DataFrame(arr2, columns=columns)
            arr3 = []
            guidMap3 = {}
            df3_1 = df3.groupby(['name'])
            for item3 in df3_1:
                item3Df = item3[1]
                item3DfDataArr = item3Df.values
                if len(item3DfDataArr) == 1:
                    # Unique, no duplicates
                    arr3.append(noneToEmptyArr(list(item3DfDataArr[0])))
                    guidKey = str(item3DfDataArr[0][0])
                    guidValue = item3DfDataArr[0][1]
                    guidValueArr = [guidValue]
                    if guidKey in guidMap2.keys():
                        oldGuidValueArr2 = guidMap2[guidKey]
                        guidValueArr.extend(oldGuidValueArr2)
                    guidMap3[guidKey] = noneToEmptyArr(arrUnique(guidValueArr))
                else:
                    # Duplicates exist: compare organization substrings
                    keepMap = {}
                    repeatIndex = []
                    delUuidArr = []
                    orgI = ""
                    for i in range(len(item3DfDataArr)):
                        guidValueArr = []
                        uuidI = str(item3DfDataArr[i][0])
                        if uuidI in delUuidArr:
                            continue
                        guidI = item3DfDataArr[i][1]
                        if orgI == "":
                            orgI = item3DfDataArr[i][3]
                        for j in range(i + 1, len(item3DfDataArr)):
                            uuidJ = str(item3DfDataArr[j][0])
                            if uuidJ in delUuidArr:
                                continue
                            guidJ = item3DfDataArr[j][1]
                            orgJ = item3DfDataArr[j][3]
                            # If the organizations have no common part, skip
                            if not validation.isCommonPart(orgI, orgJ):
                                continue
                            key = uuidI
                            if key not in keepMap.keys():
                                xExpert = item3DfDataArr[i]
                            else:
                                xExpert = keepMap[key]
                            yExpert = item3DfDataArr[j]
                            sameExpert = validation.isSameExpert_2(xExpert[11], xExpert[12], xExpert[13], xExpert[4],
                                                                   yExpert[11], yExpert[12], yExpert[13], yExpert[4])
                            # If their research fields do not overlap, skip
                            if not sameExpert:
                                continue
                            repeatIndex.append(i)
                            repeatIndex.append(j)
                            guidValueArr.append(guidI)
                            guidValueArr.append(guidJ)
                            # Merge J into I
                            delKey = uuidJ
                            delUuidArr.append(delKey)
                            if key not in keepMap.keys():
                                xExpert = item3DfDataArr[i]
                            else:
                                xExpert = keepMap[key]
                            keepExpert = mergeRepeatExpert(xExpert, item3DfDataArr[j])
                            orgI = keepExpert[3]
                            if delKey in keepMap.keys():
                                delExpert = keepMap[delKey]
                                keepExpert = mergeRepeatExpert(keepExpert, delExpert)
                                del keepMap[delKey]
                            keepMap[key] = keepExpert
                            if key in guidMap2.keys():
                                guidValueArr2 = guidMap2[key]
                                guidValueArr.extend(guidValueArr2)
                            if delKey in guidMap2.keys():
                                oldDelGuidValueArr2 = guidMap2[delKey]
                                guidValueArr.extend(oldDelGuidValueArr2)
                            guidMap3[key] = noneToEmptyArr(arrUnique(guidValueArr))
                    if len(keepMap.keys()) > 0:
                        for key in keepMap.keys():
                            arr3.append(keepMap[key])
                    # Experts that were never merged are saved directly
                    for i in range(len(item3DfDataArr)):
                        if i not in repeatIndex:
                            arr3.append(noneToEmptyArr(list(item3DfDataArr[i])))
                            guidKey = str(item3DfDataArr[i][0])
                            guidValue = item3DfDataArr[i][1]
                            guidValueArr = [guidValue]
                            if guidKey in guidMap2.keys():
                                oldGuidValueArr2 = guidMap2[guidKey]
                                guidValueArr.extend(oldGuidValueArr2)
                            guidMap3[guidKey] = noneToEmptyArr(arrUnique(guidValueArr))
            # print(arr3)
            # print(guidMap3)
            insertExpertTmp(client, arr3, guidMap3, sourceEmail)
def initTable_step2_1_7(client):
    client.execute(ExpertTmpDelete)
    client.execute(ExpertTmpCreate)
    client.execute(RelationshipTmpDelete)
    client.execute(RelationshipTmpCreate)
    client.execute(ExecuteGuidDelete)
    client.execute(ExecuteGuidCreate)
    client.execute(ExecuteEmailDelete)
    client.execute(ExecuteEmailCreate)
    time.sleep(2)
def clearTable_step2_1_7(client):
    client.execute(ExecuteGuidDelete)
    client.execute(ExecuteEmailDelete)
def insertExecuteEmail(client):
    sql = (
        " INSERT into " + ExecuteEmail +
        " SELECT email,count() as num from " + OutPut +
        " where email is not NULL and email != '' " +
        " group by email"
    )
    client.execute(sql)
    time.sleep(2)
def getExpertByEmail(client):
    sql0_1 = (
        " SELECT email from " + ExecuteEmail + " order by num desc "
    )
    sql0_1_rows = client.execute(sql0_1)
    for sql0_1_rows_item in sql0_1_rows:
        email = str(sql0_1_rows_item[0])
        executeSourceEmail(client, email)
def getExpertByNullEmail(client):
    dataArr = []
    limitFirst = 0
    limitSecond = 1000
    while True:
        # Flush every 500,000 rows to avoid running out of memory
        if len(dataArr) > 500000:
            insertExpertNullEmailTmp(client, dataArr.copy())
            dataArr = []
        sqlGuids = (
            " select guid from " + OutPut +
            " where guid not in(select guid from " + ExecuteGuid + ") group by guid order by guid" +
            " limit " + str(limitFirst) + " , " + str(limitSecond)
        )
        sql1 = (
            " select generateUUIDv4() as uuid," + OutPutQueryColumns + " from " + OutPut +
            " where guid in(" + sqlGuids + ")"
        )
        sql1_rows = client.execute(sql1)
        if len(sql1_rows) > 0:
            for item in sql1_rows:
                uuid = str(item[0])
                expert = [uuid]
                for i in range(1, len(item)):
                    expert.append(noneToEmptyStr(item[i]))
                dataArr.append(expert)
            print(" Null email ", len(sql1_rows))
            limitFirst += limitSecond
        else:
            break
    # Insert the remaining rows
    if len(dataArr) > 0:
        insertExpertNullEmailTmp(client, dataArr)
def getClient():
    config = constant.dbConfig
    client = Client(host=config['host'], database=config['db'], user=config['user'], password=config['pwd'],
                    send_receive_timeout=config['timeout'])
    return client
def main():
    client = getClient()
    # Initialize the tables
    initTable_step2_1_7(client)
    # Collect the distinct emails and their occurrence counts
    insertExecuteEmail(client)
    # Select and de-duplicate author records by email
    getExpertByEmail(client)
    # Then handle the remaining authors that have no email
    getExpertByNullEmail(client)
    # Clean up the helper tables
    clearTable_step2_1_7(client)
if __name__ == '__main__':
    main()
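# Minimal usage sketch (assumptions: a local constant module providing dbConfig with
# host/db/user/pwd/timeout plus the OutPut and China table-name constants, and the local
# lcs, pinyin, and validation helper modules on the import path):
#   python step2_1~7.py
# The script rebuilds the *_Expert_Tmp and *_Experts_Relationship_Tmp tables, de-duplicates
# authors first by shared email and then by name/organization similarity, and finally drops
# its *_Execute_Guid and *_Execute_Email helper tables.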