# NOTE(review): removed a Gitee page banner ("代码拉取完成,页面将自动刷新") that was
# accidentally captured into the source and is not valid Python.
import configparser
import json
import os.path
import subprocess
import sys
from zoo_lib.FileHelper import SaveStingArrayIntoFile
import requests
from lxml import etree
strCrashUrl = ""
boolOutputLog = False
########################################################################################################################
def IsANumber(x):
    """Return True if *x* is a single hexadecimal digit character (0-9, a-f, A-F).

    The length check matters: plain substring membership would wrongly accept
    multi-character strings such as "ab" and the empty string.
    """
    return len(x) == 1 and x in "0123456789abcdefABCDEF"
# 判断当前字符串是否是一个十六进制数字
def IsNumber(x):
    """Return True if *x* is a non-empty hexadecimal number, optionally "0x"-prefixed.

    The prefix is only stripped when at least one digit follows it, so "0x"
    alone is rejected (its 'x' fails the digit test), matching the original
    behavior. Fix: the empty string now returns False (it previously
    returned True because the character loop never ran).
    """
    x = x.lower()
    if len(x) >= 3 and x.startswith("0x"):
        x = x[2:]
    if not x:
        return False
    # x is already lower-cased, so checking lowercase digits suffices.
    return all(c in "0123456789abcdef" for c in x)
########################################################################################################################
def OutputLogInfo(*args):
    """Print *args* with a "LOG => " prefix, but only when verbose logging
    is enabled via the module-level boolOutputLog flag."""
    global boolOutputLog
    if not boolOutputLog:
        return
    print("LOG => ", *args)
def SaveStingIntoFile(info, save_file):
    """Write the string *info* to *save_file* (overwriting) and return the path.

    Uses the platform default text encoding, like the original. The explicit
    f.close() was removed: the "with" block already closes the file.
    """
    with open(save_file, "w") as f:
        f.write(info)
    return save_file
def GetURLPath(config_dir):
    """Read the crash-server base URL (CRASH_URL) from config.ini in *config_dir*.

    When *config_dir* is "" the current working directory is used. If the
    config file does not exist, a template is written and "" is returned so
    the user can fill in the URL; "" is also returned when the key is empty.
    """
    if config_dir == "":
        config_dir = os.getcwd()
    # os.path.join instead of the original hard-coded "\\" separator — same
    # result on Windows, portable elsewhere.
    confile = os.path.join(config_dir, "config.ini")
    if not os.path.isfile(confile):
        # Config file missing: create a template and ask the user to fill it in.
        save_info = '''[Path]
CRASH_URL=
'''
        SaveStingIntoFile(save_info, confile)
        print("下载服务器配置文件不存在,已经生成,需要填写URL")
        return ""
    # Config exists: read the CRASH_URL value from the [Path] section.
    conf = configparser.ConfigParser()
    conf.read(confile)
    name = conf.get("Path", "CRASH_URL")
    if name is not None and name != "":
        return name
    return ""
########################################################################################################################
def GetModuleArray_PROCESS(channel, data, process_name, module_name, page='1'):
    """Fetch one page of the top crashing modules for *process_name*.

    channel      -- "tryno" channel number in the query.
    data         -- date string, used as both start_date and end_date.
    process_name -- process whose modules are listed.
    module_name  -- when non-empty, only this exact module is kept.
    page         -- result page number (string).

    Returns a list of module names; empty on any network/parse failure
    (this helper is deliberately best-effort).
    """
    global strCrashUrl
    url = strCrashUrl + "crash_stat/dump_process_module_top/?page={3}&process_name={2}&product_id=1&tryno={0}&start_date={1}&end_date={1}&module_name="
    headers = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 SE 2.X MetaSr 1.0"}
    url = url.format(channel, data, process_name, page)
    # Narrowed from a bare "except:"; the dead `if html == ""` pre-check
    # around the request was removed (html was always "" there).
    try:
        html = requests.get(url=url, headers=headers, timeout=60).content.decode('utf-8', 'ignore')
    except Exception:
        html = ""
    if not html:
        return []
    parse_html = etree.HTML(html)
    if parse_html is None:
        return []
    tr_list = parse_html.xpath('//tr')
    if tr_list is None or len(tr_list) < 3:
        return []
    module_list = []
    for tr in tr_list:
        module = tr.xpath('./td[2]/a/text()')
        if len(module) == 0:
            # Header/separator rows carry no link cell — skip them.
            continue
        if module_name and module_name != module[0]:
            continue
        module_list.append(module[0])
    return module_list
