1 Star 0 Fork 0

oennn/code_search_analysis

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
pracSearchInPy_v1.03_upload.py 8.74 KB
一键复制 编辑 原始数据 按行查看 历史
oennn 提交于 9个月前 . 上传python代码
# Three main features:
# 1. Search folder and file names
#    (like VS Code's Cmd+P)
#    Unique: multiple root folders can be selected, with sub-folder exclusion rules.
# 2. Search file contents
#    (like VS Code's Cmd+Shift+F)
#    Unique: several regexes can be searched at once, and the first match can be
#    carried down as a variable within the same line or the same file. Because
#    there is no "end" match, data near the start and end may be inconsistent;
#    with an additional end-matching regex, the default scope would be per-file.
# 3. Extract column contents into a table, then classify by hits
#    Unique: multiple features extracted per line; one regex can extract several tokens;
#    hits from multiple regexes are classified at the same time;
#    multiple hits on one line are split into multiple rows.
# Configuration constants
import datetime
from datetime import date
from operator import concat
import os
import asyncio
from asyncio import Lock
import pandas as pd
import numpy as np
import re
def getChildFiles(basePath):
    """Return the names of the regular files directly inside *basePath*.

    Uses os.path.join instead of raw string concatenation, so the function
    also works when *basePath* lacks a trailing path separator (the original
    silently misclassified entries in that case).
    """
    return [f for f in os.listdir(basePath) if os.path.isfile(os.path.join(basePath, f))]
def getChildFolders(basePath):
    """Return the names of the sub-directories directly inside *basePath*.

    Uses os.path.join instead of raw string concatenation, so the function
    also works when *basePath* lacks a trailing path separator.
    """
    return [f for f in os.listdir(basePath) if os.path.isdir(os.path.join(basePath, f))]
def getChildFolders_filter(basePath):
    """Return sub-directory names of *basePath*, skipping excluded folders.

    A folder is skipped when its name matches any pattern in the
    module-level excFolderType list (matched from the start via multiMatch).
    os.path.join replaces raw concatenation so a missing trailing separator
    in *basePath* does not break the isdir check.
    """
    return [f for f in os.listdir(basePath)
            if os.path.isdir(os.path.join(basePath, f)) and not multiMatch(f, excFolderType)]
# Module-level flag: True until the first chunk has been written to the Excel
# file; writeToFile's Excel branch reads it to decide between creating the
# workbook and appending to an existing sheet.
isFirstExcelOutput = True
# macOS note (translated): once a path has been accessed, the privacy settings
# show allow/deny entries; "removable volumes" appears there, and "network
# volumes" is currently not ticked for VS Code.
# How to use `async def`:
# https://superfastpython.com/asyncio-async-def/
# https://docs.python.org/3/library/index.html
# Regex examples with Chinese text: https://www.jb51.net/article/177521.htm
# https://blog.csdn.net/weixin_40907382/article/details/79654372
# Official regex docs: https://docs.python.org/3/library/re.html
async def writeToFile(filout, finalStrArr, lock: Lock, oneFileData: pd.DataFrame):
    """Append one file's hit rows to the shared TSV output stream.

    Parameters:
        filout: open, writable text stream shared by all search tasks.
        finalStrArr: kept for signature compatibility; not used — the same
            rows are carried by *oneFileData*.
        lock: asyncio.Lock serializing writes from concurrent tasks.
        oneFileData: per-file DataFrame of hit rows to append.
    """
    async with lock:
        # Tab-separated, no header and no index, so successive appends from
        # many tasks line up under the single title row written by main().
        oneFileData.to_csv(filout, sep='\t', index=False, header=None)
    # NOTE: the original also wrote to an .xlsx (pd.ExcelWriter with
    # mode='a', if_sheet_exists='overlay', startrow=sheet.max_row, using the
    # module-level isFirstExcelOutput flag for the first chunk), but that
    # path sat unreachable behind an early return because Excel appends were
    # far too slow for large result sets. The dead code has been removed.
def multiMatch(content, kwsArr):
    """Return True when *content* matches any pattern in *kwsArr*.

    Patterns are applied with re.match, i.e. anchored at the start of
    *content* and case-sensitive. An empty pattern list yields False.
    """
    return any(re.match(pattern, content) for pattern in kwsArr)
