Ai
3 Star 6 Fork 0

Gitee 极速下载/viztracer

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/gaogaotiantian/viztracer
克隆/下载
test_multiprocess.py 22.10 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/gaogaotiantian/viztracer/blob/master/NOTICE.txt
import multiprocessing
import os
import signal
import sys
import tempfile
import textwrap
import unittest
import json
from .cmdline_tmpl import CmdlineTmpl
# Top of a three-level subprocess chain: launches "parent.py" with the
# current interpreter.
file_grandparent = """
import subprocess
import sys
subprocess.run([sys.executable, "parent.py"])
"""


# Launches "child.py" three times, passing the command as a list, as a tuple
# and as a single string respectively.
file_parent = """
import subprocess
import sys
subprocess.run([sys.executable, "child.py"])
subprocess.run((sys.executable, "child.py"))
subprocess.run(f"{sys.executable} child.py")
"""


# Leaf script: a small recursive workload. The body indentation is part of
# the script text and must be kept, otherwise the script does not compile.
file_child = """
def fib(n):
    if n < 2:
        return 1
    return fib(n-1) + fib(n-2)
fib(5)
"""
# Prints "ready" and then sleeps forever; the script never exits on its own.
file_subprocess_term = """
import time
print("ready", flush=True)
while True:
    time.sleep(0.5)
"""


# Runs the stdlib ``timeit`` module through ``python -m`` and prints the
# return code of the call.
file_subprocess_module = """
import subprocess
import sys
print(subprocess.call([sys.executable, "-m", "timeit", "-n", "100", "'1+1'"]))
"""


# Runs an inline ``python -c`` snippet via Popen and waits for it.
file_subprocess_code_string = """
import subprocess
import sys
p = subprocess.Popen([sys.executable, '-c', 'import time;time.sleep(0.5)'])
p.wait()
"""


# Writes "sub.py" next to the running script, then runs it through the shell
# (``shell=True``) and prints the return code.
file_subprocess_shell = """
import os
import subprocess
import sys
with open(os.path.join(os.path.dirname(__file__), "sub.py"), "w") as f:
    f.write("print('hello')")
path = os.path.join(os.path.dirname(__file__), "sub.py")
print(subprocess.call(f"{sys.executable} {path}", shell=True))
"""
# Forks once; the parent sleeps 0.1s and prints "parent", the child prints
# "child" immediately.
file_fork = """
import os
import time
pid = os.fork()
if pid > 0:
    time.sleep(0.1)
    print("parent")
else:
    print("child")
"""


# Same as file_fork, but the child sleeps 4.5s before printing, so it is
# still running when the parent finishes.
file_fork_wait = """
import os
import time
pid = os.fork()
if pid > 0:
    time.sleep(0.1)
    print("parent")
else:
    time.sleep(4.5)
    print("child")
"""
# Main process calls fib(2), then runs fib(5) in one multiprocessing.Process.
file_multiprocessing = """
import multiprocessing
from multiprocessing import Process
import time
def fib(n):
    if n < 2:
        return 1
    return fib(n-1) + fib(n-2)
def f():
    fib(5)
if __name__ == "__main__":
    fib(2)
    p = Process(target=f)
    p.start()
    p.join()
    time.sleep(0.1)
"""


# Main process starts a Process running spawn(), which itself starts a second
# Process running f() -- three processes in total.
file_nested_multiprocessing = """
import multiprocessing
from multiprocessing import Process
import time
def fib(n):
    if n < 2:
        return 1
    return fib(n-1) + fib(n-2)
def f():
    fib(5)
def spawn():
    p = Process(target=f)
    p.start()
    p.join()
if __name__ == "__main__":
    fib(2)
    p = Process(target=spawn)
    p.start()
    p.join()
    time.sleep(0.1)
"""


# Uses a Process subclass that overrides run() instead of passing target=.
file_multiprocessing_overload_run = """
import multiprocessing
from multiprocessing import Process
import time
class MyProcess(Process):
    def run(self):
        self.fib(5)
    def fib(self, n):
        if n < 2:
            return 1
        return self.fib(n-1) + self.fib(n-2)
if __name__ == "__main__":
    p = MyProcess()
    p.start()
    p.join()
    time.sleep(0.1)
"""