def GetModuleVersionArray_PROCESS(channel, data, process_name, module_name, page='1'):
    """Fetch one page of "version|offset" entries for *module_name* in *process_name*.

    Each returned item packs the module version (table column 3) and the
    crash offset (column 4) as "version|offset", deduplicated in order.
    Returns [] on any network/parse failure (best-effort helper).
    """
    global strCrashUrl
    url = strCrashUrl + "crash_stat/dump_process_module_ver_address_top/?page={4}&module_name={3}&process_name={2}&product_id=1&tryno={0}&start_date={1}&end_date={1}"
    headers = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 SE 2.X MetaSr 1.0"}
    url = url.format(channel, data, process_name, module_name, page)
    # Narrowed from a bare "except:"; dead `if html == ""` guard removed.
    try:
        html = requests.get(url=url, headers=headers, timeout=60).content.decode('utf-8', 'ignore')
    except Exception:
        html = ""
    if not html:
        return []
    parse_html = etree.HTML(html)
    if parse_html is None:
        return []
    tr_list = parse_html.xpath('//tr')
    if tr_list is None or len(tr_list) < 3:
        return []
    module_list = []
    for tr in tr_list:
        module_version = tr.xpath('./td[3]/text()')
        module_offset = tr.xpath('./td[4]/text()')
        if len(module_version) == 0:
            continue
        if len(module_offset) == 0:
            continue
        module_key = module_version[0] + '|' + module_offset[0]
        if module_key in module_list:
            continue
        module_list.append(module_key)
    return module_list
def GetDumpURLAddress_PROCESS(channel, data, process, address, module, version):
    """Query dump_file_list for the server-side file paths of matching dumps.

    Returns the list of 'file_path' values from the JSON response; empty on
    any network/parse failure (best-effort helper).
    """
    global strCrashUrl
    url = strCrashUrl + "crash_stat/dump_file_list/?crash_date={1}&product_id=1&process_name={5}&module_name={3}&module_ver={4}&crash_address={2}&dump_type=1&tryno={0}"
    headers = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 SE 2.X MetaSr 1.0"}
    url = url.format(channel, data, address, module, version, process)
    # Narrowed from a bare "except:"; dead `if html == ""` guard removed.
    try:
        html = requests.get(url=url, headers=headers, timeout=60).content.decode('utf-8', 'ignore')
    except Exception:
        html = ""
    if not html:
        return []
    # Cheap sanity check that the body looks like a JSON array.
    if '[' not in html:
        return []
    # Robustness fix: malformed JSON used to propagate an exception; now it is
    # treated like any other failed lookup.
    try:
        address_object = json.loads(html)
    except ValueError:
        return []
    if len(address_object) == 0:
        return []
    address_list = []
    for one_object in address_object:
        if 'file_path' not in one_object:
            continue
        address_list.append(one_object['file_path'])
    return address_list
