2 Star 0 Fork 0

tf / ZjdxCKDataHandelPyThon

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
step1.py 21.62 KB
一键复制 编辑 原始数据 按行查看 历史
tf 提交于 2021-10-11 11:44 . 更新代码
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670
from clickhouse_driver import Client
import time
import re
import constant
def createTable(tableName, columns):
return "CREATE TABLE " + tableName + columns
def deleteTable(tableName):
return "DROP TABLE IF EXISTS " + tableName
OutPutColumns = (
" ("
" `guid` String,"
" `name` Nullable(String),"
" `organization` Nullable(String),"
" `Second_organization` Nullable(String),"
" `email` Nullable(String),"
" `reprintauthor` Nullable(String),"
" `altname` Nullable(String),"
" `country` Nullable(String),"
" `firstauthor` Nullable(String),"
" `organizationdept` Nullable(String),"
" `keywords` Nullable(String),"
" `subjects` Nullable(String),"
" `journals` Nullable(String)"
" )"
" ENGINE = MergeTree"
" ORDER BY guid"
" SETTINGS index_granularity = 8192"
)
China = constant.China
OutPut = constant.OutPut
OutPutDelete = deleteTable(OutPut)
OutPutCreate = createTable(OutPut, OutPutColumns)
def initTable_step1(client):
client.execute(OutPutDelete)
client.execute(OutPutCreate)
time.sleep(2)
def getDbStr(strParam):
strParam = str(strParam)
max = len(strParam)
i = 0
while True:
strParam = strParam.strip()
if strParam.startswith("'"):
strParam = strParam[1:]
if strParam.endswith("'"):
strParam = strParam[:len(strParam) - 1]
if strParam.endswith("\\"):
strParam = strParam[:len(strParam) - 1]
if not strParam.startswith("'") and not strParam.endswith("'") and not strParam.endswith("\\"):
# 收尾的单引号都删除后,返回
break
i += 1
if i > max:
# 防止死循环
break
return "'" + strParam + "'"
def strArrRemovalDuplicates(arr):
"""
字符串列表去重
:param arr:
:return:
"""
seen = set()
res = []
for item in arr:
if item not in seen:
item = item.strip()
seen.add(item)
res.append(item)
strArrRemovalEmpty(res)
return res
def lowercaseAndRemoveSymbol(x):
# 小写,保留中文,数字,字母,逗号
return re.sub("[^A-Za-z,0-9]", "", x).lower()
def strArrRemovalEmpty(arr):
"""
字符串列表,去除空字符串
:param arr:
:return:
"""
for item in arr:
if item == '':
arr.remove(item)
return arr
def escapeCharacter(str):
"""
数据库sql字符串转义
\n ' 转义为 ''
\n { 转义为 {{
\n } 转义为 }}
\n \ 转义为 \\
:param str:
:return:
"""
return str.replace("'", "''").replace("{", "{{").replace("}", "}}").replace("\\", "\\\\")
def strRemoveSpecialSymbols(str):
"""
去除字符串开始和结束的特殊字符,现在去除 '[' 和 ';'
:param str:
:return:
"""
# 去出收尾空字符串
str = str.strip()
# 去除开头的 ;
if (str.startswith(';')):
str = str[1:]
strRemoveSpecialSymbols(str)
# 去除末尾的 ;
if (str.endswith(';')):
str = str[:len(str) - 1]
strRemoveSpecialSymbols(str)
# 去除开头的 [
if (str.startswith('[')):
str = str[1:]
strRemoveSpecialSymbols(str)
# 去除末尾的 [
if (str.endswith('[')):
str = str[:len(str) - 1]
strRemoveSpecialSymbols(str)
return str
def splitRP(rp):
"""
切分通讯作者
:param rp:
:return:
"""
res = []
arr1 = rp.split(';')
for str1 in arr1:
str1 = str1.strip()
if str1.find('(') >= 0 and str1.find('author') >= 0:
arr2 = str1.split('(')
for str2 in arr2:
str2 = str2.strip()
if str2.find(')') < 0 and str2.find('author') < 0:
res.append(str2)
else:
res.append(str1)
res = strArrRemovalDuplicates(res)
return res
def getNameMap(fArr, sArr):
"""
获取名字字典
:param fArr:
:param sArr:
:return:
"""
nameMap = {}
if (len(fArr) == len(sArr)):
for i in range(len(fArr)):
fName = fArr[i]
sName = sArr[i]
nameMap[fName] = fName
nameMap[sName] = fName
return nameMap
def getEmails(afArr, rpArr, emArr, auMap, afMap):
"""
获取邮箱和名字全称的对应关系,例如
{'Li,Yu':'ly5566@126.com'}
:param afArr: 作者名字全称集
:param rpArr: 通讯作者集
:param emArr: 邮箱集
:param auMap: 名字简称字典
:param afMap: 名字全称字典
:return:
"""
emals = {}
if len(emArr) == len(rpArr):
# print('EM == RP')
itemAfArr = []
for i in range(len(rpArr)):
rp = rpArr[i]
if rp != '':
if rp in afMap.keys():
itemAf = afMap[rp]
else:
itemAf = rp
itemAfArr.append(itemAf)
if len(itemAfArr) > 0:
getEmailSameLen(itemAfArr, emArr, auMap, emals)
elif len(emArr) == len(afArr):
# print('EM == AF')
getEmailSameLen(afArr, emArr, auMap, emals)
else:
afe = abs(len(afArr) - len(emArr))
rpe = abs(len(rpArr) - len(emArr))
if afe > rpe:
# print('EM 靠近 RP')
itemAfArr = []
for i in range(len(rpArr)):
rp = rpArr[i]
if rp != '':
if rp in afMap.keys():
itemAf = afMap[rp]
else:
itemAf = rp
itemAfArr.append(itemAf)
if len(itemAfArr) > 0:
getEmailSameLen(itemAfArr, emArr, auMap, emals)
else:
# print('EM 靠近 AF')
getEmailSameLen(afArr, emArr, auMap, emals)
return emals
def getEmailSameLen(afArr, emArr, auMap, emals):
"""
获取名字长度和邮箱长度一样的名字对应的邮箱
:param afArr: 姓名全称集
:param emArr: 邮箱集
:param auMap: 姓名简称字典
:param emals: 邮箱字典 {'Li,Yu':'ly5566@126.com'}
:return:
"""
itemAfArr = afArr.copy()
for af in afArr:
em = getRealEmail(af, emArr, auMap)
if em != '' and em not in emals.keys():
emals[af] = em
itemAfArr.remove(af)
# 没有匹配,按顺序写
if len(itemAfArr) > 0 and len(emArr) >= len(itemAfArr):
for i in range(len(itemAfArr)):
em = emArr[i]
af = itemAfArr[i]
emals[af] = em
return emals
def getRealEmail(af, emArr, auMap):
"""
获取真实email
:param af:
:param emArr:
:param auMap:
:return:
"""
if af in auMap.keys():
au = auMap[af]
else:
au = af
arr1 = au.strip().split(',')
firstName = arr1[0].strip().upper()
auf = firstName[0: 1]
for em in emArr:
emAccount = em.split('@')[0] # email账号
emAccount = emAccount.upper() # 转大写
if emAccount.find(af) >= 0 or emAccount.find(au) >= 0 or emAccount.find(firstName) >= 0:
# 1、邮箱账号包含名字缩写
emArr.remove(em)
return em
if emAccount.startswith(auf) or emAccount.endswith(auf):
# 2、邮箱账号首位名字首字母
emArr.remove(em)
return em
# 若没有匹配的,返回空字符串
return ''
def getOutPutArr(guid, c1, auMap, afMap, emals, keywords, subjects, journals, afArr):
"""
切分C1,并组装输出作者数据集
\n Organization:从C1中获取,如[Wen, Bei] Chinese Acad Sci, Res Ctr Ecoenvironm Sci, Peoples R China; 取 Chinese Acad Sci
\n Second Organization:从C1中获取一级机构逗号之间的,如如[Wen, Bei] Chinese Acad Sci,Res Ctr Ecoenvironm Sci,Peoples R China; 取 Res Ctr Ecoenvironm Sci
\n Country:C1里机构的最后一个逗号后面里的字符串,[Wen, Bei] Chinese Acad Sci, Res Ctr Ecoenvironm Sci,Peoples R China; 取 Peoples R China
\n Organizationdept:完整机构,如[Wen, Bei] Chinese Acad Sci, Res Ctr Ecoenvironm Sci,Peoples R China; 取 Chinese Acad Sci, Res Ctr Ecoenvironm Sci,Peoples R China
"""
res = []
itemAfMap = {}
c1 = strRemoveSpecialSymbols(c1)
# C1中有作者的
strArr1 = c1.split('[')
for str1 in strArr1:
str1 = strRemoveSpecialSymbols(str1)
organizationdept = ''
organization = ''
secondOrganization = ''
country = ''
# afArr = []
strArr2 = str1.split(';')
for str2 in strArr2:
str2 = str2.strip()
if (str2.find(']') >= 0):
strArr3 = str2.split(']', 1)
name = strArr3[0].strip()
if name in afMap.keys():
af = afMap[name]
else:
af = name
if af not in itemAfMap.keys():
itemAfMap[af] = {}
# afArr.append(af)
organizationdept = strArr3[1].strip()
strArr4 = organizationdept.split(',')
organization = strArr4[0].strip()
if len(strArr4) > 1:
secondOrganization = strArr4[1].strip()
else:
secondOrganization = ''
if len(strArr4) > 2:
country = strArr4[len(strArr4) - 1].strip()
else:
country = ''
else:
if str2 in afMap.keys():
af = afMap[str2]
if af not in itemAfMap.keys():
itemAfMap[af] = {}
# afArr.append(af)
# afArr = strArrRemovalDuplicates(afArr)
for itemAf in itemAfMap.keys():
item = itemAfMap[itemAf]
item['guid'] = guid
item['name'] = itemAf
# item['organization'] = organization
item = appendValue(item, 'organization', organization)
# item['secondOrganization'] = secondOrganization
item = appendValue(item, 'secondOrganization', secondOrganization)
if itemAf in emals.keys():
# item['email'] = emals[itemAf]
item = appendValue(item, 'email', emals[itemAf])
else:
item['email'] = ''
if itemAf in auMap.keys():
item['altname'] = auMap[itemAf]
else:
item['altname'] = itemAf
# item['country'] = country
item = appendValue(item, 'country', country)
# item['organizationdept'] = organizationdept
item = appendValue(item, 'organizationdept', organizationdept)
item['keywords'] = keywords
item['subjects'] = subjects
item['journals'] = journals
# res.append(item)
for af in afArr:
if af in itemAfMap.keys():
resItem = itemAfMap[af]
else:
resItem = {}
resItem['guid'] = guid
resItem['name'] = af
resItem['organization'] = ''
resItem['secondOrganization'] = ''
if af in emals.keys():
resItem['email'] = emals[af]
else:
resItem['email'] = ''
if af in auMap.keys():
resItem['altname'] = auMap[af]
else:
resItem['altname'] = af
resItem['country'] = ''
resItem['organizationdept'] = ''
resItem['keywords'] = keywords
resItem['subjects'] = subjects
resItem['journals'] = journals
res.append(resItem)
return res
def appendValue(obj, key, value):
value = value.strip()
if key in obj.keys():
oldValue = obj[key]
oldArr = oldValue.split(";")
if value != '' and value not in oldArr:
obj[key] = oldValue + ";" + value
else:
obj[key] = value
return obj
def getOutPutArrMissing(guid, c1, auMap, emals, keywords, subjects, journals, srcAfArr):
"""
切分C1,并组装输出作者数据集
\n Organization:从C1中获取,如[Wen, Bei] Chinese Acad Sci, Res Ctr Ecoenvironm Sci, Peoples R China; 取 Chinese Acad Sci
\n Second Organization:从C1中获取一级机构逗号之间的,如如[Wen, Bei] Chinese Acad Sci,Res Ctr Ecoenvironm Sci,Peoples R China; 取 Res Ctr Ecoenvironm Sci
\n Country:C1里机构的最后一个逗号后面里的字符串,[Wen, Bei] Chinese Acad Sci, Res Ctr Ecoenvironm Sci,Peoples R China; 取 Peoples R China
\n Organizationdept:完整机构,如[Wen, Bei] Chinese Acad Sci, Res Ctr Ecoenvironm Sci,Peoples R China; 取 Chinese Acad Sci, Res Ctr Ecoenvironm Sci,Peoples R China
"""
res = []
c1 = strRemoveSpecialSymbols(c1)
# C1中没有作者的
organization = ''
secondOrganization = ''
country = ''
organizationdept = ''
arr1 = c1.split(";")
if len(arr1) > 0:
organizationMap = {}
secondOrganizationMap = {}
countryMap = {}
organizationdeptMap = {}
for item1 in arr1:
organizationdeptMap[item1] = 1
arr2 = item1.split(",")
if len(arr2) > 0:
organizationMap[arr2[0]] = 1
elif len(arr2) > 1:
secondOrganizationMap[arr2[1]] = 1
elif len(arr2) > 2:
countryMap[arr2[len(arr2) - 1]] = 1
if len(organizationMap.keys()) > 0:
organization = ";".join(organizationMap.keys())
if len(secondOrganizationMap.keys()) > 0:
secondOrganization = ";".join(secondOrganizationMap.keys())
if len(countryMap.keys()) > 0:
country = ";".join(countryMap.keys())
if len(organizationdeptMap.keys()) > 0:
organizationdept = ";".join(organizationdeptMap.keys())
for itemAf in srcAfArr:
item = {}
item['guid'] = guid
item['name'] = itemAf
item['organization'] = organization
item['secondOrganization'] = secondOrganization
if itemAf in emals.keys():
item['email'] = emals[itemAf]
else:
item['email'] = ''
if itemAf in auMap.keys():
item['altname'] = auMap[itemAf]
else:
item['altname'] = itemAf
item['country'] = country
item['organizationdept'] = organizationdept
item['keywords'] = keywords
item['subjects'] = subjects
item['journals'] = journals
res.append(item)
return res
def transform(client):
start = 0 # 开始下标
rows = 10000 # 每次处理数据总量
flag = 1 # 退出循环的标志
while (flag > 0):
sql = (
" select guid,EM,AF,RP,AU,C1,DE,SC,SO from " + China +
" where guid not in (SELECT guid from " + OutPut + ")" +
" and AF is not null and AF != '' " +
" limit " + str(start) + "," + str(rows)
)
list = client.execute(sql)
print("transform", len(list), " limit " + str(start) + "," + str(rows))
if len(list) == 0:
flag = 0
else:
outPutArr = []
for item in list:
# 字段获取
guid = str(item[0] or '')
EM = str(item[1] or '')
AF = str(item[2] or '')
RP = str(item[3] or '')
AU = str(item[4] or '')
C1 = str(item[5] or '')
DE = str(item[6] or '')
SC = str(item[7] or '')
SO = str(item[8] or '')
# 切分通讯作者
rpArr = []
if RP != '':
rpArr = splitRP(RP)
# 切分姓名全称
afArr = []
if AF != '':
afArr = AF.split(';')
afArr = strArrRemovalDuplicates(afArr)
# 切分姓名简称
auArr = []
if AU != '':
auArr = AU.split(';')
auArr = strArrRemovalDuplicates(auArr)
# 切分邮箱
emArr = []
if EM != '':
emArr = EM.split(";")
emArr = strArrRemovalDuplicates(emArr)
# 切分DE,去重,以分号隔开
keywords = ''
if DE != '':
deArr = DE.split(';')
deArr = strArrRemovalDuplicates(deArr)
keywords = ';'.join(deArr)
# 切分SC,去重,以分号隔开
subjects = ''
if SC != '':
scArr = SC.split(';')
scArr = strArrRemovalDuplicates(scArr)
subjects = ';'.join(scArr)
# 切分SO,去重,以分号隔开
journals = ''
if SO != '':
soArr = SO.split(';')
soArr = strArrRemovalDuplicates(soArr)
journals = ';'.join(soArr)
# 姓名全称字典
afMap = {}
# 姓名简称字典
auMap = {}
if len(afArr) != 0 and len(auArr) != 0:
afMap = getNameMap(afArr, auArr)
auMap = getNameMap(auArr, afArr)
# 获取email和af的关系
emals = {}
if len(emArr) != 0:
emals = getEmails(afArr, rpArr, emArr, auMap, afMap)
# 切分C1,并组装输出作者数据集
if C1.find("]") > 0 or C1.find("[") > 0:
itemOutPutArr = getOutPutArr(guid, C1, auMap, afMap, emals, keywords, subjects, journals, afArr)
else:
itemOutPutArr = getOutPutArrMissing(guid, C1, auMap, emals, keywords, subjects, journals, afArr)
# 保存输出结果至临时表
outPutArr.extend(itemOutPutArr)
if (len(outPutArr) > 0):
print("insert", len(outPutArr))
sql = (
"INSERT INTO " + OutPut + " (" +
"guid,name,organization,Second_organization,email,altname,country,organizationdept,keywords,subjects,journals" +
")VALUES")
insertNum = 0
for i in range(len(outPutArr)):
insertNum += 1
item = outPutArr[i]
if item['email'] != '':
emailSql = getDbStr(escapeCharacter(str(item['email'] or ''))) + ","
else:
emailSql = "null,"
itemSql = (
"(" +
getDbStr(escapeCharacter(str(item['guid'] or ''))) + "," +
getDbStr(escapeCharacter(lowercaseAndRemoveSymbol(str(item['name'] or '')))) + "," +
getDbStr(escapeCharacter(str(item['organization'] or ''))) + "," +
getDbStr(escapeCharacter(str(item['secondOrganization'] or ''))) + "," +
emailSql +
getDbStr(escapeCharacter(str(item['altname'] or ''))) + "," +
getDbStr(escapeCharacter(str(item['country'] or ''))) + "," +
getDbStr(escapeCharacter(str(item['organizationdept'] or ''))) + "," +
getDbStr(escapeCharacter(str(item['keywords'] or ''))) + "," +
getDbStr(escapeCharacter(str(item['subjects'] or ''))) + "," +
getDbStr(escapeCharacter(str(item['journals'] or ''))) +
"),"
)
sql = sql + itemSql
# 每 1000 条数据insert一次
if insertNum >= 1000:
# print('insert ', insertNum, ' 条数据')
if (sql.endswith(',')):
sql = sql[0:len(sql) - 1]
if (sql.endswith(')')):
client.execute(sql)
sql = (
"INSERT INTO " + OutPut + " (" +
"guid,`name`,organization,Second_organization,email,altname,country,organizationdept,keywords,subjects,journals" +
")VALUES")
insertNum = 0
# 最后不足 1000 条的数据insert
if insertNum > 0:
if (sql.endswith(',')):
sql = sql[0:len(sql) - 1]
if (sql.endswith(')')):
print('insert last', insertNum, ' 条数据')
client.execute(sql)
start += rows
def startTransform(client):
flag = 1 # 退出循环的标志
while (flag > 0):
sql = (
" select count() from (" +
" select guid,EM,AF,RP,AU,C1,DE,SC,SO from " + China +
" where guid not in (SELECT guid from " + OutPut + ")" +
" and AF is not null and AF != '' " +
" )"
)
sql_rows = client.execute(sql)
count = sql_rows[0][0]
print(count)
if count == 0:
flag = 0
else:
# 数据转换
transform(client)
def getClient():
config = constant.dbConfig
client = Client(host=config['host'], database=config['db'], user=config['user'], password=config['pwd'],
send_receive_timeout=config['timeout'])
return client
def main():
client = getClient()
# 初始化表
initTable_step1(client)
# 开始数据转换
startTransform(client)
if __name__ == '__main__':
main()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/tf001/ZjdxCKDataHandelPyThon.git
git@gitee.com:tf001/ZjdxCKDataHandelPyThon.git
tf001
ZjdxCKDataHandelPyThon
ZjdxCKDataHandelPyThon
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891