# Registers an after-fork callback on the tracer that prints the tracer and
# sets max_stack_depth to 2, then runs fib(5) in a child Process.
file_multiprocessing_stack_limit = """
import multiprocessing
from multiprocessing import Process
import time
from viztracer import get_tracer
def fib(n):
    if n < 2:
        return 1
    return fib(n-1) + fib(n-2)
def f():
    fib(5)
def cb(tracer):
    print(tracer)
    tracer.max_stack_depth = 2
if __name__ == "__main__":
    get_tracer().set_afterfork(cb)
    p = Process(target=f)
    p.start()
    p.join()
    time.sleep(0.1)
"""
# Exercises a two-worker multiprocessing.Pool through map, imap_unordered and
# apply_async. Garbage collection is disabled around the pool for the reason
# given in the comments inside the script.
file_pool = """
import gc
from multiprocessing import Process, Pool
import os
import time
def f(x):
    return x*x
if __name__ == "__main__":
    process_num = 2
    # gc seems to cause SegFault with multithreading
    # Pool creates a couple of thread and it's failing the test
    # https://github.com/python/cpython/issues/101975
    gc.disable()
    with Pool(processes=process_num) as pool:
        print(pool.map(f, range(10)))
        for i in pool.imap_unordered(f, range(10)):
            print(i)
        res = pool.apply_async(f, (20,)) # runs in *only* one process
        print(res.get(timeout=1)) # prints "400"
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1)) # prints the PID of that process
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(process_num)]
        print([res.get(timeout=1) for res in multiple_results])
    gc.enable()
"""