def GetCrash_PROCESS(channel, data, save_path, process, module, version):
    """Resolve dump download paths for one "version|offset" entry of *module*.

    *version* packs the module version and the crash offset as "ver|offset".
    Returns the list of server-side dump file paths; on an empty result the
    query URL is printed for manual inspection.
    """
    parts = version.split('|')
    module_version = parts[0]
    module_offset = parts[1]
    # Look the dump files up by process + module + version + crash offset.
    return_list = GetDumpURLAddress_PROCESS(channel, data, process, module_offset, module, module_version)
    if not return_list:
        # Rebuild the exact query URL purely for the error report.
        global strCrashUrl
        url = strCrashUrl + "crash_stat/dump_file_list/?crash_date={1}&product_id=1&process_name={5}&module_name={3}&module_ver={4}&crash_address={2}&dump_type=1&tryno={0}"
        url = url.format(channel, data, module_offset, module, module_version, process)
        print("\t download error , url [" + url + "]")
    return return_list
def RunDumpAnalyze_PROCESS(crash_type, channel_id, day_time, dir_path, path_wget="", process_name="", module_name="", only_list = False):
    """Download all dumps for *process_name*, optionally narrowed to *module_name*.

    Walks up to 9 result pages of modules, then the versions/offsets of each
    module, collects the dump file paths and hands them to DownloadAddress.
    Returns the list of local files (or the raw URL list when only_list).
    """
    # Ensure the target directory and its "out/" subdirectory exist.
    if not os.path.isdir(dir_path):
        os.makedirs(dir_path)
    if not dir_path.endswith('/') and not dir_path.endswith('\\'):
        dir_path = dir_path + '/'
    if not os.path.isdir(dir_path + "out/"):
        os.makedirs(dir_path + "out/")
    if module_name is None or module_name == "":
        OutputLogInfo("开始获取进程【", process_name, "】下的所有模块")
    else:
        OutputLogInfo("开始获取进程【", process_name, "】下的模块【", module_name , "】")
    # Deduplicated module list across up to 9 pages.
    module_list = []
    for i in range(1, 10):
        for module in GetModuleArray_PROCESS(channel_id, day_time, process_name, module_name, str(i)):
            if module not in module_list:
                module_list.append(module)
    OutputLogInfo("找到模块数量:", len(module_list))
    local_file = []
    if not module_list:
        # Early return (the original nested everything in an else branch).
        return local_file
    for module_name_in_list in module_list:
        OutputLogInfo("开始获取进程【", process_name, "】下模块【", module_name_in_list, "】的版本信息和偏移地址")
        # "version|offset" pairs for this module, again across up to 9 pages.
        module_version_list = []
        for i in range(1, 10):
            for module in GetModuleVersionArray_PROCESS(channel_id, day_time, process_name, module_name_in_list, str(i)):
                if module not in module_version_list:
                    module_version_list.append(module)
        OutputLogInfo("开始获取进程【", process_name, "】下模块【", module_name_in_list, "】的相关dump下载地址")
        # Resolve every version|offset pair to concrete dump file paths.
        for module_version_in_list in module_version_list:
            local_file.extend(GetCrash_PROCESS(channel_id, day_time, dir_path, process_name, module_name_in_list, module_version_in_list))
    OutputLogInfo("共获取到进程【", process_name, "】下模块【", module_name , "】的dump数量【", len(local_file) , "】,开始下载")
    if local_file:
        # Download (or just list) everything that was found.
        local_file = DownloadAddress(local_file, dir_path, path_wget, only_list)
    print("")
    print("")
    print("channel :", channel_id)
    print("day :", day_time)
    print("process :", process_name)
    if module_name is None or module_name == '':
        print("module :", "[All Module dump]")
    else:
        print("module :", module_name)
    print("path :", dir_path)
    print("dump count :", len(local_file))
    return local_file
