From 0e2448d82eb70e22be2c5adb630ef79444ef1004 Mon Sep 17 00:00:00 2001 From: l30056312 Date: Wed, 19 Feb 2025 10:55:25 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=BA:l30056312=20?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=8E=9F=E5=9B=A0:=E8=BF=81=E7=A7=BBCI?= =?UTF-8?q?=E6=9E=B6=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ci/access_control_test.py | 165 +++++++++++++++ tests/__init__.py | 0 tests/test_tools/__init__.py | 0 tests/test_tools/acquire_json.py | 78 +++++++ tests/test_tools/conftest.py | 28 +++ tests/test_tools/dist_test.py | 349 +++++++++++++++++++++++++++++++ tests/test_tools/test_ci_st.py | 122 +++++++++++ tests/test_tools/utils.py | 190 +++++++++++++++++ 8 files changed, 932 insertions(+) create mode 100644 ci/access_control_test.py create mode 100644 tests/__init__.py create mode 100644 tests/test_tools/__init__.py create mode 100644 tests/test_tools/acquire_json.py create mode 100644 tests/test_tools/conftest.py create mode 100644 tests/test_tools/dist_test.py create mode 100644 tests/test_tools/test_ci_st.py create mode 100644 tests/test_tools/utils.py diff --git a/ci/access_control_test.py b/ci/access_control_test.py new file mode 100644 index 00000000..e41ff0d7 --- /dev/null +++ b/ci/access_control_test.py @@ -0,0 +1,165 @@ +import os +from pathlib import Path + + +def read_files_from_txt(txt_file): + with open(txt_file, "r") as f: + return [line.strip() for line in f.readlines()] + + +def is_examples(file): + return file.startswith("examples/") or file.startswith("tests/poc/") + + +def is_poc(file): + return file.startswith("tests/poc") + + +def is_pipecase(file): + return file.startswith("tests/pipeline") + + +def is_markdown(file): + return file.endswith(".md") + + +def is_image(file): + return file.endswith(".jpg") or file.endswith(".png") + + +def is_txt(file): + return file.endswith(".txt") + + +def is_owners(file): + return file.startswith("OWNERS") + + +def is_license(file): + return file.startswith("LICENSE") + + +def is_ut(file): + return file.startswith("tests/ut") + + +def is_no_suffix(file): + return os.path.splitext(file)[1] == '' + + +def skip_ci(files, skip_conds): + for file in files: + if not any(condition(file) for condition in skip_conds): + return False + return True + + +def choose_skip_ci(raw_txt_file): + if not os.path.exists(raw_txt_file): + return False + + file_list = read_files_from_txt(raw_txt_file) + skip_conds = [ + is_examples, + is_pipecase, + is_markdown, + is_image, + is_txt, + is_owners, + is_license, + is_no_suffix, + is_poc + ] + + return skip_ci(file_list, skip_conds) + + +def filter_exec_ut(raw_txt_file): + file_list = read_files_from_txt(raw_txt_file) + filter_conds = [ + is_ut, + is_markdown + ] + for file in file_list: + if not any(condition(file) for condition in filter_conds): + return False, None + return True, file_list + + +def acquire_exitcode(command): + exitcode = os.system(command) + real_code = os.WEXITSTATUS(exitcode) + return real_code + + +# ============================= +# UT test, run with pytest +# ============================= + +class UTTest: + def __init__(self): + self.base_dir = Path(__file__).absolute().parents[1] + self.test_dir = os.path.join(self.base_dir, 'tests') + self.ut_files = os.path.join( + self.base_dir, self.test_dir, "ut" + ) + + def run_ut(self, raw_txt_file=None): + if raw_txt_file is not None and os.path.exists(raw_txt_file): + filtered_results = filter_exec_ut(raw_txt_file) + + if filtered_results[0]: + filtered_files = 
filtered_results[1] + full_path = [os.path.join(self.base_dir, file) for file in filtered_files] + exsit_ut_files = [file for file in full_path if os.path.exists(file) and file.endswith(".py")] + self.ut_files = " ".join(exsit_ut_files) + + command = f"pytest -x --log-cli-level=INFO {self.ut_files}" + code = acquire_exitcode(command) + if code == 0: + print("UT test success") + else: + print("UT failed") + exit(1) + + +# =============================================== +# ST test, run with sh. +# =============================================== + +class STTest: + def __init__(self): + self.base_dir = Path(__file__).absolute().parents[1] + self.test_dir = os.path.join(self.base_dir, 'tests') + + self.st_dir = "st" + self.st_shell = os.path.join( + self.test_dir, self.st_dir, "st_run.sh" + ) + + def run_st(self): + rectify_case = f"bash {self.st_shell}" + rectify_code = acquire_exitcode(rectify_case) + if rectify_code != 0: + print("rectify case failed, check it.") + exit(1) + + +def run_tests(raw_txt_file): + pass + + +def main(): + parent_dir = Path(__file__).absolute().parents[2] + raw_txt_file = os.path.join(parent_dir, "modify.txt") + + skip_signal = choose_skip_ci(raw_txt_file) + if skip_signal: + print("Skipping CI") + else: + run_tests(raw_txt_file) + + +if __name__ == "__main__": + main() + \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_tools/__init__.py b/tests/test_tools/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_tools/acquire_json.py b/tests/test_tools/acquire_json.py new file mode 100644 index 00000000..1b7f7f7c --- /dev/null +++ b/tests/test_tools/acquire_json.py @@ -0,0 +1,78 @@ +import re +import json +import enum +import os + + +class TypeOfTest(enum.Enum): + APPROX = 1 + DETERMINISTIC = 2 + + +def transfer_logs_as_json(log_file, output_json_file): + """ + Read a log file from the input path, and return the + summary specified as input as a list + + Args: + log_file: str, path to the dir where the logs are located. + output_json_file: str, path of the json file transferred from the logs. + + Returns: + data: json, the values parsed from the log, formatted as a json file. 
+ """ + + log_pattern = re.compile( + r"throughput per GPU \(TFLOP/s/GPU\):\s+([0-9.]+)\s+\|.*?lm loss:\s+([0-9.]+E[+-][0-9]+) | .* actor/pg_loss : ([0-9.]+)" + ) + + if "trl_ppo" in log_file: + log_pattern = re.compile( + r'throughput per GPU \(TFLOP/s/GPU\):\s([0-9\.]+).*?(?:abs_pg_loss:|vf_loss:)\s([0-9\.]+)' + ) + + memory_pattern = re.compile( + r"\[Rank (\d+)\] \(after \d+ iterations\) memory \(MB\) \| allocated: ([0-9.]+) \| max allocated: ([0-9.]+)" + ) + + data = { + "lm loss": [], + "throughput": [], + "memo info": [] + } + with open(log_file, "r") as f: + log_content = f.read() + log_matches = log_pattern.findall(log_content) + memory_matches = memory_pattern.findall(log_content) + + if log_matches: + if log_matches[0][1] != "": + data["lm loss"] = [float(match[1]) for match in log_matches] + data["throughput"] = [float(match[0]) for match in log_matches] + else: + data["lm loss"] = [float(match[2]) for match in log_matches] + + if memory_matches: + memo_info = [ + { + "rank": int(match[0]), + "allocated memory": float(match[1]), + "max allocated memory": float(match[2]) + } + for match in memory_matches + ] + data["memo info"] = sorted(memo_info, key=lambda x: x["rank"]) + + with open(output_json_file, "w") as outfile: + json.dump(data, outfile, indent=4) + + +def read_json(file): + """ + Read baseline and new generate json file + """ + if os.path.exists(file): + with open(file) as f: + return json.load(f) + else: + raise FileExistsError("The file does not exist !") diff --git a/tests/test_tools/conftest.py b/tests/test_tools/conftest.py new file mode 100644 index 00000000..870b1426 --- /dev/null +++ b/tests/test_tools/conftest.py @@ -0,0 +1,28 @@ +import pytest + + +# we still want to configure path argument by ourselves +# for different prefix_name of different scripts so we use this method. +# One more thing, as you can see, it has more scalibility. +def pytest_addoption(parser: pytest.Parser): + parser.addoption("--baseline-json", action="store", default=None, + help="Path to the baseline JSON file") + parser.addoption("--generate-log", action="store", default=None, + help="Path to the generate log file") + parser.addoption("--generate-json", action="store", default=None, + help="Path to the generate JSON file") + + +@pytest.fixture(autouse=True) +def baseline_json(request: pytest.FixtureRequest): + return request.config.getoption("--baseline-json") + + +@pytest.fixture(autouse=True) +def generate_log(request: pytest.FixtureRequest): + return request.config.getoption("--generate-log") + + +@pytest.fixture(autouse=True) +def generate_json(request: pytest.FixtureRequest): + return request.config.getoption("--generate-json") diff --git a/tests/test_tools/dist_test.py b/tests/test_tools/dist_test.py new file mode 100644 index 00000000..1cc65970 --- /dev/null +++ b/tests/test_tools/dist_test.py @@ -0,0 +1,349 @@ +# Copyright (c) Microsoft Corporation. +# +# This source code is licensed under the Apache license found in the +# LICENSE file in the root directory of this source tree. + +# copied from https://github.com/microsoft/DeepSpeed/blob/master/tests/unit/common.py +# reworked/refactored some parts to make it run. 
+import os +import time +import inspect +import socket +import json +from abc import ABC, abstractmethod + +import torch +import torch.multiprocessing as mp +import torch.distributed as dist + +import pytest +from _pytest.outcomes import Skipped +from _pytest.fixtures import FixtureLookupError, FixtureFunctionMarker + +# Worker timeout for tests that hang +TEST_TIMEOUT = 600 + + +def get_xdist_worker_id(): + xdist_worker = os.environ.get("PYTEST_XDIST_WORKER", None) + if xdist_worker is not None: + xdist_worker_id = xdist_worker.replace("gw", "") + return int(xdist_worker_id) + return None + + +def get_master_port(base_port=29500, port_range_size=1000): + xdist_worker_id = get_xdist_worker_id() + if xdist_worker_id is not None: + # Make xdist workers use different port ranges to avoid race conditions + base_port += port_range_size * xdist_worker_id + + # Select first open port in range + port = base_port + max_port = base_port + port_range_size + sock = socket.socket() + while port < max_port: + try: + sock.bind(("", port)) + sock.close() + return str(port) + except OSError: + port += 1 + raise IOError("no free ports") + + +class DistributedExec(ABC): + """ + Base class for distributed execution of functions/methods. Contains common + methods needed for DistributedTest and DistributedFixture. + """ + + world_size = 2 + backend = "nccl" + init_distributed = True + set_dist_env = True + reuse_dist_env = False + _pool_cache = {} + exec_timeout = TEST_TIMEOUT + + @abstractmethod + def run(self): + ... + + def __call__(self, request=None): + self._fixture_kwargs = self._get_fixture_kwargs(request, self.run) + world_size = self.world_size + if not torch.cuda.is_available(): + pytest.skip("only supported in accelerator environments.") + + if isinstance(world_size, int): + world_size = [world_size] + for procs in world_size: + self._launch_procs(procs) + + def _get_fixture_kwargs(self, request, func): + if not request: + return {} + # Grab fixture / parametrize kwargs from pytest request object + fixture_kwargs = {} + params = inspect.getfullargspec(func).args + params.remove("self") + for p in params: + try: + fixture_kwargs[p] = request.getfixturevalue(p) + except FixtureLookupError: + pass # test methods can have kwargs that are not fixtures + return fixture_kwargs + + def _launch_procs(self, num_procs): + # Verify we have enough accelerator devices to run this test + if torch.cuda.is_available() and torch.cuda.device_count() < num_procs: + pytest.skip( + f"Skipping test because not enough GPUs are available: {num_procs} required, {torch.cuda.device_count()} available" + ) + + # Set start method to `forkserver` (or `fork`) + mp.set_start_method("forkserver", force=True) + + # Create process pool or use cached one + master_port = None + if self.reuse_dist_env: + if num_procs not in self._pool_cache: + self._pool_cache[num_procs] = mp.Pool(processes=num_procs) + master_port = get_master_port() + pool = self._pool_cache[num_procs] + else: + pool = mp.Pool(processes=num_procs) + master_port = get_master_port() + + # Run the test + args = [(local_rank, num_procs, master_port) for local_rank in range(num_procs)] + skip_msgs_async = pool.starmap_async(self._dist_run, args) + + try: + skip_msgs = skip_msgs_async.get(self.exec_timeout) + except mp.TimeoutError: + # Shortcut to exit pytest in the case of a hanged test. 
This + # usually means an environment error and the rest of tests will + # hang (causing super long unit test runtimes) + pytest.exit("Test hanged, exiting", returncode=0) + + # Tear down distributed environment and close process pools + self._close_pool(pool, num_procs) + + # If we skipped a test, propagate that to this process + if any(skip_msgs): + assert len(set(skip_msgs)) == 1, "Multiple different skip messages received" + pytest.skip(skip_msgs[0]) + + def _dist_run(self, local_rank, num_procs, master_port): + skip_msg = "" + if not dist.is_initialized(): + """ Initialize torch.distributed and execute the user function. """ + if self.set_dist_env: + os.environ["MASTER_ADDR"] = "127.0.0.1" + os.environ["MASTER_PORT"] = str(master_port) + os.environ["LOCAL_RANK"] = str(local_rank) + # NOTE: unit tests don't support multi-node so local_rank == global rank + os.environ["RANK"] = str(local_rank) + # In case of multiprocess launching LOCAL_SIZE should be same as WORLD_SIZE + # single node launcher would also set LOCAL_SIZE accordingly + os.environ["LOCAL_SIZE"] = str(num_procs) + os.environ["WORLD_SIZE"] = str(num_procs) + + print( + f"Initializing torch.distributed with rank: {local_rank}, world_size: {num_procs}" + ) + torch.cuda.set_device(local_rank % torch.cuda.device_count()) + init_method = "tcp://" + master_ip = os.getenv("MASTER_ADDR", "localhost") + master_port = str(master_port) + init_method += master_ip + ":" + master_port + torch.distributed.init_process_group( + backend=self.backend, + world_size=num_procs, + rank=local_rank, + init_method=init_method, + ) + + if torch.cuda.is_available(): + torch.cuda.set_device(local_rank) + + try: + self.run(**self._fixture_kwargs) + except BaseException as e: + if isinstance(e, Skipped): + skip_msg = e.msg + else: + raise e + + return skip_msg + + def _dist_destroy(self): + if (dist is not None) and dist.is_initialized(): + dist.barrier() + dist.destroy_process_group() + + def _close_pool(self, pool, num_procs, force=False): + if force or not self.reuse_dist_env: + msg = pool.starmap(self._dist_destroy, [() for _ in range(num_procs)]) + pool.close() + pool.join() + + +class DistributedFixture(DistributedExec): + """ + Implementation that extends @pytest.fixture to allow for distributed execution. + This is primarily meant to be used when a test requires executing two pieces of + code with different world sizes. + + There are 2 parameters that can be modified: + - world_size: int = 2 -- the number of processes to launch + - backend: Literal['nccl','mpi','gloo'] = 'nccl' -- which backend to use + + Features: + - able to call pytest.skip() inside fixture + - can be reused by multiple tests + - can accept other fixtures as input + + Limitations: + - cannot use @pytest.mark.parametrize + - world_size cannot be modified after definition and only one world_size value is accepted + - any fixtures used must also be used in the test that uses this fixture (see example below) + - return values cannot be returned. Passing values to a DistributedTest + object can be achieved using class_tmpdir and writing to file (see example below) + + Usage: + - must implement a run(self, ...) 
method + - fixture can be used by making the class name input to a test function + + Example: + @pytest.fixture(params=[10,20]) + def regular_pytest_fixture(request): + return request.param + + class distributed_fixture_example(DistributedFixture): + world_size = 4 + + def run(self, regular_pytest_fixture, class_tmpdir): + assert int(os.environ["WORLD_SIZE"]) == self.world_size + local_rank = os.environ["LOCAL_RANK"] + print(f"Rank {local_rank} with value {regular_pytest_fixture}") + with open(os.path.join(class_tmpdir, f"{local_rank}.txt"), "w") as f: + f.write(f"{local_rank},{regular_pytest_fixture}") + + class TestExample(DistributedTest): + world_size = 1 + + def test(self, distributed_fixture_example, regular_pytest_fixture, class_tmpdir): + assert int(os.environ["WORLD_SIZE"]) == self.world_size + for rank in range(4): + with open(os.path.join(class_tmpdir, f"{rank}.txt"), "r") as f: + assert f.read() == f"{rank},{regular_pytest_fixture}" + """ + + is_dist_fixture = True + + # These values are just placeholders so that pytest recognizes this as a fixture + _pytestfixturefunction = FixtureFunctionMarker(scope="function", params=None) + __name__ = "" + + def __init__(self): + assert isinstance( + self.world_size, int + ), "Only one world size is allowed for distributed fixtures" + self.__name__ = type(self).__name__ + _pytestfixturefunction = FixtureFunctionMarker( + scope="function", params=None, name=self.__name__ + ) + + +class DistributedTest(DistributedExec): + """ + Implementation for running pytest with distributed execution. + + There are 2 parameters that can be modified: + - world_size: Union[int,List[int]] = 2 -- the number of processes to launch + - backend: Literal['nccl','mpi','gloo'] = 'nccl' -- which backend to use + + Features: + - able to call pytest.skip() inside tests + - works with pytest fixtures, parametrize, mark, etc. + - can contain multiple tests (each of which can be parametrized separately) + - class methods can be fixtures (usable by tests in this class only) + - world_size can be changed for individual tests using @pytest.mark.world_size(world_size) + - class_tmpdir is a fixture that can be used to get a tmpdir shared among + all tests (including DistributedFixture) + + Usage: + - class name must start with "Test" + - must implement one or more test*(self, ...) 
methods + + Example: + @pytest.fixture(params=[10,20]) + def val1(request): + return request.param + + @pytest.mark.fast + @pytest.mark.parametrize("val2", [30,40]) + class TestExample(DistributedTest): + world_size = 2 + + @pytest.fixture(params=[50,60]) + def val3(self, request): + return request.param + + def test_1(self, val1, val2, str1="hello world"): + assert int(os.environ["WORLD_SIZE"]) == self.world_size + assert all(val1, val2, str1) + + @pytest.mark.world_size(1) + @pytest.mark.parametrize("val4", [70,80]) + def test_2(self, val1, val2, val3, val4): + assert int(os.environ["WORLD_SIZE"]) == 1 + assert all(val1, val2, val3, val4) + """ + + is_dist_test = True + + # Temporary directory that is shared among test methods in a class + @pytest.fixture(autouse=True, scope="class") + def class_tmpdir(self, tmpdir_factory): + fn = tmpdir_factory.mktemp(self.__class__.__name__) + return fn + + def run(self, **fixture_kwargs): + self._current_test(**fixture_kwargs) + + def __call__(self, request): + self._current_test = self._get_current_test_func(request) + self._fixture_kwargs = self._get_fixture_kwargs(request, self._current_test) + + if not torch.cuda.is_available(): + pytest.skip("only supported in accelerator environments.") + + # Catch world_size override pytest mark + for mark in getattr(request.function, "pytestmark", []): + if mark.name == "world_size": + world_size = mark.args[0] + break + else: + world_size = self.world_size + + if isinstance(world_size, int): + world_size = [world_size] + for procs in world_size: + self._launch_procs(procs) + time.sleep(0.5) + + def _get_current_test_func(self, request): + # DistributedTest subclasses may have multiple test methods + func_name = request.function.__name__ + return getattr(self, func_name) + + +def create_testconfig(path: str): + with open(path) as f: + raw_data = json.load(f) + + return {k: [tuple(s.values()) if len(s) > 1 else tuple(s.values())[0] for s in v] for k, v in raw_data.items()} \ No newline at end of file diff --git a/tests/test_tools/test_ci_st.py b/tests/test_tools/test_ci_st.py new file mode 100644 index 00000000..d011bd38 --- /dev/null +++ b/tests/test_tools/test_ci_st.py @@ -0,0 +1,122 @@ +import pytest +from mindspeed_llm import megatron_adaptor +from tests.test_tools.acquire_json import transfer_logs_as_json, read_json + +MEMO_INFO = "memo info" +THROUGHPUT = "throughput" +LOSS = "lm loss" + +WARM_UP = 5 + + +class TestMargin: + _MARGIN_NAME = " margin" + loss = 0.02 + memory = 0.1 + throughput = 0.05 + + @classmethod + def refresh_margin_from_json(cls, json_obj): + cls.loss = json_obj.get(LOSS + cls._MARGIN_NAME, cls.loss) + cls.memory = json_obj.get(MEMO_INFO + cls._MARGIN_NAME, cls.memory) + cls.throughput = json_obj.get(THROUGHPUT + cls._MARGIN_NAME, cls.throughput) + + +class TestCIST: + + margin_loss = 0.02 + margin_throughput_percent = 0.05 + margin_memory_percent = 0.1 + + def _get_baseline(self, baseline_json): + # acquire expected results + self.expected = read_json(baseline_json) + TestMargin.refresh_margin_from_json(self.expected) + + def _get_actual(self, generate_log, generate_json): + # acquire actual results + transfer_logs_as_json(generate_log, generate_json) + self.actual = read_json(generate_json) + + def _test_helper(self, test_obj): + """ + Core test function + + Args: + test_obj: the object we want to test compare. + test_type: deterministic or approximate, default is None. 
+ + Here we temperally test `lm loss`, 'throughput' and `allocated memory` + """ + comparison_selection = { + LOSS: self._compare_lm_loss, + THROUGHPUT: self._compare_throughput, + MEMO_INFO: self._compare_memory + } + + if test_obj in comparison_selection: + expected_list = self.expected[test_obj] + if not expected_list: + return + print(f"===================== Begin comparing {test_obj} ===================") + actual_list = self.actual[test_obj] + print(f"The list of expected values: {expected_list}") + print(f"The list of actual values: {actual_list}") + # Check if lists exist and are non-empty + if not actual_list: + raise ValueError(f"Actual list for {test_obj} is empty or not found. Maybe program has failed! Check it.") + + # Check if lists have the same length + if len(expected_list) != len(actual_list): + raise ValueError(f"Actual lengths of the lists for {test_obj} do not match. Maybe program has failed! Check it.") + + compare_func = comparison_selection[test_obj] + compare_func(expected_list, actual_list) + else: + raise ValueError(f"Unsupported test object: {test_obj}") + + def _compare_lm_loss(self, expected_list, actual_list): + # Because "deterministic computation" affects the throughput, so we just test + # lm loss in case of approximation. + for step, (expected_val, actual_val) in enumerate(zip(expected_list, actual_list)): + print(f"Checking step {step + 1} for lm loss") + assert actual_val == pytest.approx(expected=expected_val, rel=TestMargin.loss),\ + f"The loss at step {step} should be approximate to {expected_val} but it is {actual_val}." + + def _compare_throughput(self, expected_list, actual_list): + # First few iterations might take a little longer. So we take the last 70 percent of the timings + try: + expected_avg_throughput = sum(expected_list[WARM_UP:]) / (len(expected_list) - WARM_UP) + actual_avg_throughput = sum(actual_list[WARM_UP:]) / (len(actual_list) - WARM_UP) + except: + raise ZeroDivisionError + + assert actual_avg_throughput >= expected_avg_throughput or \ + abs(actual_avg_throughput - expected_avg_throughput) / expected_avg_throughput <= TestMargin.throughput, \ + f"The actual avg throughput {actual_avg_throughput} degradate expected avg throughput {expected_avg_throughput}" + + def _compare_memory(self, expected_list, actual_list): + for i, (expected_val, actual_val) in enumerate(zip(expected_list, actual_list)): + assert actual_val["allocated memory"] <= expected_val["allocated memory"] or \ + abs(actual_val["allocated memory"] - expected_val["allocated memory"]) / expected_val["allocated memory"] <= TestMargin.memory, \ + f'The actual memory {actual_val["allocated memory"]} seems to be abnormal compare to expected {expected_val["allocated memory"]}.' + + assert actual_val["max allocated memory"] <= expected_val["max allocated memory"] or \ + abs(actual_val["max allocated memory"] - expected_val["max allocated memory"]) / expected_val["max allocated memory"] <= TestMargin.memory, \ + f'The actual max memory {actual_val["max allocated memory"]} seems to be abnormal compare to expected {expected_val["max allocated memory"]}.' + + def test_lm_loss_approx(self, baseline_json, generate_log, generate_json): + # expected training loss curve at different global steps. 
+ self._get_baseline(baseline_json) + self._get_actual(generate_log, generate_json) + self._test_helper("lm loss") + + def test_througpout(self, baseline_json, generate_log, generate_json): + self._get_baseline(baseline_json) + self._get_actual(generate_log, generate_json) + self._test_helper("throughput") + + def test_allocated_memory(self, baseline_json, generate_log, generate_json): + self._get_baseline(baseline_json) + self._get_actual(generate_log, generate_json) + self._test_helper("memo info") diff --git a/tests/test_tools/utils.py b/tests/test_tools/utils.py new file mode 100644 index 00000000..21578d00 --- /dev/null +++ b/tests/test_tools/utils.py @@ -0,0 +1,190 @@ +""" +We can't use assert in our code for codecheck, so create this auxiliary function to wrap +the assert case in ut for ci. +""" +import os +import hashlib +import logging +import re +import json +import glob +import sys +import subprocess +import pytest +import torch +import torch_npu +import megatron.core.parallel_state as mpu +from megatron.core.parallel_state import initialize_model_parallel +from mindspeed.core.parallel_state import initialize_model_parallel_wrapper +from mindspeed_llm.core.parallel_state import initialize_model_parallel_decorator + +def judge_expression(expression): + if not expression: + raise AssertionError + + +def compare_state_dicts(state_dict1, state_dict2): + if state_dict1.keys() != state_dict2.keys(): + print(f"base:{state_dict1.keys()} != save:{state_dict2.keys()}") + return False + + for key in state_dict1.keys(): + value1 = state_dict1[key] + value2 = state_dict2[key] + + if isinstance(value1, torch.Tensor) and isinstance(value2, torch.Tensor): + if not torch.equal(value1, value2): + print(f"Difference found in key: {key}") + return False + elif isinstance(value1, dict) and isinstance(value2, dict): + if not compare_state_dicts(value1, value2): + return False + else: + pass + + return True + + +def weight_compare(dir_1, dir_2, suffix="pt", use_md5=False): + models_path = glob.glob(os.path.join(dir_1, '**', f'*.{suffix}'), recursive=True) + if not models_path: + print(f"Can't find any weight files in {dir_1}.") + return False + for path_1 in models_path: + path_1 = os.path.normpath(path_1) + path_2 = path_1.replace(os.path.normpath(dir_1), os.path.normpath(dir_2)) + if use_md5: + are_equal = (get_md5sum(path_1) == get_md5sum(path_2)) + else: + state_dict1 = torch.load(path_1) + state_dict2 = torch.load(path_2) + are_equal = compare_state_dicts(state_dict1, state_dict2) + if not are_equal: + return False + + return True + + +def weight_compare_optim(dir_1, dir_2, suffix="pt", use_md5=False): + models_path = glob.glob(os.path.join(dir_1, '**', f'*.{suffix}'), recursive=True) + + if not models_path: + raise FileNotFoundError(f"{dir_1} is not a file or not exists !") + + for path_1 in models_path: + path_1 = os.path.normpath(path_1) + path_2 = path_1.replace(os.path.normpath(dir_1), os.path.normpath(dir_2)) + + file_name = os.path.basename(path_1) + if file_name == 'distrib_optim.pt': + use_md5 = True + elif file_name == 'model_optim_rng.pt': + use_md5 = False + + if use_md5: + are_equal = (get_md5sum(path_1) == get_md5sum(path_2)) + else: + state_dict1 = torch.load(path_1) + state_dict2 = torch.load(path_2) + are_equal = compare_state_dicts(state_dict1, state_dict2) + + if not are_equal: + return False + + return True + + +def compare_file_md5_same(file1, file2): + return get_md5sum(file1) == get_md5sum(file2) + + +def get_md5sum(fpath): + if not os.path.isfile(fpath): + raise 
FileNotFoundError(f"{fpath} is not a file or not exists !") + md5sum = hashlib.md5() + with open(fpath, 'rb') as f: + md5sum.update(f.read()) + return md5sum.hexdigest() + + +def delete_distrib_optim_files(folder_path): + + for root, dirs, files in os.walk(folder_path): + for file in files: + if file == "distrib_optim.pt": + file_path = os.path.join(root, file) + try: + os.remove(file_path) + logging.info(f"Deleted: {file_path}") + except Exception as e: + logging.exception(f"Failed to delete {file_path}: {e}") + raise + + +@pytest.fixture +def build_args(request, monkeypatch): + params = request.getfixturevalue("params") + argv = [sys.argv[0]] + for k, v in params.items(): + if v is None: + argv.append(f"--{k}") + elif isinstance(v, list): + argv.extend([f"--{k}"] + [str(value) for value in v]) + else: + argv.extend([f"--{k}", str(v)]) + monkeypatch.setattr(sys, "argv", argv) + + +def create_testconfig(path: str, cmd: bool = False): + with open(path) as f: + raw_data = json.load(f) + + res = {k: [tuple(s.values()) if len(s) > 1 else tuple(s.values())[0] for s in v] for k, v in raw_data.items()} + + if not cmd: + return res + + def __dict2cmdlist(param_value): + cmdlsts = [] + cmdlst = [] + for target in param_value: + for k, v in target.items(): + cmdlst.append(f"--{k}") + if v is not None: + cmdlst.extend(v.split()) + cmdlsts.extend(cmdlst) + return cmdlsts + + res_cmd = {key: __dict2cmdlist(value) for key, value in res.items()} + return res_cmd + + +class ListHandler(logging.Handler): + # Extract inference log, the regular expression is universal. + # Just pass the pattern you want. + def __init__(self, pattern): + super().__init__() + self.log_capture = [] + self.pattern = pattern + + def emit(self, record): + log_entry = self.format(record) + if re.search(self.pattern, log_entry, re.DOTALL): + self.log_capture.append(log_entry) + + +def setup_logger(pattern): + # Set the logger and the handler. + # Different tasks will not form interference, feel relieved to use. 
+ logger = logging.getLogger() + logger.setLevel(logging.INFO) + + handler = ListHandler(pattern) + handler.setLevel(logging.INFO) + logger.addHandler(handler) + + return handler, handler.log_capture + + +def run_cmd(cmd_strlist): + return subprocess.run(cmd_strlist).returncode -- Gitee From a3eb73e4cd9b19fb5c5c2bf809adc66939dfa479 Mon Sep 17 00:00:00 2001 From: l30056312 Date: Wed, 19 Feb 2025 14:29:11 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=BA:l30056312=20?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=8E=9F=E5=9B=A0:=E7=A7=BB=E9=99=A4?= =?UTF-8?q?=E5=A4=9A=E4=BD=99=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ci/access_control_test.py | 7 +-- tests/test_tools/test_ci_st.py | 8 +-- tests/test_tools/utils.py | 90 +--------------------------------- 3 files changed, 3 insertions(+), 102 deletions(-) diff --git a/ci/access_control_test.py b/ci/access_control_test.py index e41ff0d7..c8779390 100644 --- a/ci/access_control_test.py +++ b/ci/access_control_test.py @@ -8,11 +8,7 @@ def read_files_from_txt(txt_file): def is_examples(file): - return file.startswith("examples/") or file.startswith("tests/poc/") - - -def is_poc(file): - return file.startswith("tests/poc") + return file.startswith("examples/") def is_pipecase(file): @@ -68,7 +64,6 @@ def choose_skip_ci(raw_txt_file): is_owners, is_license, is_no_suffix, - is_poc ] return skip_ci(file_list, skip_conds) diff --git a/tests/test_tools/test_ci_st.py b/tests/test_tools/test_ci_st.py index d011bd38..9448e085 100644 --- a/tests/test_tools/test_ci_st.py +++ b/tests/test_tools/test_ci_st.py @@ -1,5 +1,4 @@ import pytest -from mindspeed_llm import megatron_adaptor from tests.test_tools.acquire_json import transfer_logs_as_json, read_json MEMO_INFO = "memo info" @@ -23,11 +22,6 @@ class TestMargin: class TestCIST: - - margin_loss = 0.02 - margin_throughput_percent = 0.05 - margin_memory_percent = 0.1 - def _get_baseline(self, baseline_json): # acquire expected results self.expected = read_json(baseline_json) @@ -82,7 +76,7 @@ class TestCIST: print(f"Checking step {step + 1} for lm loss") assert actual_val == pytest.approx(expected=expected_val, rel=TestMargin.loss),\ f"The loss at step {step} should be approximate to {expected_val} but it is {actual_val}." - + def _compare_throughput(self, expected_list, actual_list): # First few iterations might take a little longer. 
So we take the last 70 percent of the timings try: diff --git a/tests/test_tools/utils.py b/tests/test_tools/utils.py index 21578d00..48fb585a 100644 --- a/tests/test_tools/utils.py +++ b/tests/test_tools/utils.py @@ -13,87 +13,13 @@ import subprocess import pytest import torch import torch_npu -import megatron.core.parallel_state as mpu -from megatron.core.parallel_state import initialize_model_parallel -from mindspeed.core.parallel_state import initialize_model_parallel_wrapper -from mindspeed_llm.core.parallel_state import initialize_model_parallel_decorator + def judge_expression(expression): if not expression: raise AssertionError -def compare_state_dicts(state_dict1, state_dict2): - if state_dict1.keys() != state_dict2.keys(): - print(f"base:{state_dict1.keys()} != save:{state_dict2.keys()}") - return False - - for key in state_dict1.keys(): - value1 = state_dict1[key] - value2 = state_dict2[key] - - if isinstance(value1, torch.Tensor) and isinstance(value2, torch.Tensor): - if not torch.equal(value1, value2): - print(f"Difference found in key: {key}") - return False - elif isinstance(value1, dict) and isinstance(value2, dict): - if not compare_state_dicts(value1, value2): - return False - else: - pass - - return True - - -def weight_compare(dir_1, dir_2, suffix="pt", use_md5=False): - models_path = glob.glob(os.path.join(dir_1, '**', f'*.{suffix}'), recursive=True) - if not models_path: - print(f"Can't find any weight files in {dir_1}.") - return False - for path_1 in models_path: - path_1 = os.path.normpath(path_1) - path_2 = path_1.replace(os.path.normpath(dir_1), os.path.normpath(dir_2)) - if use_md5: - are_equal = (get_md5sum(path_1) == get_md5sum(path_2)) - else: - state_dict1 = torch.load(path_1) - state_dict2 = torch.load(path_2) - are_equal = compare_state_dicts(state_dict1, state_dict2) - if not are_equal: - return False - - return True - - -def weight_compare_optim(dir_1, dir_2, suffix="pt", use_md5=False): - models_path = glob.glob(os.path.join(dir_1, '**', f'*.{suffix}'), recursive=True) - - if not models_path: - raise FileNotFoundError(f"{dir_1} is not a file or not exists !") - - for path_1 in models_path: - path_1 = os.path.normpath(path_1) - path_2 = path_1.replace(os.path.normpath(dir_1), os.path.normpath(dir_2)) - - file_name = os.path.basename(path_1) - if file_name == 'distrib_optim.pt': - use_md5 = True - elif file_name == 'model_optim_rng.pt': - use_md5 = False - - if use_md5: - are_equal = (get_md5sum(path_1) == get_md5sum(path_2)) - else: - state_dict1 = torch.load(path_1) - state_dict2 = torch.load(path_2) - are_equal = compare_state_dicts(state_dict1, state_dict2) - - if not are_equal: - return False - - return True - - def compare_file_md5_same(file1, file2): return get_md5sum(file1) == get_md5sum(file2) @@ -106,20 +32,6 @@ def get_md5sum(fpath): md5sum.update(f.read()) return md5sum.hexdigest() - -def delete_distrib_optim_files(folder_path): - - for root, dirs, files in os.walk(folder_path): - for file in files: - if file == "distrib_optim.pt": - file_path = os.path.join(root, file) - try: - os.remove(file_path) - logging.info(f"Deleted: {file_path}") - except Exception as e: - logging.exception(f"Failed to delete {file_path}: {e}") - raise - @pytest.fixture def build_args(request, monkeypatch): -- Gitee From f92bb37fada9c3700dc90196c0ed9c4584522c79 Mon Sep 17 00:00:00 2001 From: l30056312 Date: Thu, 27 Feb 2025 09:47:13 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=8E=9F=E5=9B=A0:?= 
=?UTF-8?q?=E8=A1=A5=E5=85=85=E5=88=9D=E5=A7=8B=E5=8C=96=E7=94=A8=E4=BE=8B?= =?UTF-8?q?=20=E4=BF=AE=E6=94=B9=E4=BA=BA:l30056312?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ci/access_control_test.py | 160 ------------------------------------- tests/st/st_run.sh | 1 + tests/ut/__init__.py | 0 tests/ut/mock/test_mock.py | 25 ++++++ 4 files changed, 26 insertions(+), 160 deletions(-) delete mode 100644 ci/access_control_test.py create mode 100644 tests/st/st_run.sh create mode 100644 tests/ut/__init__.py create mode 100644 tests/ut/mock/test_mock.py diff --git a/ci/access_control_test.py b/ci/access_control_test.py deleted file mode 100644 index c8779390..00000000 --- a/ci/access_control_test.py +++ /dev/null @@ -1,160 +0,0 @@ -import os -from pathlib import Path - - -def read_files_from_txt(txt_file): - with open(txt_file, "r") as f: - return [line.strip() for line in f.readlines()] - - -def is_examples(file): - return file.startswith("examples/") - - -def is_pipecase(file): - return file.startswith("tests/pipeline") - - -def is_markdown(file): - return file.endswith(".md") - - -def is_image(file): - return file.endswith(".jpg") or file.endswith(".png") - - -def is_txt(file): - return file.endswith(".txt") - - -def is_owners(file): - return file.startswith("OWNERS") - - -def is_license(file): - return file.startswith("LICENSE") - - -def is_ut(file): - return file.startswith("tests/ut") - - -def is_no_suffix(file): - return os.path.splitext(file)[1] == '' - - -def skip_ci(files, skip_conds): - for file in files: - if not any(condition(file) for condition in skip_conds): - return False - return True - - -def choose_skip_ci(raw_txt_file): - if not os.path.exists(raw_txt_file): - return False - - file_list = read_files_from_txt(raw_txt_file) - skip_conds = [ - is_examples, - is_pipecase, - is_markdown, - is_image, - is_txt, - is_owners, - is_license, - is_no_suffix, - ] - - return skip_ci(file_list, skip_conds) - - -def filter_exec_ut(raw_txt_file): - file_list = read_files_from_txt(raw_txt_file) - filter_conds = [ - is_ut, - is_markdown - ] - for file in file_list: - if not any(condition(file) for condition in filter_conds): - return False, None - return True, file_list - - -def acquire_exitcode(command): - exitcode = os.system(command) - real_code = os.WEXITSTATUS(exitcode) - return real_code - - -# ============================= -# UT test, run with pytest -# ============================= - -class UTTest: - def __init__(self): - self.base_dir = Path(__file__).absolute().parents[1] - self.test_dir = os.path.join(self.base_dir, 'tests') - self.ut_files = os.path.join( - self.base_dir, self.test_dir, "ut" - ) - - def run_ut(self, raw_txt_file=None): - if raw_txt_file is not None and os.path.exists(raw_txt_file): - filtered_results = filter_exec_ut(raw_txt_file) - - if filtered_results[0]: - filtered_files = filtered_results[1] - full_path = [os.path.join(self.base_dir, file) for file in filtered_files] - exsit_ut_files = [file for file in full_path if os.path.exists(file) and file.endswith(".py")] - self.ut_files = " ".join(exsit_ut_files) - - command = f"pytest -x --log-cli-level=INFO {self.ut_files}" - code = acquire_exitcode(command) - if code == 0: - print("UT test success") - else: - print("UT failed") - exit(1) - - -# =============================================== -# ST test, run with sh. 
-# =============================================== - -class STTest: - def __init__(self): - self.base_dir = Path(__file__).absolute().parents[1] - self.test_dir = os.path.join(self.base_dir, 'tests') - - self.st_dir = "st" - self.st_shell = os.path.join( - self.test_dir, self.st_dir, "st_run.sh" - ) - - def run_st(self): - rectify_case = f"bash {self.st_shell}" - rectify_code = acquire_exitcode(rectify_case) - if rectify_code != 0: - print("rectify case failed, check it.") - exit(1) - - -def run_tests(raw_txt_file): - pass - - -def main(): - parent_dir = Path(__file__).absolute().parents[2] - raw_txt_file = os.path.join(parent_dir, "modify.txt") - - skip_signal = choose_skip_ci(raw_txt_file) - if skip_signal: - print("Skipping CI") - else: - run_tests(raw_txt_file) - - -if __name__ == "__main__": - main() - \ No newline at end of file diff --git a/tests/st/st_run.sh b/tests/st/st_run.sh new file mode 100644 index 00000000..c16aa327 --- /dev/null +++ b/tests/st/st_run.sh @@ -0,0 +1 @@ +echo "Hello World !" \ No newline at end of file diff --git a/tests/ut/__init__.py b/tests/ut/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/ut/mock/test_mock.py b/tests/ut/mock/test_mock.py new file mode 100644 index 00000000..06f1561d --- /dev/null +++ b/tests/ut/mock/test_mock.py @@ -0,0 +1,25 @@ +# coding=utf-8 +# Copyright (c) 2025, HUAWEI CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Just an initialize test""" + +import pytest # Just try can import or not +from tests.test_tools.dist_test import DistributedTest + + +class TestMock(DistributedTest): + world_size = 1 + + def test_mock_op(self): + assert 1 + 1 == 2, "Failed !" 
-- Gitee From 0ba53499d8583723b7c038def8f241c8ce7e21a2 Mon Sep 17 00:00:00 2001 From: l30056312 Date: Thu, 27 Feb 2025 15:18:12 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=8E=9F=E5=9B=A0:?= =?UTF-8?q?=E8=A1=A5=E5=85=85ci=E5=85=A5=E5=8F=A3=20=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E4=BA=BA:l30056312?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ci/access_control_test.py | 170 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 ci/access_control_test.py diff --git a/ci/access_control_test.py b/ci/access_control_test.py new file mode 100644 index 00000000..5b354a5b --- /dev/null +++ b/ci/access_control_test.py @@ -0,0 +1,170 @@ +import os +from pathlib import Path + + +def read_files_from_txt(txt_file): + with open(txt_file, "r") as f: + return [line.strip() for line in f.readlines()] + + +def is_examples(file): + return file.startswith("examples/") or file.startswith("tests/poc/") + + +def is_poc(file): + return file.startswith("tests/poc") + + +def is_pipecase(file): + return file.startswith("tests/pipeline") + + +def is_markdown(file): + return file.endswith(".md") + + +def is_image(file): + return file.endswith(".jpg") or file.endswith(".png") + + +def is_txt(file): + return file.endswith(".txt") + + +def is_owners(file): + return file.startswith("OWNERS") + + +def is_license(file): + return file.startswith("LICENSE") + + +def is_ut(file): + return file.startswith("tests/ut") + + +def is_no_suffix(file): + return os.path.splitext(file)[1] == '' + + +def skip_ci(files, skip_conds): + for file in files: + if not any(condition(file) for condition in skip_conds): + return False + return True + + +def choose_skip_ci(raw_txt_file): + if not os.path.exists(raw_txt_file): + return False + + file_list = read_files_from_txt(raw_txt_file) + skip_conds = [ + is_examples, + is_pipecase, + is_markdown, + is_image, + is_txt, + is_owners, + is_license, + is_no_suffix, + is_poc + ] + + return skip_ci(file_list, skip_conds) + + +def filter_exec_ut(raw_txt_file): + file_list = read_files_from_txt(raw_txt_file) + filter_conds = [ + is_ut, + is_markdown + ] + for file in file_list: + if not any(condition(file) for condition in filter_conds): + return False, None + return True, file_list + + +def acquire_exitcode(command): + exitcode = os.system(command) + real_code = os.WEXITSTATUS(exitcode) + return real_code + + +# ============================= +# UT test, run with pytest +# ============================= + +class UTTest: + def __init__(self): + self.base_dir = Path(__file__).absolute().parents[1] + self.test_dir = os.path.join(self.base_dir, 'tests') + self.ut_files = os.path.join( + self.base_dir, self.test_dir, "ut" + ) + + def run_ut(self, raw_txt_file=None): + if raw_txt_file is not None and os.path.exists(raw_txt_file): + filtered_results = filter_exec_ut(raw_txt_file) + + if filtered_results[0]: + filtered_files = filtered_results[1] + full_path = [os.path.join(self.base_dir, file) for file in filtered_files] + exsit_ut_files = [file for file in full_path if os.path.exists(file) and file.endswith(".py")] + self.ut_files = " ".join(exsit_ut_files) + + command = f"pytest -x --log-cli-level=INFO {self.ut_files}" + code = acquire_exitcode(command) + if code == 0: + print("UT test success") + else: + print("UT failed") + exit(1) + + +# =============================================== +# ST test, run with sh. 
+# =============================================== + +class STTest: + def __init__(self): + self.base_dir = Path(__file__).absolute().parents[1] + self.test_dir = os.path.join(self.base_dir, 'tests') + + self.st_dir = "st" + self.st_shell = os.path.join( + self.test_dir, self.st_dir, "st_run.sh" + ) + + def run_st(self): + rectify_case = f"bash {self.st_shell}" + rectify_code = acquire_exitcode(rectify_case) + if rectify_code != 0: + print("rectify case failed, check it.") + exit(1) + + +def run_tests(raw_txt_file): + ut = UTTest() + st = STTest() + if filter_exec_ut(raw_txt_file)[0]: + ut.run_ut(raw_txt_file) + else: + ut.run_ut() + st.run_st() + + +def main(): + parent_dir = Path(__file__).absolute().parents[2] + raw_txt_file = os.path.join(parent_dir, "modify.txt") + + skip_signal = choose_skip_ci(raw_txt_file) + if skip_signal: + print("Skipping CI") + else: + run_tests(raw_txt_file) + + +if __name__ == "__main__": + main() -- Gitee From c869de2bd420380005e90c31484efb4e65a8527a Mon Sep 17 00:00:00 2001 From: l30056312 Date: Thu, 27 Feb 2025 16:43:20 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=8E=9F=E5=9B=A0:?= =?UTF-8?q?=E5=8E=BB=E9=99=A4=E5=A4=9A=E4=BD=99=E4=BB=A3=E7=A0=81=20?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=BA:l30056312?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ci/access_control_test.py | 12 +--- tests/test_tools/acquire_json.py | 78 --------------------- tests/test_tools/conftest.py | 28 -------- tests/test_tools/test_ci_st.py | 116 ------------------------------- tests/test_tools/utils.py | 102 --------------------------- 5 files changed, 1 insertion(+), 335 deletions(-) delete mode 100644 tests/test_tools/acquire_json.py delete mode 100644 tests/test_tools/conftest.py delete mode 100644 tests/test_tools/test_ci_st.py delete mode 100644 tests/test_tools/utils.py diff --git a/ci/access_control_test.py b/ci/access_control_test.py index 5b354a5b..cdd927ac 100644 --- a/ci/access_control_test.py +++ b/ci/access_control_test.py @@ -11,14 +11,6 @@ def is_examples(file): return file.startswith("examples/") or file.startswith("tests/poc/") -def is_poc(file): - return file.startswith("tests/poc") - - -def is_pipecase(file): - return file.startswith("tests/pipeline") - - def is_markdown(file): return file.endswith(".md") @@ -61,14 +53,12 @@ def choose_skip_ci(raw_txt_file): file_list = read_files_from_txt(raw_txt_file) skip_conds = [ is_examples, - is_pipecase, is_markdown, is_image, is_txt, is_owners, is_license, - is_no_suffix, - is_poc + is_no_suffix ] return skip_ci(file_list, skip_conds) diff --git a/tests/test_tools/acquire_json.py b/tests/test_tools/acquire_json.py deleted file mode 100644 index 1b7f7f7c..00000000 --- a/tests/test_tools/acquire_json.py +++ /dev/null @@ -1,78 +0,0 @@ -import re -import json -import enum -import os - - -class TypeOfTest(enum.Enum): - APPROX = 1 - DETERMINISTIC = 2 - - -def transfer_logs_as_json(log_file, output_json_file): - """ - Read a log file from the input path, and return the - summary specified as input as a list - - Args: - log_file: str, path to the dir where the logs are located. - output_json_file: str, path of the json file transferred from the logs. - - Returns: - data: json, the values parsed from the log, formatted as a json file. 
- """ - - log_pattern = re.compile( - r"throughput per GPU \(TFLOP/s/GPU\):\s+([0-9.]+)\s+\|.*?lm loss:\s+([0-9.]+E[+-][0-9]+) | .* actor/pg_loss : ([0-9.]+)" - ) - - if "trl_ppo" in log_file: - log_pattern = re.compile( - r'throughput per GPU \(TFLOP/s/GPU\):\s([0-9\.]+).*?(?:abs_pg_loss:|vf_loss:)\s([0-9\.]+)' - ) - - memory_pattern = re.compile( - r"\[Rank (\d+)\] \(after \d+ iterations\) memory \(MB\) \| allocated: ([0-9.]+) \| max allocated: ([0-9.]+)" - ) - - data = { - "lm loss": [], - "throughput": [], - "memo info": [] - } - with open(log_file, "r") as f: - log_content = f.read() - log_matches = log_pattern.findall(log_content) - memory_matches = memory_pattern.findall(log_content) - - if log_matches: - if log_matches[0][1] != "": - data["lm loss"] = [float(match[1]) for match in log_matches] - data["throughput"] = [float(match[0]) for match in log_matches] - else: - data["lm loss"] = [float(match[2]) for match in log_matches] - - if memory_matches: - memo_info = [ - { - "rank": int(match[0]), - "allocated memory": float(match[1]), - "max allocated memory": float(match[2]) - } - for match in memory_matches - ] - data["memo info"] = sorted(memo_info, key=lambda x: x["rank"]) - - with open(output_json_file, "w") as outfile: - json.dump(data, outfile, indent=4) - - -def read_json(file): - """ - Read baseline and new generate json file - """ - if os.path.exists(file): - with open(file) as f: - return json.load(f) - else: - raise FileExistsError("The file does not exist !") diff --git a/tests/test_tools/conftest.py b/tests/test_tools/conftest.py deleted file mode 100644 index 870b1426..00000000 --- a/tests/test_tools/conftest.py +++ /dev/null @@ -1,28 +0,0 @@ -import pytest - - -# we still want to configure path argument by ourselves -# for different prefix_name of different scripts so we use this method. -# One more thing, as you can see, it has more scalibility. 
-def pytest_addoption(parser: pytest.Parser): - parser.addoption("--baseline-json", action="store", default=None, - help="Path to the baseline JSON file") - parser.addoption("--generate-log", action="store", default=None, - help="Path to the generate log file") - parser.addoption("--generate-json", action="store", default=None, - help="Path to the generate JSON file") - - -@pytest.fixture(autouse=True) -def baseline_json(request: pytest.FixtureRequest): - return request.config.getoption("--baseline-json") - - -@pytest.fixture(autouse=True) -def generate_log(request: pytest.FixtureRequest): - return request.config.getoption("--generate-log") - - -@pytest.fixture(autouse=True) -def generate_json(request: pytest.FixtureRequest): - return request.config.getoption("--generate-json") diff --git a/tests/test_tools/test_ci_st.py b/tests/test_tools/test_ci_st.py deleted file mode 100644 index 9448e085..00000000 --- a/tests/test_tools/test_ci_st.py +++ /dev/null @@ -1,116 +0,0 @@ -import pytest -from tests.test_tools.acquire_json import transfer_logs_as_json, read_json - -MEMO_INFO = "memo info" -THROUGHPUT = "throughput" -LOSS = "lm loss" - -WARM_UP = 5 - - -class TestMargin: - _MARGIN_NAME = " margin" - loss = 0.02 - memory = 0.1 - throughput = 0.05 - - @classmethod - def refresh_margin_from_json(cls, json_obj): - cls.loss = json_obj.get(LOSS + cls._MARGIN_NAME, cls.loss) - cls.memory = json_obj.get(MEMO_INFO + cls._MARGIN_NAME, cls.memory) - cls.throughput = json_obj.get(THROUGHPUT + cls._MARGIN_NAME, cls.throughput) - - -class TestCIST: - def _get_baseline(self, baseline_json): - # acquire expected results - self.expected = read_json(baseline_json) - TestMargin.refresh_margin_from_json(self.expected) - - def _get_actual(self, generate_log, generate_json): - # acquire actual results - transfer_logs_as_json(generate_log, generate_json) - self.actual = read_json(generate_json) - - def _test_helper(self, test_obj): - """ - Core test function - - Args: - test_obj: the object we want to test compare. - test_type: deterministic or approximate, default is None. - - Here we temperally test `lm loss`, 'throughput' and `allocated memory` - """ - comparison_selection = { - LOSS: self._compare_lm_loss, - THROUGHPUT: self._compare_throughput, - MEMO_INFO: self._compare_memory - } - - if test_obj in comparison_selection: - expected_list = self.expected[test_obj] - if not expected_list: - return - print(f"===================== Begin comparing {test_obj} ===================") - actual_list = self.actual[test_obj] - print(f"The list of expected values: {expected_list}") - print(f"The list of actual values: {actual_list}") - # Check if lists exist and are non-empty - if not actual_list: - raise ValueError(f"Actual list for {test_obj} is empty or not found. Maybe program has failed! Check it.") - - # Check if lists have the same length - if len(expected_list) != len(actual_list): - raise ValueError(f"Actual lengths of the lists for {test_obj} do not match. Maybe program has failed! Check it.") - - compare_func = comparison_selection[test_obj] - compare_func(expected_list, actual_list) - else: - raise ValueError(f"Unsupported test object: {test_obj}") - - def _compare_lm_loss(self, expected_list, actual_list): - # Because "deterministic computation" affects the throughput, so we just test - # lm loss in case of approximation. 
- for step, (expected_val, actual_val) in enumerate(zip(expected_list, actual_list)): - print(f"Checking step {step + 1} for lm loss") - assert actual_val == pytest.approx(expected=expected_val, rel=TestMargin.loss),\ - f"The loss at step {step} should be approximate to {expected_val} but it is {actual_val}." - - def _compare_throughput(self, expected_list, actual_list): - # First few iterations might take a little longer. So we take the last 70 percent of the timings - try: - expected_avg_throughput = sum(expected_list[WARM_UP:]) / (len(expected_list) - WARM_UP) - actual_avg_throughput = sum(actual_list[WARM_UP:]) / (len(actual_list) - WARM_UP) - except: - raise ZeroDivisionError - - assert actual_avg_throughput >= expected_avg_throughput or \ - abs(actual_avg_throughput - expected_avg_throughput) / expected_avg_throughput <= TestMargin.throughput, \ - f"The actual avg throughput {actual_avg_throughput} degradate expected avg throughput {expected_avg_throughput}" - - def _compare_memory(self, expected_list, actual_list): - for i, (expected_val, actual_val) in enumerate(zip(expected_list, actual_list)): - assert actual_val["allocated memory"] <= expected_val["allocated memory"] or \ - abs(actual_val["allocated memory"] - expected_val["allocated memory"]) / expected_val["allocated memory"] <= TestMargin.memory, \ - f'The actual memory {actual_val["allocated memory"]} seems to be abnormal compare to expected {expected_val["allocated memory"]}.' - - assert actual_val["max allocated memory"] <= expected_val["max allocated memory"] or \ - abs(actual_val["max allocated memory"] - expected_val["max allocated memory"]) / expected_val["max allocated memory"] <= TestMargin.memory, \ - f'The actual max memory {actual_val["max allocated memory"]} seems to be abnormal compare to expected {expected_val["max allocated memory"]}.' - - def test_lm_loss_approx(self, baseline_json, generate_log, generate_json): - # expected training loss curve at different global steps. - self._get_baseline(baseline_json) - self._get_actual(generate_log, generate_json) - self._test_helper("lm loss") - - def test_througpout(self, baseline_json, generate_log, generate_json): - self._get_baseline(baseline_json) - self._get_actual(generate_log, generate_json) - self._test_helper("throughput") - - def test_allocated_memory(self, baseline_json, generate_log, generate_json): - self._get_baseline(baseline_json) - self._get_actual(generate_log, generate_json) - self._test_helper("memo info") diff --git a/tests/test_tools/utils.py b/tests/test_tools/utils.py deleted file mode 100644 index 48fb585a..00000000 --- a/tests/test_tools/utils.py +++ /dev/null @@ -1,102 +0,0 @@ -""" -We can't use assert in our code for codecheck, so create this auxiliary function to wrap -the assert case in ut for ci. 
-""" -import os -import hashlib -import logging -import re -import json -import glob -import sys -import subprocess -import pytest -import torch -import torch_npu - - -def judge_expression(expression): - if not expression: - raise AssertionError - - -def compare_file_md5_same(file1, file2): - return get_md5sum(file1) == get_md5sum(file2) - - -def get_md5sum(fpath): - if not os.path.isfile(fpath): - raise FileNotFoundError(f"{fpath} is not a file or not exists !") - md5sum = hashlib.md5() - with open(fpath, 'rb') as f: - md5sum.update(f.read()) - return md5sum.hexdigest() - - -@pytest.fixture -def build_args(request, monkeypatch): - params = request.getfixturevalue("params") - argv = [sys.argv[0]] - for k, v in params.items(): - if v is None: - argv.append(f"--{k}") - elif isinstance(v, list): - argv.extend([f"--{k}"] + [str(value) for value in v]) - else: - argv.extend([f"--{k}", str(v)]) - monkeypatch.setattr(sys, "argv", argv) - - -def create_testconfig(path: str, cmd: bool = False): - with open(path) as f: - raw_data = json.load(f) - - res = {k: [tuple(s.values()) if len(s) > 1 else tuple(s.values())[0] for s in v] for k, v in raw_data.items()} - - if not cmd: - return res - - def __dict2cmdlist(param_value): - cmdlsts = [] - cmdlst = [] - for target in param_value: - for k, v in target.items(): - cmdlst.append(f"--{k}") - if v is not None: - cmdlst.extend(v.split()) - cmdlsts.extend(cmdlst) - return cmdlsts - - res_cmd = {key: __dict2cmdlist(value) for key, value in res.items()} - return res_cmd - - -class ListHandler(logging.Handler): - # Extract inference log, the regular expression is universal. - # Just pass the pattern you want. - def __init__(self, pattern): - super().__init__() - self.log_capture = [] - self.pattern = pattern - - def emit(self, record): - log_entry = self.format(record) - if re.search(self.pattern, log_entry, re.DOTALL): - self.log_capture.append(log_entry) - - -def setup_logger(pattern): - # Set the logger and the handler. - # Different tasks will not form interference, feel relieved to use. - logger = logging.getLogger() - logger.setLevel(logging.INFO) - - handler = ListHandler(pattern) - handler.setLevel(logging.INFO) - logger.addHandler(handler) - - return handler, handler.log_capture - - -def run_cmd(cmd_strlist): - return subprocess.run(cmd_strlist).returncode -- Gitee From e34d1b8b598c2e9d80b4190be2f02396ceceb08a Mon Sep 17 00:00:00 2001 From: l30056312 Date: Thu, 27 Feb 2025 17:00:18 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=8E=9F=E5=9B=A0:?= =?UTF-8?q?=E5=8E=BB=E9=99=A4=E5=A4=9A=E4=BD=99=E6=96=87=E4=BB=B6=20?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=BA:l30056312?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_tools/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 tests/test_tools/__init__.py diff --git a/tests/test_tools/__init__.py b/tests/test_tools/__init__.py deleted file mode 100644 index e69de29b..00000000 -- Gitee