Ai
3 Star 6 Fork 0

Gitee 极速下载/viztracer

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/gaogaotiantian/viztracer
克隆/下载
cmdline_tmpl.py 6.19 KB
一键复制 编辑 原始数据 按行查看 历史
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/gaogaotiantian/viztracer/blob/master/NOTICE.txt
import json
import logging
import os
import shutil
import subprocess
import sys
import textwrap
import time
from typing import Optional
from .base_tmpl import BaseTmpl
file_fib = """
def fib(n):
if n < 2:
return 1
return fib(n-1) + fib(n-2)
fib(5)
"""
class CmdlineTmpl(BaseTmpl):
def build_script(self, script, name="cmdline_test.py"):
with open(name, "w") as f:
f.write(textwrap.dedent(script))
def cleanup(self, output_file="result.json", script_name="cmdline_test.py"):
if os.path.exists(script_name):
os.remove(script_name)
if output_file:
if type(output_file) is list:
for f in output_file:
os.remove(f)
elif type(output_file) is str:
if os.path.exists(output_file):
if os.path.isdir(output_file):
shutil.rmtree(output_file)
elif os.path.isfile(output_file):
os.remove(output_file)
else:
raise Exception("Unexpected output file argument")
def template(self,
cmd_list,
expected_output_file: Optional[str] = "result.json",
success=True,
script=file_fib,
script_name="cmdline_test.py",
expected_entries=None,
expected_stdout=None,
expected_stderr=None,
cleanup=True,
check_func=None,
concurrency=None,
send_sig=None):
assert "python" not in cmd_list, \
"Do not use unqualified 'python' to launch intrepreter. Passing sys.executable is the recommended way."
if os.getenv("COVERAGE_RUN"):
if "viztracer" in cmd_list:
idx = cmd_list.index("viztracer")
if not concurrency:
cmd_list = ["coverage", "run", "--source", "viztracer", "--parallel-mode", "-m"] \
+ cmd_list[idx:]
elif concurrency == "multiprocessing":
# Specification needs to be in config file
cmd_list = ["coverage", "run", "--concurrency=multiprocessing", "-m"] \
+ cmd_list[idx:]
elif "vizviewer" in cmd_list:
idx = cmd_list.index("vizviewer")
cmd_list = ["coverage", "run", "--source", "viztracer", "--parallel-mode", "-m"] + ["viztracer.viewer"] \
+ cmd_list[idx + 1:]
elif sys.executable in cmd_list:
idx = cmd_list.index(sys.executable)
cmd_list = ["coverage", "run", "--source", "viztracer", "--parallel-mode"] \
+ cmd_list[idx + 1:]
if script:
self.build_script(script, script_name)
if send_sig is not None:
if isinstance(send_sig, tuple):
sig, wait = send_sig
else:
sig = send_sig
if os.getenv("GITHUB_ACTIONS"):
# github action is slower
wait = 5
else:
wait = 2
p = subprocess.Popen(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if isinstance(wait, str):
while True:
line = p.stdout.readline().decode("utf-8")
if wait in line:
time.sleep(0.5)
break
elif isinstance(wait, (int, float)):
time.sleep(wait)
p.send_signal(sig)
p.wait(timeout=60)
stdout, stderr = p.stdout.read(), p.stderr.read()
p.stdout.close()
p.stderr.close()
p.stdout, p.stderr = stdout, stderr
result = p
if sys.platform == "win32":
# If we are on win32, we can't get anything useful from
# terminating the process
return None
else:
if os.getenv("COVERAGE_RUN"):
timeout = 90
else:
timeout = 60
try:
result = subprocess.run(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
except subprocess.TimeoutExpired as e:
logging.error("Timeout!")
logging.error(f"stdout: {e.stdout}")
logging.error(f"stderr: {e.stderr}")
raise e
expected = (success ^ (result.returncode != 0))
if not expected:
logging.error(f"return code: {result.returncode}")
logging.error(f"stdout:\n{result.stdout.decode('utf-8')}")
logging.error(f"stderr:\n{result.stderr.decode('utf-8')}")
self.assertTrue(expected)
if expected:
if success and expected_output_file:
if type(expected_output_file) is list:
for f in expected_output_file:
self.assertFileExists(f)
elif type(expected_output_file) is str:
self.assertFileExists(expected_output_file)
if success and expected_entries:
assert (type(expected_output_file) is str and expected_output_file.split(".")[-1] == "json")
with open(expected_output_file) as f:
data = json.load(f)
self.assertEventNumber(data, expected_entries)
if expected_stdout is not None:
self.assertRegex(result.stdout.decode("utf-8"), expected_stdout)
if expected_stderr is not None:
self.assertRegex(result.stderr.decode("utf-8"), expected_stderr)
if check_func:
assert (type(expected_output_file) is str and expected_output_file.split(".")[-1] == "json")
with open(expected_output_file) as f:
data = json.load(f)
check_func(data)
if cleanup:
self.cleanup(output_file=expected_output_file, script_name=script_name)
return result
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C/C++
1
https://gitee.com/mirrors/viztracer.git
git@gitee.com:mirrors/viztracer.git
mirrors
viztracer
viztracer
master

搜索帮助