########################################################################################################################
def GetModuleArray_MODULE(channel, data, module_name, page='1'):
    """Fetch the top entries for *module_name* from dump_module_ver_top.

    NOTE(review): *page* is accepted but never sent — the URL has no page
    parameter; kept for signature compatibility, presumably only page 1 is
    ever needed here. Returns a list of the column-2 link texts; [] on any
    failure (best-effort helper).
    """
    global strCrashUrl
    url = strCrashUrl + "crash_stat/dump_module_ver_top/?product_id=1&tryno={0}&start_date={1}&end_date={1}&module_name={2}"
    headers = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 SE 2.X MetaSr 1.0"}
    url = url.format(channel, data, module_name)
    # Narrowed from a bare "except:"; dead `if html == ""` guard removed.
    try:
        html = requests.get(url=url, headers=headers, timeout=60).content.decode('utf-8', 'ignore')
    except Exception:
        html = ""
    if not html:
        return []
    parse_html = etree.HTML(html)
    if parse_html is None:
        return []
    tr_list = parse_html.xpath('//tr')
    if tr_list is None or len(tr_list) < 3:
        return []
    module_list = []
    for tr in tr_list:
        # Fix: the original reused the *module_name* parameter as the loop
        # variable, clobbering it after the first matching row.
        cell_text = tr.xpath('./td[2]/a/text()')
        if len(cell_text) == 0:
            continue
        module_list.append(cell_text[0])
    return module_list
def GetOffsetArray_MODULE(channel, data, page, module, version):
    """Fetch one page of crash offsets for *module*/*version*.

    Skips the first two table rows (headers) and keeps only column-3 values
    that look like hexadecimal numbers. Returns [] on any failure.
    """
    global strCrashUrl
    url = strCrashUrl + "crash_stat/dump_module_ver_address_top/?page={4}&module_ver={3}&module_name={2}&product_id=1&tryno={0}&start_date={1}&end_date={1}"
    headers = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 SE 2.X MetaSr 1.0"}
    url = url.format(channel, data, module, version, page)
    # Narrowed from a bare "except:"; dead `if html == ""` guard removed.
    try:
        html = requests.get(url=url, headers=headers, timeout=60).content.decode('utf-8', 'ignore')
    except Exception:
        html = ""
    if not html:
        return []
    parse_html = etree.HTML(html)
    if parse_html is None:
        return []
    tr_list = parse_html.xpath('//tr')
    if tr_list is None or len(tr_list) < 3:
        return []
    offset_list = []
    for tr in tr_list[2:]:
        # Fix: the original evaluated the same xpath twice per row.
        number_list = tr.xpath('./td[3]/text()')
        if len(number_list) == 0:
            continue
        string_number = number_list[0]
        if IsNumber(string_number):
            offset_list.append(string_number)
    return offset_list
def GetDumpURLAddress_MODULE(channel, data, address, module, version):
    """Query dump_file_list (module flavor, no process filter) for dump paths.

    Returns the list of 'file_path' values from the JSON response; [] on any
    network/parse failure (best-effort helper).
    """
    global strCrashUrl
    url = strCrashUrl + "crash_stat/dump_file_list/?crash_date={1}&product_id=1&module_name={3}&module_ver={4}&crash_address={2}&dump_type=1&tryno={0}"
    headers = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 SE 2.X MetaSr 1.0"}
    url = url.format(channel, data, address, module, version)
    # Narrowed from a bare "except:"; dead `if html == ""` guard removed.
    try:
        html = requests.get(url=url, headers=headers, timeout=60).content.decode('utf-8', 'ignore')
    except Exception:
        html = ""
    if not html:
        return []
    # Cheap sanity check that the body looks like a JSON array.
    if '[' not in html:
        return []
    # Robustness fix: malformed JSON no longer raises out of this helper.
    try:
        address_object = json.loads(html)
    except ValueError:
        return []
    if len(address_object) == 0:
        return []
    address_list = []
    for one_object in address_object:
        if 'file_path' not in one_object:
            continue
        address_list.append(one_object['file_path'])
    return address_list