# modi start -- all user-tunable configuration lives here
searchFilePath = '/XXX/output/test01_mybatis_src3.xlsx' # Excel output path
searchtxtfileoutPath = "/XXXX/output/"+"output_springframe_src3_speed2.txt" # txt output path
searchBasePaths = ['/XXXXXX/sourcecode-project/spring-framework-main/'] # folders to search (note trailing slash)
# Sub-folders whose name matches any of these regexes (anchored at the start,
# case-sensitive) are skipped entirely.
excFolderType = [
    r"test"
    , r"mock"
    , r"kotlin"
]
# Files whose name matches any of these regexes are excluded.
excFileType = [
    r"^\._.*"
    # ,r".*\.xls.*"
    # ,r".*\.log\..*"
    # ,r".*\.md"
]
# Only files whose name matches one of these regexes are searched.
incFileType = [
    # r"^[^\.]+\.[^\.]+"
    r".*\.java"
]
# Keyword regexes searched case-insensitively inside each line; one result
# column is produced per entry. The commented groups below are alternative
# presets for other projects (mybatis, redis, kafka).
searchKwsArr = [
    # spring-framework
    r"bean[^\. ]*[\. ]"
    , r"[\. ][^\. ]*proxy"
    , r"servlet"
    , r"template"
    , r"session"
    , r"factory"
    , r"manager"
    , r"provider"
    , r"interceptor"
    # mybatis
    # r"sqlsession"
    # , r"mapp"
    # , r"statement"
    # redis
    # r"skiplist"
    # ,r"ziplist"
    # ,r"hashtable"
    # ,r"linkedlist"
    # ,r"intset"
    # ,r"embstr"
    # ,r"quicklist"
    # ,r"listpack"
    # kafka
    # r"zookeeper",
    # r"bootstrap",
    # r"topic",
    # r"min",
    # r"lag"
]
# modi end
# Column-title constants shared by the txt output and the DataFrame columns.
t_lineNo="lineNo"
t_result_tmp="result"
t_hitNos="hitNos"
t_hitKws="hitKws"
t_lineContent="lineContent"
t_fileName="fileName"
t_filePath="filePath"
async def searchInFile(f, basePath, filout, lock: Lock):
    """Scan one file line by line against searchKwsArr and emit the hits.

    For every line, each keyword regex is matched case-insensitively. The
    i-th match of a keyword lands in output row i, so a line where one
    keyword hits several times expands into several rows. Rows are built
    both as tab-separated strings (finalStrArr) and as a DataFrame, then
    handed to writeToFile under *lock*.

    Parameters:
        f: file name (relative to basePath).
        basePath: directory containing *f*; expected to end with a separator.
        filout: shared writable text stream for the TSV output.
        lock: asyncio.Lock serializing output writes.
    """
    print("filename: " + f)
    # Only process files that pass the include filter and miss the exclude filter.
    if not multiMatch(f, excFileType) and multiMatch(f, incFileType):
        # Column titles: lineNo, result1..resultN, then the metadata columns.
        col_title = [t_lineNo]
        resultNoCnt = 1
        for kw in searchKwsArr:
            col_title.append(t_result_tmp + str(resultNoCnt))
            resultNoCnt += 1
        col_title.append(t_hitNos)
        col_title.append(t_hitKws)
        col_title.append(t_filePath)
        col_title.append(t_fileName)
        col_title.append(t_lineContent)
        with open(basePath + f, "r") as file:
            one_file_result = pd.DataFrame(columns=col_title)
            finalStrArr = []
            linNo = 0
            for line in file.readlines():
                linNo += 1
                ptStrs = []  # formatted output rows for this line
                resultPD_key = pd.DataFrame(columns=col_title)
                # Row template: lineNo plus one empty (tab) cell per keyword
                # already processed; new rows are seeded from it.
                ptStrTmp = str(linNo) + "\t"
                resultPD_tmp = pd.DataFrame(columns=col_title)
                resultPD_tmp.loc[0, t_lineNo] = linNo
                maxFnd = 0  # most matches any single keyword had on this line
                hitKws = []
                hitNos = []
                kwsSeq = 0
                for pp in searchKwsArr:
                    kwsSeq = kwsSeq + 1
                    findCnt = 0
                    for m in re.finditer(pp, line, flags=re.IGNORECASE):
                        findCnt += 1
                        if findCnt > maxFnd:
                            # This keyword has more matches than any row so
                            # far holds -- open a new output row.
                            maxFnd = findCnt
                            ptStrs.append(ptStrTmp)
                            resultPD_key = pd.concat([resultPD_key, resultPD_tmp], ignore_index=True)
                        ptStrs[findCnt - 1] = ptStrs[findCnt - 1] + pp + ": " + m.group() + "\t"
                        resultPD_key.loc[findCnt - 1, t_result_tmp + str(kwsSeq)] = m.group()
                        hitNos.append(str(kwsSeq))
                        hitKws.append(pp)
                    # Advance the template past this keyword's column. (The
                    # original carried a disabled `if False:` branch that
                    # would instead copy the keyword's last match down.)
                    ptStrTmp = ptStrTmp + "\t"
                    # Pad this keyword's column in rows it did not reach.
                    for idx in range(findCnt, len(ptStrs)):
                        ptStrs[idx] = ptStrs[idx] + "\t"
                # Append the per-line hit summary (keyword numbers and
                # patterns) to every row of this line.
                hitSummary = ";" + ";".join(hitNos) + ";" + "\t" + ";" + ";".join(hitKws) + ";" + "\t"
                for idx in range(len(ptStrs)):
                    ptStrs[idx] = ptStrs[idx] + hitSummary
                if maxFnd > 0:
                    # Concatenate this line's rows, each terminated by the
                    # raw line text (which still carries its newline).
                    finalStrArr.append("".join(st + line for st in ptStrs))
                    resultPD_key[t_hitNos] = ";".join(hitNos)
                    resultPD_key[t_hitKws] = "【" + "】;【".join(hitKws) + "】"
                    resultPD_key[t_filePath] = basePath
                    resultPD_key[t_fileName] = f
                    resultPD_key[t_lineContent] = line.replace("\n", "").replace("\r", "")
                    one_file_result = pd.concat([one_file_result, resultPD_key], ignore_index=True)
            # Direct await replaces the original asyncio.wait([create_task(...)])
            # wrapper around a single task -- same effect, less overhead.
            await writeToFile(filout, finalStrArr, lock, one_file_result)