# Single-worker pool created from the "spawn" context; the worker returns an
# instance of a class defined in the script, so the result has to be sent
# back to the parent process.
file_pool_with_pickle = """
from multiprocessing import get_context
class Bar:
    pass
def foo(args):
    return Bar()
if __name__ == '__main__':
    with get_context('spawn').Pool(1) as pool:
        _ = list(pool.imap_unordered(foo, [1]))
"""
# Maps a short random sleep (0.1-0.3s) over five inputs using a loky reusable
# executor with four workers.
file_loky = """
from loky import get_reusable_executor
import time
import random
def my_function(*args):
    duration = random.uniform(0.1, 0.3)
    time.sleep(duration)
e = get_reusable_executor(max_workers=4)
e.map(my_function, range(5))
"""
class TestSubprocess(CmdlineTmpl):
    """Command-line tests for tracing processes started via ``subprocess``.

    Most checks count the distinct ``pid`` values found in the merged trace
    to confirm that every expected child process was recorded.
    """

    def setUp(self):
        super().setUp()
        # Several scripts launch "child.py" from the current directory.
        with open("child.py", "w") as f:
            f.write(file_child)

    def tearDown(self):
        super().tearDown()
        os.remove("child.py")

    def assertSubprocessName(self, name, data):
        """Fail unless ``data`` holds a ``process_name`` event named ``name``.

        ``data`` is a loaded trace dict with a ``traceEvents`` list.
        """
        found = any(
            entry["name"] == "process_name" and entry["args"]["name"] == name
            for entry in data["traceEvents"]
        )
        if not found:
            self.fail("no matching subprocess name")

    def test_basic(self):
        def check_func(data):
            pids = {entry["pid"] for entry in data["traceEvents"]}
            # file_parent itself plus its three child.py runs
            self.assertEqual(len(pids), 4)

        self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                      expected_output_file="result.json", script=file_parent, check_func=check_func)

    def test_child_process(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            self.template(["viztracer", "-o", os.path.join(tmpdir, "result.json"), "--subprocess_child", "child.py"],
                          expected_output_file=None)
            # The output file name is not assumed; only that exactly one file exists.
            self.assertEqual(len(os.listdir(tmpdir)), 1)
            with open(os.path.join(tmpdir, os.listdir(tmpdir)[0])) as f:
                self.assertSubprocessName("child.py", json.load(f))

    def test_module(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            output_file = os.path.join(tmpdir, "result.json")
            self.template(["viztracer", "-o", output_file, "cmdline_test.py"],
                          expected_output_file=output_file,
                          expected_stdout=".*100 loops.*",
                          script=file_subprocess_module,
                          check_func=lambda data: self.assertSubprocessName("timeit", data))

    def test_code_string(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            output_path = os.path.join(tmpdir, "result.json")
            self.template(["viztracer", "-o", output_path, "cmdline_test.py"],
                          expected_output_file=output_path,
                          script=file_subprocess_code_string,
                          check_func=lambda data: self.assertSubprocessName("python -c", data))

            # this is for coverage
            # NOTE(review): the single-file assertion below relies on the first
            # run's output no longer being in tmpdir -- presumably removed by
            # CmdlineTmpl.template; confirm against cmdline_tmpl.
            self.template(['viztracer', '-o', os.path.join(tmpdir, "result.json"), '--subprocess_child',
                           '-c', 'import time;time.sleep(0.5)'], expected_output_file=None)
            self.assertEqual(len(os.listdir(tmpdir)), 1)
            with open(os.path.join(tmpdir, os.listdir(tmpdir)[0])) as f:
                self.assertSubprocessName("python -c", json.load(f))

    def test_python_entries(self):
        script = textwrap.dedent("""
            import subprocess
            subprocess.check_output(["vizviewer", "-h"])
            subprocess.check_output(["ls", "./"])
            try:
                subprocess.check_output(["nonexist"])
            except Exception:
                pass
        """)

        def check_func(data):
            pids = {entry["pid"] for entry in data["traceEvents"]}
            if sys.platform == "win32":
                # Windows uses exe for python entries and we can't hook that
                self.assertEqual(len(pids), 1)
            else:
                self.assertEqual(len(pids), 2)

        with tempfile.TemporaryDirectory() as tmpdir:
            output_path = os.path.join(tmpdir, "result.json")
            self.template(["viztracer", "-o", output_path, "cmdline_test.py"],
                          expected_output_file=output_path,
                          script=script,
                          check_func=check_func)

    def test_subprocess_shell_true(self):
        def check_func(data):
            pids = {entry["pid"] for entry in data["traceEvents"]}
            self.assertEqual(len(pids), 2)

        with tempfile.TemporaryDirectory() as tmpdir:
            output_path = os.path.join(tmpdir, "result.json")
            self.template(["viztracer", "-o", output_path, "cmdline_test.py"],
                          expected_output_file=output_path,
                          script=file_subprocess_shell,
                          expected_stdout=".*hello.*",
                          check_func=check_func)

    def test_nested(self):
        def check_func(data):
            pids = {entry["pid"] for entry in data["traceEvents"]}
            # grandparent + parent + three child.py runs
            self.assertEqual(len(pids), 5)

        with open("parent.py", "w") as f:
            f.write(file_parent)
        try:
            self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                          expected_output_file="result.json", script=file_grandparent, check_func=check_func)
        finally:
            # Remove the helper script even when the run or the check fails.
            os.remove("parent.py")

    def test_nested_multiprocessing(self):
        def check_func(data):
            pids = {entry["pid"] for entry in data["traceEvents"]}
            # grandparent + parent.py + the Process started by parent.py
            self.assertEqual(len(pids), 3)

        with open("parent.py", "w") as f:
            f.write(file_multiprocessing)
        try:
            self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                          expected_output_file="result.json", script=file_grandparent, check_func=check_func)
        finally:
            # Remove the helper script even when the run or the check fails.
            os.remove("parent.py")

    @unittest.skipIf(sys.platform == "win32", "Can't get anything on Windows with SIGTERM")
    def test_term(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            # send_sig pairs the signal with "ready", the line the script prints
            # before it starts looping.
            self.template(["viztracer", "-o", os.path.join(tmpdir, "result.json"), "--subprocess_child", "cmdline_test.py"],
                          script=file_subprocess_term, expected_output_file=None, send_sig=(signal.SIGTERM, "ready"))
            self.assertEqual(len(os.listdir(tmpdir)), 1)
class TestMultiprocessing(CmdlineTmpl):
    """Command-line tests for tracing ``os.fork`` and ``multiprocessing`` children.

    Most checks count the distinct ``pid`` values found in the merged trace.

    Note: ``test_multiprosessing*`` method names are misspelled but are kept
    as-is. Correcting ``test_multiprosessing_stack_depth`` would collide with
    the separate ``test_multiprocessing_stack_depth`` defined further down.
    """

    @unittest.skipIf(sys.platform not in ["linux", "linux2"], "Only works on Linux")
    def test_os_fork(self):
        def check_func(data):
            pids = {entry["pid"] for entry in data["traceEvents"]}
            self.assertGreater(len(pids), 1)

        self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                      expected_output_file="result.json", script=file_fork, check_func=check_func)

    @unittest.skipIf(sys.platform not in ["linux", "linux2"], "Only works on Linux")
    def test_os_fork_term(self):
        def check_func_wrapper(process_num):
            # Build a checker expecting exactly ``process_num`` distinct pids.
            def check_func(data):
                pids = {entry["pid"] for entry in data["traceEvents"]}
                self.assertEqual(len(pids), process_num)
            return check_func

        # Uninterrupted run: both parent and the slow child are expected.
        result = self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                               expected_output_file="result.json", script=file_fork_wait,
                               check_func=check_func_wrapper(2))
        self.assertIn("Wait for child process", result.stdout.decode())

        # Run with SIGINT: only one process is expected in the trace.
        # NOTE(review): 3.5 is presumably a delay in seconds before the signal
        # is sent -- confirm against CmdlineTmpl.template.
        self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                      send_sig=(signal.SIGINT, 3.5), expected_output_file="result.json", script=file_fork_wait,
                      check_func=check_func_wrapper(1))

    @unittest.skipIf(sys.platform not in ["linux", "linux2"], "Only works on Linux")
    def test_os_fork_exit(self):
        script = textwrap.dedent("""
            import os
            import sys
            pid = os.fork()
            if pid == 0:
                sys.exit(0)
            else:
                os.waitpid(pid, 0)
        """)

        def check_func(data):
            pids = {entry["pid"] for entry in data["traceEvents"]}
            self.assertEqual(len(pids), 2)

        self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                      expected_output_file="result.json", script=script,
                      check_func=check_func)

    def test_multiprosessing(self):
        def check_func(data):
            pids = {entry["pid"] for entry in data["traceEvents"]}
            self.assertGreater(len(pids), 1)

        self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                      expected_output_file="result.json",
                      script=file_multiprocessing,
                      check_func=check_func,
                      concurrency="multiprocessing")

    @unittest.skipIf("forkserver" not in multiprocessing.get_all_start_methods(), "Only works on supported platform")
    def test_multiprocessing_forkserver(self):
        script = textwrap.dedent("""
            import multiprocessing
            from multiprocessing import get_context
            def foo():
                pass
            if __name__ == "__main__":
                p = get_context('forkserver').Process(target=foo)
                p.start()
                p.join()
        """)

        script_pool = textwrap.dedent("""
            from multiprocessing import get_context
            def foo(arg):
                pass
            if __name__ == '__main__':
                with get_context('forkserver').Pool(1) as pool:
                    _ = list(pool.imap_unordered(foo, [1]))
        """)

        def check_func(data):
            pids = set()
            has_foo = False
            for entry in data["traceEvents"]:
                pids.add(entry["pid"])
                if "foo" in entry["name"]:
                    has_foo = True
            self.assertGreater(len(pids), 1)
            self.assertTrue(has_foo)

        self.template(
            ["viztracer", "-o", "result.json", "cmdline_test.py"],
            expected_output_file="result.json",
            script=script,
            check_func=check_func,
        )

        self.template(
            ["viztracer", "-o", "result.json", "cmdline_test.py"],
            expected_output_file="result.json",
            script=script_pool,
            check_func=check_func,
        )

    def test_nested_multiprosessing(self):
        def check_func(data):
            pids = {entry["pid"] for entry in data["traceEvents"]}
            # main + the spawn() process + the process started by spawn()
            self.assertEqual(len(pids), 3)

        self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                      expected_output_file="result.json",
                      script=file_nested_multiprocessing,
                      check_func=check_func,
                      concurrency="multiprocessing")

    def test_multiprocessing_unique_name(self):
        def check_func(data):
            pids = {entry["pid"] for entry in data["traceEvents"]}
            self.assertEqual(len(pids), 2)

        with tempfile.TemporaryDirectory() as tmpdir:
            self.template(["viztracer", "--output_dir", tmpdir, "--unique_output_file", "cmdline_test.py"],
                          expected_output_file=None,
                          script=file_multiprocessing,
                          concurrency="multiprocessing")
            # The generated file name is not known in advance, so pick up the
            # single file in the directory.
            self.assertEqual(len(os.listdir(tmpdir)), 1)
            with open(os.path.join(tmpdir, os.listdir(tmpdir)[0])) as f:
                data = json.load(f)
            check_func(data)

    def test_multiprocessing_entry_limit(self):
        result = self.template(["viztracer", "-o", "result.json", "--tracer_entries", "10", "cmdline_test.py"],
                               expected_output_file="result.json",
                               script=file_multiprocessing,
                               expected_entries=20,
                               concurrency="multiprocessing")
        self.assertIn("buffer is full", result.stdout.decode())

    def test_ignore_multiprocessing(self):
        def check_func(data):
            pids = {entry["pid"] for entry in data["traceEvents"]}
            self.assertEqual(len(pids), 1)

        self.template(["viztracer", "-o", "result.json", "--ignore_multiprocess", "cmdline_test.py"],
                      expected_output_file="result.json",
                      script=file_multiprocessing,
                      check_func=check_func,
                      concurrency="multiprocessing")

    def test_multiprocessing_overload(self):
        def check_func(data):
            events = data["traceEvents"]
            pids = {entry["pid"] for entry in events}
            fib_count = sum(1 for entry in events if "fib" in entry["name"])
            self.assertGreater(len(pids), 1)
            # fib(5) with the n < 2 base case makes 15 calls in total
            self.assertEqual(fib_count, 15)

        self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                      expected_output_file="result.json",
                      script=file_multiprocessing_overload_run,
                      check_func=check_func,
                      concurrency="multiprocessing")

    @unittest.skipIf("win32" in sys.platform, "Does not support Windows")
    def test_multiprocessing_pool(self):
        # I could not reproduce the stuck failure locally. This is only for
        # coverage anyway, just skip it on 3.8+
        # NOTE(review): no version-based skip is visible on this test; the
        # comment above may be stale.
        def check_func(data):
            pids = {entry["pid"] for entry in data["traceEvents"]}
            self.assertGreater(len(pids), 1)

        try:
            self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                          expected_output_file="result.json",
                          script=file_pool,
                          check_func=check_func,
                          concurrency="multiprocessing")
        except Exception:
            # coveragepy has some issue with multiprocess pool
            if not os.getenv("COVERAGE_RUN"):
                # Bare raise keeps the original traceback intact.
                raise

    @unittest.skipIf("win32" in sys.platform, "Does not support Windows")
    def test_multiprocessing_pool_with_pickle(self):
        def check_func(data):
            pids = {entry["pid"] for entry in data["traceEvents"]}
            self.assertGreater(len(pids), 1)

        self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                      expected_output_file="result.json",
                      script=file_pool_with_pickle,
                      check_func=check_func,
                      concurrency="multiprocessing")

    @unittest.skipIf(multiprocessing.get_start_method() != "fork", "Only need to test fork")
    def test_multiprosessing_stack_depth(self):
        def check_func(data):
            # The script's after-fork callback sets max_stack_depth to 2, and
            # no event name in the trace may contain "fib" as a separate word.
            for entry in data["traceEvents"]:
                self.assertNotIn("fib", entry["name"].split())

        self.template(["viztracer", "-o", "result.json", "cmdline_test.py"],
                      expected_output_file="result.json",
                      script=file_multiprocessing_stack_limit,
                      check_func=check_func,
                      concurrency="multiprocessing")

    @unittest.skipIf(multiprocessing.get_start_method() != "fork", "Only need to test fork")
    def test_multiprocessing_daemon(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            # repr() embeds tmpdir as a valid Python string literal in the script.
            script = textwrap.dedent(f"""
                import multiprocessing
                from viztracer import trace_and_save
                @trace_and_save(output_dir={repr(tmpdir)})
                def foo():
                    pass
                p = multiprocessing.Process(target=foo, daemon=True)
                p.start()
                p.join()
            """)
            self.template([sys.executable, "cmdline_test.py"],
                          expected_output_file=None,
                          script=script,
                          concurrency="multiprocessing")
            self.assertEqual(len(os.listdir(tmpdir)), 1)
            with open(os.path.join(tmpdir, os.listdir(tmpdir)[0])) as f:
                data = json.load(f)
            # Only events with phase "X" are counted; exactly one is expected (foo).
            events = [event for event in data["traceEvents"] if event["ph"] == "X"]
            self.assertEqual(len(events), 1)
            self.assertIn("foo", events[0]["name"])

    @unittest.skipIf("fork" not in multiprocessing.get_all_start_methods(), "Only need to test fork")
    def test_multiprocessing_stack_depth(self):
        script = textwrap.dedent("""
            import multiprocessing
            def factorial(n):
                if n == 0:
                    return 1
                else:
                    return n * factorial(n - 1)
            if __name__ == "__main__":
                p = multiprocessing.get_context("fork").Process(target=factorial, args=(15,))
                p.start()
                p.join()
        """)

        def check_func(data):
            count = sum(1 for entry in data["traceEvents"] if "factorial" in entry["name"])
            # With --max_stack_depth 10, nine factorial frames are expected.
            self.assertEqual(count, 9)

        self.template(["viztracer", "--max_stack_depth", "10", "cmdline_test.py"],
                      expected_output_file="result.json",
                      script=script,
                      check_func=check_func,
                      concurrency="multiprocessing")
@unittest.skipIf("free-threading" in sys.version, "loky does not support free-threading now")
class TestLoky(CmdlineTmpl):
    """Command-line test for tracing worker processes created by loky."""

    def test_loky_basic(self):
        def check_func(data):
            pids = {event["pid"] for event in data["traceEvents"]}
            # main, 4 workers, and a forked main on Linux
            self.assertGreaterEqual(len(pids), 5)

        self.template(["viztracer", "cmdline_test.py"], script=file_loky,
                      check_func=check_func, concurrency="multiprocessing")
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C/C++
1
https://gitee.com/mirrors/viztracer.git
git@gitee.com:mirrors/viztracer.git
mirrors
viztracer
viztracer
master

搜索帮助