def GetCrash_MODULE(channel, data, save_path, module, version):
    """Collect dump download paths for *module*/*version*: offsets first, then files.

    Returns the list of server-side dump file paths (may be empty).
    """
    # Gather crash offsets; results may span several pages, up to 9 are read.
    offset_list = []
    for index in range(1, 10):
        page_offsets = GetOffsetArray_MODULE(channel, data, str(index), module, version)
        if not page_offsets:
            break
        for offset in page_offsets:
            if offset not in offset_list:
                offset_list.append(offset)
    if not offset_list:
        return []
    # Resolve each offset to download addresses; the lookup is retried up to
    # 10 times because individual requests may fail transiently.
    address_list = []
    for address_offset in offset_list:
        return_list = []
        for _ in range(10):
            return_list = GetDumpURLAddress_MODULE(channel, data, address_offset, module, version)
            if return_list:
                break
        print("module :", module, ", address_offset :", address_offset, ", count :", len(return_list))
        if not return_list:
            # Rebuild the query URL purely for the error report.
            global strCrashUrl
            url = strCrashUrl + "crash_stat/dump_file_list/?crash_date={1}&product_id=1&module_name={3}&module_ver={4}&crash_address={2}&dump_type=1&tryno={0}"
            url = url.format(channel, data, address_offset, module, version)
            print("\t download error , url [" + url + "]")
            continue
        address_list.extend(return_list)
    # The original's trailing "if len(address_list) == 0: return []" was dead
    # code — returning the (possibly empty) list is equivalent.
    return address_list
def RunDumpAnalyze_MODULE(crash_type, channel_id, day_time, dir_path, path_wget="", process_name="", module_name="", only_list = False):
    """Download all dumps for *module_name* (its top versions) on *day_time*.

    Returns the list of local files (or the raw URL list when only_list);
    returns [] immediately — without the summary printout — when no module
    versions are found, matching the original control flow.
    """
    # Ensure the target directory and its "out/" subdirectory exist.
    if not os.path.isdir(dir_path):
        os.makedirs(dir_path)
    if not dir_path.endswith('/') and not dir_path.endswith('\\'):
        dir_path = dir_path + '/'
    if not os.path.isdir(dir_path + "out/"):
        os.makedirs(dir_path + "out/")
    # Entries here are module versions (from dump_module_ver_top).
    module_list = GetModuleArray_MODULE(channel_id, day_time, module_name)
    local_file = []
    if not module_list:
        return local_file
    for module_version in module_list:
        local_file.extend(GetCrash_MODULE(channel_id, day_time, dir_path, module_name, module_version))
    if local_file:
        # Download (or just list) everything that was found.
        local_file = DownloadAddress(local_file, dir_path, path_wget, only_list)
    print("")
    print("")
    print("channel :", channel_id)
    print("day :", day_time)
    print("path :", dir_path)
    print("dump count :", len(local_file))
    return local_file
########################################################################################################################
def GetModuleArray_BSOD(channel, data, page='1'):
    """Fetch the top crashing modules for the pseudo-process "bsod".

    NOTE(review): *page* is accepted but never sent — the URL has no page
    parameter; kept for signature compatibility. Returns a list of module
    names; [] on any failure (best-effort helper).
    """
    global strCrashUrl
    url = strCrashUrl + "crash_stat/dump_process_module_top/?process_name=bsod&product_id=1&tryno={0}&start_date={1}&end_date={1}&module_name="
    headers = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 SE 2.X MetaSr 1.0"}
    url = url.format(channel, data)
    # Narrowed from a bare "except:"; dead `if html == ""` guard removed.
    try:
        html = requests.get(url=url, headers=headers, timeout=60).content.decode('utf-8', 'ignore')
    except Exception:
        html = ""
    if not html:
        return []
    parse_html = etree.HTML(html)
    if parse_html is None:
        return []
    tr_list = parse_html.xpath('//tr')
    if tr_list is None or len(tr_list) < 3:
        return []
    module_list = []
    for tr in tr_list:
        cell_text = tr.xpath('./td[2]/a/text()')
        if len(cell_text) == 0:
            continue
        module_list.append(cell_text[0])
    return module_list
