Ai
3 Star 6 Fork 0

Gitee 极速下载/viztracer

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/gaogaotiantian/viztracer
克隆/下载
test_regression.py 15.35 KB
一键复制 编辑 原始数据 按行查看 历史
Tian Gao 提交于 2025-12-12 12:19 +08:00 . Fix timestamp skew issue on Mac/Windows (#648)
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/gaogaotiantian/viztracer/blob/master/NOTICE.txt
import multiprocessing
import os
import signal
import sys
import tempfile
import textwrap
import unittest
import viztracer
from viztracer import VizTracer, ignore_function
from .base_tmpl import BaseTmpl
from .cmdline_tmpl import CmdlineTmpl
class TestIssue1(BaseTmpl):
    # Regression test for issue 1: trace code that imports and calls
    # datetime.timedelta, twice in a row with a fresh tracer each time.
    def test_datetime(self):
        try:
            # Two identical rounds, each with its own VizTracer instance.
            for _ in range(2):
                tracer = viztracer.VizTracer(verbose=0)
                tracer.start()
                from datetime import timedelta
                timedelta(hours=5)
                tracer.stop()
                tracer.parse()
                tracer.save(output_file="tmp.json")
        finally:
            # Clean up even when a round fails part-way, so a failure does
            # not leave tmp.json behind in the working directory.
            if os.path.exists("tmp.json"):
                os.remove("tmp.json")
class TestStackOptimization(BaseTmpl):
    # There was an ordering issue in tracefunc when skipping the FEE log.
    # If the stack is empty (stack_top is NULL) and we enter an ignored
    # function, ignore_stack_depth is incremented. However, when the
    # corresponding exit arrives, ignore_stack_depth is not decremented,
    # because return events are skipped while the stack is empty.
    def test_instant(self):
        def s():
            return 0

        vt = VizTracer(verbose=0)
        vt.start()
        # This is a library function which will be ignored, but
        # it could trick the system into an ignoring status.
        vt.add_instant('name = {"a": 1}')
        s()
        s()
        s()
        vt.stop()
        entry_count = vt.parse()
        vt.save()
        # parse() is expected to report 4 entries for the calls above.
        self.assertEqual(entry_count, 4)
class TestSegFaultRegression(BaseTmpl):
    # Without parsing, cleanup of C function entries had caused a segfault.
    def test_cleanup(self):
        vt = VizTracer()
        vt.start()
        # Call a couple of builtins and raise/catch an exception while
        # the tracer is running.
        _ = len([1, 2, 3])
        _ = sum([2, 3, 4])
        try:
            raise Exception("lol")
        except Exception:
            # Discarded on purpose; the exception only needs to happen.
            pass
        vt.stop()
        # Clear the buffer directly, without calling parse() first.
        vt.clear()
class TestFunctionArg(BaseTmpl):
    # Every call of f attaches its argument to the current function entry
    # through add_func_args; all distinct arguments must show up in the data.
    def test_functionarg(self):
        def f(n):
            # `tracer` is resolved from the enclosing scope at call time.
            tracer.add_func_args("input", n)
            if n < 2:
                return 1
            return f(n - 1) + f(n - 2)

        tracer = VizTracer(verbose=0)
        tracer.start()
        f(5)
        tracer.stop()
        tracer.parse()

        # Collect the "input" argument of every complete ("X") event.
        seen_inputs = {
            event["args"]["input"]
            for event in tracer.data["traceEvents"]
            if event["ph"] == "X"
        }
        self.assertEqual(seen_inputs, {0, 1, 2, 3, 4, 5})
# Script for TestIssue21: it defines its own --script_option and -o flags
# and exits with status 1 unless --script_option was passed to it.
issue21_code = """
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--script_option", action="store_true")
parser.add_argument("-o", action="store_true")
options = parser.parse_args()
print(options)
if not options.script_option:
    exit(1)
"""
class TestIssue21(CmdlineTmpl):
    # viztracer --run my_script --script_option
    # is not parsed correctly because the program gets confused
    # about --script_option
    def test_issue21(self):
        # Each command line passes --script_option (and sometimes -o) after
        # the script name, using the "--run", "--" or bare-script forms.
        command_lines = (
            ["viztracer", "--include_files", "/", "--run", "cmdline_test.py", "--script_option"],
            ["viztracer", "--include_files", "/", "--", "cmdline_test.py", "--script_option"],
            ["viztracer", "cmdline_test.py", "--script_option"],
            ["viztracer", "--run", "cmdline_test.py", "-o", "--script_option"],
            ["viztracer", "--", "cmdline_test.py", "-o", "--script_option"],
        )
        for command_line in command_lines:
            self.template(command_line, script=issue21_code)
# Script for TestTermCaught: prints "ready" (flushed) and then sleeps for
# up to ten seconds, one second at a time.
term_code = """
import time
a = []
a.append(1)
print("ready", flush=True)
for i in range(10):
    time.sleep(1)
"""
class TestTermCaught(CmdlineTmpl):
    # A report file is still expected when the traced script gets SIGTERM.
    @unittest.skipIf(sys.platform == "win32", "windows does not have graceful term")
    def test_term(self):
        command_line = ["viztracer", "-o", "term.json", "cmdline_test.py"]
        # NOTE(review): presumably the template sends SIGTERM once the script
        # has printed "ready" - confirm against CmdlineTmpl.template.
        self.template(
            command_line,
            expected_output_file="term.json",
            script=term_code,
            send_sig=(signal.SIGTERM, "ready"),
        )
class TestIssue42(BaseTmpl):
    # A function decorated with ignore_function must leave no events behind,
    # including the calls made inside it.
    def test_issue42(self):
        @ignore_function
        def f():
            lst = []
            lst.append(1)

        vt = VizTracer(verbose=0)
        vt.start()
        f()
        vt.stop()
        vt.parse()
        self.assertEventNumber(vt.data, 0)
# Script for TestIssue47: takes a temporary memoryview of a bytearray and
# then resizes the bytearray in place.
issue47_code = """
import sys
import gc
class C:
    def __init__(self):
        self.data = bytearray()
    def change(self):
        b = memoryview(self.data).tobytes()
        self.data += b"123123"
        del self.data[:1]
c = C()
c.change()
"""
class TestIssue47(CmdlineTmpl):
    # Runs issue47_code under viztracer; the template is asked to expect
    # result.json with 7 entries.
    def test_issue47(self):
        command_line = ["viztracer", "cmdline_test.py", "-o", "result.json"]
        self.template(
            command_line,
            script=issue47_code,
            expected_output_file="result.json",
            expected_entries=7,
        )
class TestIssue58(CmdlineTmpl):
    # Runs the tests.modules.issue58 module under viztracer; only applies
    # when multiprocessing uses the "fork" start method.
    def test_issue58(self):
        if multiprocessing.get_start_method() != "fork":
            # Report a skip instead of silently passing without running anything.
            self.skipTest("requires the 'fork' multiprocessing start method")
        self.template(["viztracer", "-m", "tests.modules.issue58"],
                      expected_output_file="result.json")
class TestIssue83(CmdlineTmpl):
    # Runs tests.modules.issue83 with "-m" and expects "__main__" on stdout.
    def test_issue83(self):
        command_line = ["viztracer", "--quiet", "-m", "tests.modules.issue83"]
        self.template(command_line, expected_stdout="__main__")
# Script for TestIssue119: changes its working directory to the path given
# as the first command line argument.
issue119_code = """
import os
import sys
import tempfile
os.chdir(sys.argv[1])
"""
class TestIssue119(CmdlineTmpl):
    # The traced script changes its working directory; the report given as
    # a relative "-o" path is still expected in the original directory.
    def test_issue119(self):
        with tempfile.TemporaryDirectory() as name:
            filepath = os.path.join(name, "result.json")
            cwd = os.getcwd()
            os.chdir(name)
            # Everything after the chdir is guarded, so the original working
            # directory is restored before `name` gets removed, even if
            # creating the second temporary directory fails.
            try:
                with tempfile.TemporaryDirectory() as script_dir:
                    self.template(
                        ["viztracer", "-o", "result.json", "cmdline_test.py", script_dir],
                        script=issue119_code,
                        expected_output_file=filepath)
            finally:
                os.chdir(cwd)
# Script for TestIssue121: registers fib(6) to run at interpreter exit.
issue121_code = """
import atexit
def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)
atexit.register(fib, 6)
"""
class TestIssue121(CmdlineTmpl):
    # With --log_exit, the fib calls made by the atexit hook must be traced.
    def test_issue121(self):
        def check_func(data):
            # fib(6) with a base case of n <= 2 makes 15 calls in total.
            fib_events = [event for event in data["traceEvents"] if "fib" in event["name"]]
            self.assertEqual(len(fib_events), 15)

        command_line = ["viztracer", "cmdline_test.py", "--log_exit"]
        self.template(command_line, script=issue121_code, check_func=check_func)
# Script for TestIssue141: submits work to a ProcessPoolExecutor and exits
# without shutting the executor down explicitly.
issue141_code = """
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import time
def my_function(*args):
    time.sleep(0.5)
if __name__ == '__main__':
    e = ProcessPoolExecutor(max_workers=3)
    e.map(my_function, range(1))
"""
class TestIssue141(CmdlineTmpl):
    # Runs issue141_code under viztracer with the template's default checks.
    def test_issue141(self):
        command_line = ["viztracer", "cmdline_test.py"]
        self.template(command_line, script=issue141_code)
class TestIssue160(CmdlineTmpl):
    # The report of tests.modules.issue160 must contain events from
    # exactly two distinct process ids.
    def test_issue160(self):
        def check_func(data):
            distinct_pids = {entry["pid"] for entry in data["traceEvents"]}
            self.assertEqual(len(distinct_pids), 2)

        self.template(
            ["viztracer", "-m", "tests.modules.issue160"],
            expected_output_file="result.json",
            check_func=check_func,
        )
# Script for TestIssue162: doubles 0..9 in a two-worker process pool and
# prints the sum of the results (90) once.
issue162_code = """
from concurrent.futures import ProcessPoolExecutor
def work(d):
    return d * 2
if __name__ == "__main__":
    output = 0
    data = range(10)
    with ProcessPoolExecutor(2) as executor:
        for _, data_collected in zip(data, executor.map(work, data)):
            output += data_collected
    print(output)
"""
# Script for TestIssue162.test_issue162_os_popen: runs "echo" through
# os.popen and prints what the child process wrote.
issue162_code_os_popen = """
import os
print(os.popen("echo test_issue162").read())
"""
class TestIssue162(CmdlineTmpl):
    # Output printed by the traced script must reach stdout, both with a
    # process pool and with a child started through os.popen.
    def test_issue162(self):
        self.template(
            ["viztracer", "cmdline_test.py"],
            expected_output_file="result.json",
            script=issue162_code,
            # The script's "90" has to come before viztracer's "Saving" message.
            expected_stdout=r"90\s*Saving.*",
        )

    @unittest.skipIf(sys.platform == "win32", "Windows does not have echo")
    def test_issue162_os_popen(self):
        self.template(
            ["viztracer", "cmdline_test.py"],
            expected_output_file="result.json",
            script=issue162_code_os_popen,
            expected_stdout=r".*test_issue162.*",
        )
class TestIssue508(CmdlineTmpl):
    # Combines exclude_files with max_stack_depth: the script first calls
    # into the excluded standard library directory (inspect.getsource) and
    # then recurses deeper than the stack limit.
    def test_issue508(self):
        # Dedented explicitly so the script is valid Python no matter how
        # the template writes it to disk.
        script = textwrap.dedent("""
            import inspect
            import os
            import viztracer
            exclude = os.path.dirname(inspect.__file__)
            def call_self(n):
                if n == 0:
                    return
                call_self(n - 1)
            with viztracer.VizTracer(exclude_files=[exclude], max_stack_depth=6):
                inspect.getsource(call_self)
                call_self(10)
        """)
        self.template([sys.executable, "cmdline_test.py"], script=script,
                      expected_output_file="result.json",
                      expected_entries=6)
@unittest.skipIf(sys.version_info < (3, 12), "We only care about monitoring backend")
class TestIssue552(CmdlineTmpl):
    # Calls a builtin (repr) wrapped in classmethod while tracing; the
    # template is asked to expect exactly one entry.
    def test_issue552(self):
        script = textwrap.dedent("""
            from viztracer import VizTracer
            class A:
                f = classmethod(repr)
            with VizTracer():
                A().f()
        """)
        self.template([sys.executable, "cmdline_test.py"], script=script,
                      expected_output_file="result.json",
                      expected_entries=1)
# Script for TestTimestampDisorder: calls an empty function g eleven times
# back to back.
file_timestamp_disorder = """
def g():
    pass
g()
g()
g()
g()
g()
g()
g()
g()
g()
g()
g()
"""
class TestTimestampDisorder(CmdlineTmpl):
    # Consecutive calls of g must not overlap in the report: each event has
    # to start at or after the end of the previous one, with a duration >= 0.
    def test_timestamp_overlap(self):
        def check_func(data):
            counter = 0
            curr_time = 0
            for event in data["traceEvents"]:
                if event["ph"] == "X" and event["name"].startswith("g"):
                    counter += 1
                    self.assertGreaterEqual(event["ts"], curr_time)
                    self.assertGreaterEqual(event["dur"], 0)
                    # End of this event; the next one must not start earlier.
                    curr_time = event["ts"] + event["dur"]
            # Guard against a vacuous pass: at least one g event must have
            # been checked by the loop above.
            self.assertGreater(counter, 0)

        self.template(["viztracer", "cmdline_test.py"], script=file_timestamp_disorder,
                      expected_output_file="result.json", check_func=check_func)
class TestTimestampSkew(CmdlineTmpl):
    """
    Ensure that we are not accumulating timestamp too much with artificial
    increments.
    """
    def test_timestamp_skew(self):
        # A million very short calls followed by a single 2 ms sleep.
        script = textwrap.dedent("""
            import time
            for _ in range(1000000):
                len([])
            time.sleep(0.002)
        """)

        def check_func(data):
            sleep_events = 0
            for event in reversed(data["traceEvents"]):
                if event["ph"] == "X" and "time.sleep" in event["name"]:
                    sleep_events += 1
                    # NOTE(review): "dur" is presumably in microseconds, so
                    # 1500 leaves slack below the 2 ms sleep - confirm.
                    self.assertGreater(event["dur"], 1500)
            # Guard against a vacuous pass when the sleep call is missing
            # from the report.
            self.assertGreater(sleep_events, 0)

        self.template(["viztracer", "--tracer_entries", "100", "cmdline_test.py"],
                      script=script, check_func=check_func)
# Script for TestEscapeString: uses names and arguments containing
# backslash, double quote and control-character escapes for objects,
# counters, instant events and a thread name.
issue285_code = """
import threading
from viztracer import get_tracer
from viztracer.vizcounter import VizCounter
from viztracer.vizobject import VizObject
def fib(n):
    if n < 2:
        return 1
    return fib(n - 1) + fib(n - 2)
class MyThread(threading.Thread):
    def run(self):
        fib(7)
tracer = get_tracer()
# test object event name escape with and without args
obj = VizObject(tracer, "test \\\\ \\\" \\b \\f \\n \\r \\t")
obj.test = "test \\\\ \\\" \\b \\f \\n \\r \\t"
# test counter event name escape with and without args
counter = VizCounter(tracer, "test \\\\ \\\" \\b \\f \\n \\r \\t")
counter.test = 10
# test instant event name escape with and without args
tracer.log_instant("test \\\\ \\\" \\b \\f \\n \\r \\t")
tracer.log_instant("test \\\\ \\\" \\b \\f \\n \\r \\t", "test \\\\ \\\" \\b \\f \\n \\r \\t")
# test thread name escape
test_thread = MyThread(name = "test \\\\ \\\" \\b \\f \\n \\r \\t")
test_thread.start()
test_thread.join()
"""
class TestEscapeString(CmdlineTmpl):
    # Runs issue285_code with --dump_raw and expects a report plus the
    # "Total Entries:" line on stdout.
    def test_escape_string(self):
        command_line = ["viztracer", "-o", "result.json", "--dump_raw", "cmdline_test.py"]
        self.template(
            command_line,
            expected_output_file="result.json",
            script=issue285_code,
            expected_stdout=".*Total Entries:.*",
        )
# Script for TestWaitForChild: starts a child process that outlives the
# handshake with its parent by a short sleep and then returns normally.
wait_for_child = """
import os
import time
import multiprocessing
def target(conn):
    conn.recv()
    conn.send("ready")
    conn.recv()
    if os.getenv("GITHUB_ACTIONS"):
        time.sleep(3)
    else:
        time.sleep(1)
if __name__ == '__main__':
    parent, child = multiprocessing.Pipe()
    p = multiprocessing.Process(target=target, args=(child,))
    p.start()
    # The main process will join the child in multiprocessing.process._children.
    # This is a hack to make sure the main process won't join the child process,
    # so we can test the VizUI.wait_children_finish function
    multiprocessing.process._children = set()
    parent.send("check")
    parent.recv()
    parent.send("exit")
"""
# Script for TestWaitForChild: like wait_for_child, but the child process
# sends SIGTERM to itself after the sleep instead of returning.
wait_for_terminated_child = """
import time
import os
import signal
import multiprocessing
def target(conn):
    conn.recv()
    conn.send("ready")
    conn.recv()
    if os.getenv("GITHUB_ACTIONS"):
        time.sleep(3)
    else:
        time.sleep(1)
    os.kill(os.getpid(), signal.SIGTERM)
if __name__ == '__main__':
    parent, child = multiprocessing.Pipe()
    p = multiprocessing.Process(target=target, args=(child,))
    p.start()
    # The main process will join the child in multiprocessing.process._children.
    # This is a hack to make sure the main process won't join the child process,
    # so we can test the VizUI.wait_children_finish function
    multiprocessing.process._children = set()
    parent.send("check")
    parent.recv()
    parent.send("exit")
"""
class TestWaitForChild(CmdlineTmpl):
    # In both cases the main process finishes before its child, and the
    # output is expected to contain "Wait" along with the report file.
    def test_child_process_exits_normally(self):
        self.template(
            ["viztracer", "-o", "result.json", "cmdline_test.py"],
            expected_output_file="result.json",
            expected_stdout=r"Wait",
            script=wait_for_child,
        )

    def test_child_process_exits_abnormally(self):
        self.template(
            ["viztracer", "-o", "result.json", "cmdline_test.py"],
            expected_output_file="result.json",
            expected_stdout=r"Wait",
            script=wait_for_terminated_child,
        )
class TestFinalizerReference(CmdlineTmpl):
    # An atexit hook registered by the traced script must still be able to
    # use the modules it references (sys) and print "success".
    def test_finalizer(self):
        script = textwrap.dedent("""
            import atexit
            import sys
            def task():
                sys.getsizeof([1, 2, 3])
                print("success")
            if __name__ == "__main__":
                atexit.register(task)
        """)
        self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                      expected_output_file="result.json",
                      script=script,
                      expected_stdout="success")
class TestThreadingExitOrder(CmdlineTmpl):
    # Two callbacks are registered through threading._register_atexit;
    # stdout is expected to read "hello world", i.e. the callback that was
    # registered last prints first.
    def test_threading_exit_order(self):
        # Dedented explicitly so the script is valid Python no matter how
        # the template writes it to disk.
        threading_exit_order = textwrap.dedent("""
            import threading
            if __name__ == "__main__":
                threading._register_atexit(print, " world")
                threading._register_atexit(print, "hello", end="")
        """)
        self.template(
            ["viztracer", "cmdline_test.py"],
            script=threading_exit_order,
            expected_stdout="hello world"
        )
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C/C++
1
https://gitee.com/mirrors/viztracer.git
git@gitee.com:mirrors/viztracer.git
mirrors
viztracer
viztracer
master

搜索帮助