async def searchInFolder(basePath, filout, lock: Lock):
    """Recursively search *basePath*: one task per subfolder and per file.

    Subfolders are filtered through getChildFolders_filter (excFolderType);
    recursion and file scans run concurrently. The original note: a missing
    await here once made the program exit before the first level finished.
    """
    tasklist = []
    for fo in getChildFolders_filter(basePath):
        tasklist.append(asyncio.create_task(searchInFolder(basePath + fo + "/", filout, lock)))
    for f in getChildFiles(basePath):
        tasklist.append(asyncio.create_task(searchInFile(f, basePath, filout, lock)))
    # asyncio.wait raises ValueError on an empty task set, so an empty
    # directory (no files, no non-excluded subfolders) crashed the original.
    if tasklist:
        await asyncio.wait(tasklist)
async def main():
    """Entry coroutine: write the config header and title rows to the txt
    output, then fan out one search task per configured base path."""
    lock = Lock()
    starttime = datetime.datetime.now()
    basePaths = searchBasePaths
    # The context manager guarantees the output file is flushed and closed;
    # the original opened it with a bare open() and never closed it.
    with open(searchtxtfileoutPath, "w") as filout:
        # Echo the active configuration at the top of the output file.
        filout.write("excFolderType:" + "\n")
        filout.write("\t" + "\n\t".join(excFolderType) + "\n")
        filout.write("excFileType:" + "\n")
        filout.write("\t" + "\n\t".join(excFileType) + "\n")
        filout.write("incFileType:" + "\n")
        filout.write("\t" + "\n\t".join(incFileType) + "\n")
        filout.write("searchKwsArr:" + "\n")
        filout.write("\t" + "\n\t".join(searchKwsArr) + "\n")
        filout.write("basePaths:" + "\n")
        filout.write("\t" + "\n\t".join(basePaths) + "\n")
        # Title row (lineNo, result1..resultN, metadata) plus a second row
        # showing which pattern feeds each result column.
        titleStr = t_lineNo + "\t"
        titleStrDes = "\t"
        resultNo = 1
        for kw in searchKwsArr:
            titleStr = titleStr + t_result_tmp + str(resultNo) + "\t"
            titleStrDes = titleStrDes + kw + "\t"
            resultNo = resultNo + 1
        titleStr = titleStr + t_hitNos + "\t" + t_hitKws + "\t" + t_filePath + "\t" + t_fileName + "\t" + t_lineContent + "\t"
        filout.write(titleStr + "\n")
        filout.write(titleStrDes + "\n")
        # One concurrent search task per configured base path.
        task_fol_list = []
        for basePath in basePaths:
            task_fol_list.append(asyncio.create_task(searchInFolder(basePath, filout, lock)))
        # Guard: asyncio.wait raises ValueError when given no tasks
        # (i.e. when searchBasePaths is empty).
        if task_fol_list:
            await asyncio.wait(task_fol_list)
    print('search complete!')
    print("start" + str(starttime))
    print("end " + str(datetime.datetime.now()))
if __name__ == "__main__":
    # Script entry point: run the whole async search pipeline.
    asyncio.run(main())
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/oennn/code_search_analysis.git
git@gitee.com:oennn/code_search_analysis.git
oennn
code_search_analysis
code_search_analysis
master

搜索帮助