def GetOffsetArray_BSOD(channel, data, page='1', module='nt'):
    """Fetch one page of BSOD crash offsets for *module* (default kernel "nt").

    Skips the first two table rows (headers) and keeps only column-4 values
    that look like hexadecimal numbers. Returns [] on any failure.
    """
    global strCrashUrl
    url = strCrashUrl + "crash_stat/dump_process_module_ver_address_top/?page={2}&process_name=bsod&module_name={3}&product_id=1&tryno={0}&start_date={1}&end_date={1}"
    headers = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 SE 2.X MetaSr 1.0"}
    url = url.format(channel, data, page, module)
    # Narrowed from a bare "except:"; dead `if html == ""` guard removed.
    try:
        html = requests.get(url=url, headers=headers, timeout=60).content.decode('utf-8', 'ignore')
    except Exception:
        html = ""
    if not html:
        return []
    parse_html = etree.HTML(html)
    if parse_html is None:
        return []
    tr_list = parse_html.xpath('//tr')
    if tr_list is None or len(tr_list) < 3:
        return []
    offset_list = []
    for tr in tr_list[2:]:
        # Fix: the original evaluated the same xpath twice per row.
        number_list = tr.xpath('./td[4]/text()')
        if len(number_list) == 0:
            continue
        string_number = number_list[0]
        if IsNumber(string_number):
            offset_list.append(string_number)
    return offset_list
def GetDumpURLAddress_BSOD(channel, data, address, module='nt'):
    """Query dump_file_list (BSOD flavor) for the dump file paths at *address*.

    Returns the list of 'file_path' values from the JSON response; [] on any
    network/parse failure (best-effort helper).
    """
    global strCrashUrl
    url = strCrashUrl + "crash_stat/dump_file_list/?crash_date={1}&product_id=1&process_name=bsod&module_name={3}&crash_address={2}&dump_type=1&tryno={0}"
    headers = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 SE 2.X MetaSr 1.0"}
    url = url.format(channel, data, address, module)
    # Narrowed from a bare "except:"; dead `if html == ""` guard removed.
    try:
        html = requests.get(url=url, headers=headers, timeout=60).content.decode('utf-8', 'ignore')
    except Exception:
        html = ""
    if not html:
        return []
    # Cheap sanity check that the body looks like a JSON array.
    if '[' not in html:
        return []
    # Robustness fix: malformed JSON no longer raises out of this helper.
    try:
        address_object = json.loads(html)
    except ValueError:
        return []
    if len(address_object) == 0:
        return []
    address_list = []
    for one_object in address_object:
        if 'file_path' not in one_object:
            continue
        address_list.append(one_object['file_path'])
    return address_list
def GetCrash_BSOD(channel, data, save_path, module='nt'):
    """Collect BSOD dump download paths for *module*: offsets first, then files.

    Returns the list of server-side dump file paths (may be empty).
    """
    # Gather crash offsets; results may span several pages, up to 9 are read.
    offset_list = []
    for index in range(1, 10):
        page_offsets = GetOffsetArray_BSOD(channel, data, str(index), module)
        if not page_offsets:
            break
        for offset in page_offsets:
            if offset not in offset_list:
                offset_list.append(offset)
    if not offset_list:
        return []
    # Resolve each offset to download addresses; the lookup is retried up to
    # 10 times because individual requests may fail transiently.
    address_list = []
    for address_offset in offset_list:
        return_list = []
        for _ in range(10):
            return_list = GetDumpURLAddress_BSOD(channel, data, address_offset, module)
            if return_list:
                break
        print("module :", module, ", address_offset :", address_offset, ", count :", len(return_list))
        if not return_list:
            # Rebuild the query URL purely for the error report.
            global strCrashUrl
            url = strCrashUrl + "crash_stat/dump_file_list/?crash_date={1}&product_id=1&process_name=bsod&module_name={3}&crash_address={2}&dump_type=1&tryno={0}"
            url = url.format(channel, data, address_offset, module)
            print("\t download error , url [" + url + "]")
            continue
        address_list.extend(return_list)
    # The original's trailing "if len(address_list) == 0: return []" was dead
    # code — returning the (possibly empty) list is equivalent.
    return address_list
def RunDumpAnalyze_BSOD(crash_type, channel_id, day_time, dir_path, path_wget="", process_name="", module_name="", only_list = False):
    """Download all BSOD (kernel crash) dumps for *channel_id* on *day_time*.

    Returns the list of local files (or the raw URL list when only_list).
    """
    # Ensure the target directory and its "out/" subdirectory exist.
    if not os.path.isdir(dir_path):
        os.makedirs(dir_path)
    if not dir_path.endswith('/') and not dir_path.endswith('\\'):
        dir_path = dir_path + '/'
    if not os.path.isdir(dir_path + "out/"):
        os.makedirs(dir_path + "out/")
    module_list = GetModuleArray_BSOD(channel_id, day_time)
    local_file = []
    if not module_list:
        # Fallback when no module breakdown is available: query the kernel
        # ("nt") directly.
        local_file = GetCrash_BSOD(channel_id, day_time, dir_path, 'nt')
    else:
        for module_version in module_list:
            local_file.extend(GetCrash_BSOD(channel_id, day_time, dir_path, module_version))
    if local_file:
        # Download (or just list) everything that was found.
        local_file = DownloadAddress(local_file, dir_path, path_wget, only_list)
    print("")
    print("")
    print("channel :", channel_id)
    print("day :", day_time)
    print("path :", dir_path)
    print("dump count :", len(local_file))
    return local_file
########################################################################################################################
def RunDumpAnalyze(crash_type, channel_id, day_time, dir_path, path_wget="", process_name="", module_name="", only_list = False, output_log=False):
    """Entry point: load the crash-server URL from config.ini and dispatch.

    crash_type selects the workflow: "bsod", "module" or "process"; any other
    value yields []. Returns the list of downloaded (or listed) dump files.
    """
    global strCrashUrl
    strCrashUrl = GetURLPath(os.path.dirname(os.path.abspath(__file__)))
    if strCrashUrl == "":
        # Fix: return [] instead of the original's implicit None, so callers
        # can always take len() of the result.
        return []
    global boolOutputLog
    boolOutputLog = output_log
    # Dispatch table instead of the original if/elif chain.
    dispatch = {
        "bsod": RunDumpAnalyze_BSOD,
        "module": RunDumpAnalyze_MODULE,
        "process": RunDumpAnalyze_PROCESS,
    }
    handler = dispatch.get(crash_type)
    if handler is None:
        return []
    return handler(crash_type, channel_id, day_time, dir_path, path_wget, process_name, module_name, only_list)
def DownloadAddress(file_list, save_path, path_wget="", only_list = False):
    """Download every server path in *file_list* with wget, or just list them.

    When only_list is True the paths are written to
    "<save_path minus trailing separator>.download_list.txt" and [] is
    returned. Otherwise each file is fetched into *save_path* (already-present
    files are skipped) and the list of successfully downloaded local paths is
    returned.
    """
    if only_list:
        if save_path.endswith('\\') or save_path.endswith('/'):
            save_path = save_path[:-1] + ".download_list.txt"
        SaveStingArrayIntoFile(file_list, save_path, "\n")
        return []
    local_file = []
    global strCrashUrl
    url = strCrashUrl + "crash_stat/dump_download_file/?filename="
    # Hoisted out of the loop: the default wget binary.
    if path_wget == "":
        path_wget = "wget64.exe"
    for file_path in file_list:
        file_name = file_path.split('/')[-1]
        save_file = save_path + file_name
        if os.path.isfile(save_file):
            print("File Already:", save_file)
            continue
        download_file = url + file_path
        # Security/robustness fix: pass an argument list instead of a
        # space-joined command string, so paths containing spaces cannot be
        # misparsed into extra arguments.
        child = subprocess.Popen([path_wget, download_file, "-O", save_file])
        child.wait()
        if os.path.isfile(save_file):
            local_file.append(save_file)
            print("File Success:", save_file)
        else:
            print("File Error :", save_file)
    return local_file
def RunDebugAnalyze(debug_dir, dump_dir, out_dir):
    """Run the external DebugMain.py analyzer over *dump_dir*, writing results to *out_dir*.

    Blocks until the child process exits.
    """
    if not debug_dir.endswith('/') and not debug_dir.endswith('\\'):
        debug_dir = debug_dir + '\\'
    # Security/robustness fix: argument list instead of a concatenated command
    # string — robust against spaces in any of the paths (and removes the
    # original's accidental double space after "DebugMain.py ").
    cmd_line = ["python3.exe", debug_dir + "DebugMain.py", "--muldump", "--analyze", "-d", dump_dir, "--outdir", out_dir]
    child = subprocess.Popen(cmd_line, cwd=debug_dir)
    child.wait()
########################################################################################################################
"""
def MainDownloader():
pass
def Main():
if len(sys.argv) != 4 and len(sys.argv) != 5 and len(sys.argv) != 6:
print("help:")
print("\t\tget channel day path")
print("\t\tchannel 要下载的对应渠道号")
print("\t\tday 要下载的对应日期")
print("\t\tpath 存放的目标路径(如果不存在会循环创建)")
print("")
print("\t\t例子: get 1339 2021-11-09 D:\\dump\\2021-11-09")
print("")
print("特殊调用方式:")
print("\t\t1: 直接调用目标调试脚本来做基本调试")
print("\t\t\tget dbg debugger_dir dump_dir out_dir")
print("\t\t\t例子: get dbg D:\\Python\\Python3-debug-with-WinDBG D:\\dump\\2021-11-09 D:\\dump\\2021-11-09\\out")
print("\t\t2: 执行完整流程,从下载,到调试")
print("\t\t\tget all channel day dump_dir debugger_dir")
print("\t\t\t会在dump 目录下自动创建对应日期的目录,在日期目录内自动创建检测结果输出目录")
print("\t\t\t例子: get all 1339 2021-11-09 D:\\dump\\ D:\\Python\\Python3-debug-with-WinDBG\\")
pass
else:
if len(sys.argv) == 4:
channel_id = sys.argv[1]
day_time = sys.argv[2]
dir_path = sys.argv[3]
if not dir_path.endswith('/') and not dir_path.endswith('\\'):
dir_path = dir_path + '/'
if not dir_path.endswith(day_time + '/'):
dir_path = dir_path + day_time + '/'
RunDumpAnalyze(channel_id, day_time, dir_path)
elif len(sys.argv) == 5:
if sys.argv[1] == 'dbg':
debug_dir = sys.argv[2]
dump_dir = sys.argv[3]
out_dir = sys.argv[4]
RunDebugAnalyze(debug_dir, dump_dir, out_dir)
elif len(sys.argv) == 6:
if sys.argv[1] == 'all':
channel_id = sys.argv[2]
day_time = sys.argv[3]
dir_path = sys.argv[4]
if not dir_path.endswith('/') and not dir_path.endswith('\\'):
dir_path = dir_path + '/'
if not dir_path.endswith(day_time + '/'):
dir_path = dir_path + day_time + '/'
local_file = RunDumpAnalyze(channel_id, day_time, dir_path)
if len(local_file) != 0:
debug_dir = sys.argv[5]
out_dir = dir_path + 'out/'
RunDebugAnalyze(debug_dir, dir_path, out_dir)
# 按间距中的绿色按钮以运行脚本。
if __name__ == '__main__':
Main()
pass
"""
# NOTE(review): removed a Gitee content-moderation notice ("此处可能存在不合适展示的内容…")
# that was accidentally captured into the source and is not valid Python.